message.go 6.09 KB
Newer Older
zelig's avatar
zelig committed
1 2 3
package p2p

import (
Felix Lange's avatar
Felix Lange committed
4 5
	"bytes"
	"encoding/binary"
6
	"errors"
obscuren's avatar
obscuren committed
7
	"fmt"
Felix Lange's avatar
Felix Lange committed
8 9 10
	"io"
	"io/ioutil"
	"math/big"
11
	"sync/atomic"
Felix Lange's avatar
Felix Lange committed
12

13
	"github.com/ethereum/go-ethereum/ethutil"
Felix Lange's avatar
Felix Lange committed
14
	"github.com/ethereum/go-ethereum/rlp"
zelig's avatar
zelig committed
15 16
)

Felix Lange's avatar
Felix Lange committed
17 18 19 20 21 22 23
// Msg defines the structure of a p2p message.
//
// Note that a Msg can only be sent once since the Payload reader is
// consumed during sending. It is not possible to create a Msg and
// send it any number of times. If you want to reuse an encoded
// structure, encode the payload into a byte array and create a
// separate Msg with a bytes.Reader as Payload for each send.
type Msg struct {
	// Code is the message code.
	Code uint64
	// Size is the size of the payload in bytes.
	Size uint32
	// Payload supplies the payload bytes. It is consumed as it is read.
	Payload io.Reader
}

Felix Lange's avatar
Felix Lange committed
30
// NewMsg creates an RLP-encoded message with the given code.
31
func NewMsg(code uint64, params ...interface{}) Msg {
Felix Lange's avatar
Felix Lange committed
32 33 34 35 36
	buf := new(bytes.Buffer)
	for _, p := range params {
		buf.Write(ethutil.Encode(p))
	}
	return Msg{Code: code, Size: uint32(buf.Len()), Payload: buf}
zelig's avatar
zelig committed
37 38
}

Felix Lange's avatar
Felix Lange committed
39 40 41 42 43 44
func encodePayload(params ...interface{}) []byte {
	buf := new(bytes.Buffer)
	for _, p := range params {
		buf.Write(ethutil.Encode(p))
	}
	return buf.Bytes()
zelig's avatar
zelig committed
45 46
}

47 48 49 50 51 52
// Decode parse the RLP content of a message into
// the given value, which must be a pointer.
//
// For the decoding rules, please see package rlp.
func (msg Msg) Decode(val interface{}) error {
	s := rlp.NewListStream(msg.Payload, uint64(msg.Size))
obscuren's avatar
obscuren committed
53 54 55 56 57 58 59 60
	if err := s.Decode(val); err != nil {
		return newPeerError(errInvalidMsg, "(code %#x) (size %d) %v", msg.Code, msg.Size, err)
	}
	return nil
}

func (msg Msg) String() string {
	return fmt.Sprintf("msg #%v (%v bytes)", msg.Code, msg.Size)
61 62
}

Felix Lange's avatar
Felix Lange committed
63 64 65 66 67 68
// Discard drains whatever is left of the message payload and throws
// the data away. It returns the first error hit while reading, if any.
func (msg Msg) Discard() error {
	if _, err := io.Copy(ioutil.Discard, msg.Payload); err != nil {
		return err
	}
	return nil
}

69 70 71 72 73
// MsgReader is the interface wrapping the ReadMsg method.
type MsgReader interface {
	// ReadMsg returns the next message, or an error if none
	// could be obtained.
	ReadMsg() (Msg, error)
}

type MsgWriter interface {
74 75 76
	// WriteMsg sends a message. It will block until the message's
	// Payload has been consumed by the other end.
	//
77 78 79 80 81 82 83 84 85 86
	// Note that messages can be sent only once.
	WriteMsg(Msg) error
}

// MsgReadWriter provides reading and writing of encoded messages.
// It combines the MsgReader and MsgWriter interfaces.
type MsgReadWriter interface {
	MsgReader
	MsgWriter
}

