message.go 5.43 KB
Newer Older
zelig's avatar
zelig committed
1 2 3
package p2p

import (
Felix Lange's avatar
Felix Lange committed
4
	"bytes"
5
	"errors"
obscuren's avatar
obscuren committed
6
	"fmt"
Felix Lange's avatar
Felix Lange committed
7 8
	"io"
	"io/ioutil"
9
	"net"
10
	"sync"
11
	"sync/atomic"
12
	"time"
Felix Lange's avatar
Felix Lange committed
13

14
	"github.com/ethereum/go-ethereum/ethutil"
Felix Lange's avatar
Felix Lange committed
15
	"github.com/ethereum/go-ethereum/rlp"
zelig's avatar
zelig committed
16 17
)

Felix Lange's avatar
Felix Lange committed
18 19 20 21 22 23 24
// Msg defines the structure of a p2p message.
//
// Note that a Msg can only be sent once since the Payload reader is
// consumed during sending. It is not possible to create a Msg and
// send it any number of times. If you want to reuse an encoded
// structure, encode the payload into a byte array and create a
// separate Msg with a bytes.Reader as Payload for each send.
type Msg struct {
	Code    uint64    // message code
	Size    uint32    // size of the payload
	Payload io.Reader // payload contents; consumed when the message is sent
}

Felix Lange's avatar
Felix Lange committed
31
// NewMsg creates an RLP-encoded message with the given code.
32
func NewMsg(code uint64, params ...interface{}) Msg {
33 34
	p := bytes.NewReader(ethutil.Encode(params))
	return Msg{Code: code, Size: uint32(p.Len()), Payload: p}
zelig's avatar
zelig committed
35 36
}

37 38 39 40 41
// Decode parse the RLP content of a message into
// the given value, which must be a pointer.
//
// For the decoding rules, please see package rlp.
func (msg Msg) Decode(val interface{}) error {
42
	if err := rlp.Decode(msg.Payload, val); err != nil {
obscuren's avatar
obscuren committed
43 44 45 46 47 48 49
		return newPeerError(errInvalidMsg, "(code %#x) (size %d) %v", msg.Code, msg.Size, err)
	}
	return nil
}

func (msg Msg) String() string {
	return fmt.Sprintf("msg #%v (%v bytes)", msg.Code, msg.Size)
50 51
}

Felix Lange's avatar
Felix Lange committed
52 53 54 55 56 57
// Discard drains whatever is left of the payload and throws the data
// away. It returns the error reported by the copy, if any.
func (msg Msg) Discard() error {
	if _, err := io.Copy(ioutil.Discard, msg.Payload); err != nil {
		return err
	}
	return nil
}

58 59 60 61 62
// MsgReader is implemented by types that messages can be read from.
type MsgReader interface {
	// ReadMsg returns a message, or an error if none could be read.
	ReadMsg() (Msg, error)
}

type MsgWriter interface {
63 64 65
	// WriteMsg sends a message. It will block until the message's
	// Payload has been consumed by the other end.
	//
66 67
	// Note that messages can be sent only once because their
	// payload reader is drained.
68 69 70 71
	WriteMsg(Msg) error
}

// MsgReadWriter provides reading and writing of encoded messages.
72 73
// Implementations should ensure that ReadMsg and WriteMsg can be
// called simultaneously from multiple goroutines.
74 75 76 77 78
type MsgReadWriter interface {
	MsgReader
	MsgWriter
}

79 80 81 82 83 84
// EncodeMsg builds a message from code and the data elements using
// NewMsg and sends it through w. The result of w.WriteMsg is
// returned unchanged.
func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error {
	msg := NewMsg(code, data...)
	return w.WriteMsg(msg)
}

85 86 87
// netWrapper wrapsa MsgReadWriter with locks around
// ReadMsg/WriteMsg and applies read/write deadlines.
type netWrapper struct {
88
	rmu, wmu sync.Mutex
89 90 91 92

	rtimeout, wtimeout time.Duration
	conn               net.Conn
	wrapped            MsgReadWriter
93 94
}

95
func (rw *netWrapper) ReadMsg() (Msg, error) {
96 97
	rw.rmu.Lock()
	defer rw.rmu.Unlock()
98
	rw.conn.SetReadDeadline(time.Now().Add(rw.rtimeout))
99 100 101
	return rw.wrapped.ReadMsg()
}

102
func (rw *netWrapper) WriteMsg(msg Msg) error {
103 104
	rw.wmu.Lock()
	defer rw.wmu.Unlock()
105
	rw.conn.SetWriteDeadline(time.Now().Add(rw.wtimeout))
106 107 108
	return rw.wrapped.WriteMsg(msg)
}

109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
// eofSignal wraps a reader with eof signaling. A single value is
// sent on the eof channel when the wrapped reader returns an error
// or when count bytes have been read. The channel itself is never
// closed by eofSignal.
type eofSignal struct {
	wrapped io.Reader
	count   uint32          // number of bytes left
	eof     chan<- struct{} // set to nil once the signal has been sent
}

// Read reads from the wrapped reader, never handing out more than
// count bytes in total. Once count has dropped to zero, it returns
// (0, io.EOF). The eof signal is sent at most once; the send blocks
// until the channel accepts the value.
//
// Note: when using eofSignal to detect whether a message payload
// has been read, Read might not be called for zero sized messages.
func (r *eofSignal) Read(buf []byte) (int, error) {
	if r.count == 0 {
		r.signal()
		return 0, io.EOF
	}

	// Clamp the read to the bytes that are left. The comparison is
	// done in uint64 so that a count beyond the int range (possible
	// where int is 32 bits wide) cannot become a negative slice bound.
	limit := len(buf)
	if uint64(r.count) < uint64(limit) {
		limit = int(r.count)
	}
	n, err := r.wrapped.Read(buf[:limit])
	r.count -= uint32(n)
	if err != nil || r.count == 0 {
		r.signal() // tell Peer that msg has been consumed
	}
	return n, err
}

// signal sends the eof notification unless it has been sent before.
func (r *eofSignal) signal() {
	if r.eof != nil {
		r.eof <- struct{}{}
		r.eof = nil
	}
}

142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
// MsgPipe creates a message pipe. Reads on one end are matched
// with writes on the other. The pipe is full-duplex, both ends
// implement MsgReadWriter.
func MsgPipe() (*MsgPipeRW, *MsgPipeRW) {
	// One unbuffered channel per direction, plus the close state
	// that both endpoints share.
	aToB := make(chan Msg)
	bToA := make(chan Msg)
	closing := make(chan struct{})
	closed := new(int32)

	a := &MsgPipeRW{w: aToB, r: bToA, closing: closing, closed: closed}
	b := &MsgPipeRW{w: bToA, r: aToB, closing: closing, closed: closed}
	return a, b
}

// ErrPipeClosed is returned from pipe operations after the
// pipe has been closed. Both ReadMsg and WriteMsg of MsgPipeRW
// report it.
var ErrPipeClosed = errors.New("p2p: read or write on closed message pipe")

// MsgPipeRW is an endpoint of a MsgReadWriter pipe.
type MsgPipeRW struct {
	w       chan<- Msg    // messages written by this endpoint
	r       <-chan Msg    // messages to be read by this endpoint
	closing chan struct{} // closed by Close to unblock pending calls
	closed  *int32        // non-zero once Close has been called; accessed atomically
}

// WriteMsg sends a messsage on the pipe.
// It blocks until the receiver has consumed the message payload.
func (p *MsgPipeRW) WriteMsg(msg Msg) error {
	if atomic.LoadInt32(p.closed) == 0 {
		consumed := make(chan struct{}, 1)
173
		msg.Payload = &eofSignal{msg.Payload, msg.Size, consumed}
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
		select {
		case p.w <- msg:
			if msg.Size > 0 {
				// wait for payload read or discard
				<-consumed
			}
			return nil
		case <-p.closing:
		}
	}
	return ErrPipeClosed
}

// ReadMsg returns a message sent on the other end of the pipe. It
// blocks until a message arrives or the pipe is closed; in the
// latter case an empty Msg and ErrPipeClosed are returned.
func (p *MsgPipeRW) ReadMsg() (Msg, error) {
	if atomic.LoadInt32(p.closed) != 0 {
		return Msg{}, ErrPipeClosed
	}
	select {
	case received := <-p.r:
		return received, nil
	case <-p.closing:
		return Msg{}, ErrPipeClosed
	}
}

// Close unblocks any pending ReadMsg and WriteMsg calls on both ends
// of the pipe. They will return ErrPipeClosed. Note that Close does
// not interrupt any reads from a message payload.
//
// Close may be called more than once and from either end; only the
// first call closes the closing channel. It always returns nil.
func (p *MsgPipeRW) Close() error {
	// The swap succeeds for exactly one caller, so the closing channel
	// is closed once and the flag never grows beyond 1.
	if !atomic.CompareAndSwapInt32(p.closed, 0, 1) {
		// someone else has already closed the pipe
		return nil
	}
	close(p.closing)
	return nil
}