87 88 89 90 91 92
// EncodeMsg builds a message from the given code and data elements
// using NewMsg and hands it to w. The result of w.WriteMsg is
// returned unchanged.
func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error {
	msg := NewMsg(code, data...)
	return w.WriteMsg(msg)
}

Felix Lange's avatar
Felix Lange committed
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
// magicToken is the fixed four-byte prefix of every message frame:
// writeMsg emits it first and readMsg rejects input that does not
// start with it.
var magicToken = []byte{34, 64, 8, 145}

func writeMsg(w io.Writer, msg Msg) error {
	// TODO: handle case when Size + len(code) + len(listhdr) overflows uint32
	code := ethutil.Encode(uint32(msg.Code))
	listhdr := makeListHeader(msg.Size + uint32(len(code)))
	payloadLen := uint32(len(listhdr)) + uint32(len(code)) + msg.Size

	start := make([]byte, 8)
	copy(start, magicToken)
	binary.BigEndian.PutUint32(start[4:], payloadLen)

	for _, b := range [][]byte{start, listhdr, code} {
		if _, err := w.Write(b); err != nil {
			return err
		}
	}
	_, err := io.CopyN(w, msg.Payload, int64(msg.Size))
	return err
zelig's avatar
zelig committed
112 113
}

Felix Lange's avatar
Felix Lange committed
114 115 116
// makeListHeader returns the list header bytes for a list whose
// content is length bytes long.
//
// Lengths below 56 are encoded in a single byte, 0xc0 + length.
// Larger lengths are encoded as the byte 0xf7 + n followed by the
// n-byte big-endian representation of length (without leading zeros).
func makeListHeader(length uint32) []byte {
	if length < 56 {
		return []byte{byte(length + 0xc0)}
	}
	// big.Int.Bytes yields the minimal big-endian encoding of length.
	enc := big.NewInt(int64(length)).Bytes()
	lenb := byte(len(enc)) + 0xf7
	return append([]byte{lenb}, enc...)
}

123
// readMsg reads a message header from r.
Felix Lange's avatar
Felix Lange committed
124 125
// It takes an rlp.ByteReader to ensure that the decoding doesn't buffer.
func readMsg(r rlp.ByteReader) (msg Msg, err error) {
Felix Lange's avatar
Felix Lange committed
126 127 128
	// read magic and payload size
	start := make([]byte, 8)
	if _, err = io.ReadFull(r, start); err != nil {
129
		return msg, newPeerError(errRead, "%v", err)
Felix Lange's avatar
Felix Lange committed
130 131
	}
	if !bytes.HasPrefix(start, magicToken) {
132
		return msg, newPeerError(errMagicTokenMismatch, "got %x, want %x", start[:4], magicToken)
Felix Lange's avatar
Felix Lange committed
133 134 135 136
	}
	size := binary.BigEndian.Uint32(start[4:])

	// decode start of RLP message to get the message code
Felix Lange's avatar
Felix Lange committed
137 138 139
	posr := &postrack{r, 0}
	s := rlp.NewStream(posr)
	if _, err := s.List(); err != nil {
Felix Lange's avatar
Felix Lange committed
140 141
		return msg, err
	}
Felix Lange's avatar
Felix Lange committed
142
	code, err := s.Uint()
Felix Lange's avatar
Felix Lange committed
143 144 145
	if err != nil {
		return msg, err
	}
Felix Lange's avatar
Felix Lange committed
146 147 148
	payloadsize := size - posr.p
	return Msg{code, payloadsize, io.LimitReader(r, int64(payloadsize))}, nil
}
Felix Lange's avatar
Felix Lange committed
149

Felix Lange's avatar
Felix Lange committed
150 151 152 153
// postrack wraps an rlp.ByteReader with a position counter.
type postrack struct {
	r rlp.ByteReader
	p uint32
Felix Lange's avatar
Felix Lange committed
154 155
}

Felix Lange's avatar
Felix Lange committed
156 157 158 159
func (r *postrack) Read(buf []byte) (int, error) {
	n, err := r.r.Read(buf)
	r.p += uint32(n)
	return n, err
Felix Lange's avatar
Felix Lange committed
160 161
}

Felix Lange's avatar
Felix Lange committed
162 163 164 165
func (r *postrack) ReadByte() (byte, error) {
	b, err := r.r.ReadByte()
	if err == nil {
		r.p++
zelig's avatar
zelig committed
166
	}
Felix Lange's avatar
Felix Lange committed
167
	return b, err
zelig's avatar
zelig committed
168
}
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238

// MsgPipe returns the two connected ends of a message pipe. A
// message written on one end is delivered to a read on the other
// end. Both ends can read and write (full-duplex) and both
// implement MsgReadWriter. The ends share their close state.
func MsgPipe() (*MsgPipeRW, *MsgPipeRW) {
	forward := make(chan Msg)
	backward := make(chan Msg)
	closing := make(chan struct{})
	closed := new(int32)

	first := &MsgPipeRW{w: forward, r: backward, closing: closing, closed: closed}
	second := &MsgPipeRW{w: backward, r: forward, closing: closing, closed: closed}
	return first, second
}

// ErrPipeClosed is returned from pipe operations after the
// pipe has been closed. Both ReadMsg and WriteMsg of MsgPipeRW
// report it.
var ErrPipeClosed = errors.New("p2p: read or write on closed message pipe")

// MsgPipeRW is an endpoint of a MsgReadWriter pipe.
type MsgPipeRW struct {
	// w is the channel WriteMsg sends outgoing messages on.
	w       chan<- Msg
	// r is the channel ReadMsg receives incoming messages from.
	r       <-chan Msg
	// closing is closed by Close to release blocked readers and
	// writers. MsgPipe gives both ends the same channel.
	closing chan struct{}
	// closed is non-zero once Close has been called. It is accessed
	// with atomic operations; MsgPipe gives both ends the same pointer.
	closed  *int32
}

// WriteMsg sends a message on the pipe.
// It blocks until the receiver has consumed the message payload or
// the pipe is closed.
//
// It returns ErrPipeClosed if the pipe is already closed, or if it is
// closed before the other end accepts the message. If the pipe is
// closed after the message was accepted but before its payload was
// consumed, WriteMsg stops waiting and returns nil.
func (p *MsgPipeRW) WriteMsg(msg Msg) error {
	if atomic.LoadInt32(p.closed) == 0 {
		// The payload is wrapped in an eofSignal (defined elsewhere in
		// the package), which is expected to signal on consumed once
		// the payload has been read.
		consumed := make(chan struct{}, 1)
		msg.Payload = &eofSignal{msg.Payload, int64(msg.Size), consumed}
		select {
		case p.w <- msg:
			if msg.Size > 0 {
				// Wait for payload read or discard. Also watch the
				// closing channel: Close promises to unblock pending
				// WriteMsg calls, and a receiver that never drains the
				// payload would otherwise block the writer forever.
				select {
				case <-consumed:
				case <-p.closing:
				}
			}
			return nil
		case <-p.closing:
		}
	}
	return ErrPipeClosed
}

// ReadMsg waits for a message written on the other end of the pipe
// and returns it. If the pipe has been closed, or is closed while
// waiting, it returns an empty Msg and ErrPipeClosed.
func (p *MsgPipeRW) ReadMsg() (Msg, error) {
	if atomic.LoadInt32(p.closed) != 0 {
		return Msg{}, ErrPipeClosed
	}
	select {
	case received := <-p.r:
		return received, nil
	case <-p.closing:
		return Msg{}, ErrPipeClosed
	}
}

// Close releases every ReadMsg and WriteMsg call that is blocked on
// either end of the pipe; those calls return ErrPipeClosed. Reads
// from a message payload that are already in progress are not
// interrupted. Calling Close more than once is harmless and it
// always returns nil.
func (p *MsgPipeRW) Close() error {
	// Only the caller that flips the flag from 0 to 1 closes the
	// channel; everyone else finds the pipe already closed.
	if atomic.CompareAndSwapInt32(p.closed, 0, 1) {
		close(p.closing)
	}
	return nil
}