message.go 8.92 KB
Newer Older
1
// Copyright 2014 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16

zelig's avatar
zelig committed
17 18 19
package p2p

import (
Felix Lange's avatar
Felix Lange committed
20
	"bytes"
21
	"errors"
obscuren's avatar
obscuren committed
22
	"fmt"
Felix Lange's avatar
Felix Lange committed
23 24
	"io"
	"io/ioutil"
25
	"sync/atomic"
26
	"time"
Felix Lange's avatar
Felix Lange committed
27

28
	"github.com/ethereum/go-ethereum/event"
29
	"github.com/ethereum/go-ethereum/p2p/enode"
Felix Lange's avatar
Felix Lange committed
30
	"github.com/ethereum/go-ethereum/rlp"
zelig's avatar
zelig committed
31 32
)

Felix Lange's avatar
Felix Lange committed
33 34 35 36 37 38 39
// Msg defines the structure of a p2p message.
//
// Note that a Msg can only be sent once since the Payload reader is
// consumed during sending. It is not possible to create a Msg and
// send it any number of times. If you want to reuse an encoded
// structure, encode the payload into a byte array and create a
// separate Msg with a bytes.Reader as Payload for each send.
zelig's avatar
zelig committed
40
type Msg struct {
41
	Code       uint64
42
	Size       uint32 // Size of the raw payload
43 44
	Payload    io.Reader
	ReceivedAt time.Time
45 46 47 48

	meterCap  Cap    // Protocol name and version for egress metering
	meterCode uint64 // Message within protocol for egress metering
	meterSize uint32 // Compressed message size for ingress metering
zelig's avatar
zelig committed
49 50
}

51
// Decode parses the RLP content of a message into
52 53 54 55
// the given value, which must be a pointer.
//
// For the decoding rules, please see package rlp.
func (msg Msg) Decode(val interface{}) error {
56 57
	s := rlp.NewStream(msg.Payload, uint64(msg.Size))
	if err := s.Decode(val); err != nil {
obscuren's avatar
obscuren committed
58
		return newPeerError(errInvalidMsg, "(code %x) (size %d) %v", msg.Code, msg.Size, err)
obscuren's avatar
obscuren committed
59 60 61 62 63 64
	}
	return nil
}

func (msg Msg) String() string {
	return fmt.Sprintf("msg #%v (%v bytes)", msg.Code, msg.Size)
65 66
}

Felix Lange's avatar
Felix Lange committed
67 68 69 70 71 72
// Discard drains whatever is left of the payload and throws the bytes
// away. The error reported by the copy, if any, is returned.
func (msg Msg) Discard() error {
	if _, err := io.Copy(ioutil.Discard, msg.Payload); err != nil {
		return err
	}
	return nil
}

73 74 75 76
// Time returns the ReceivedAt timestamp of the message.
func (msg Msg) Time() time.Time {
	return msg.ReceivedAt
}

77 78 79 80 81
// MsgReader is implemented by types that messages can be read from.
type MsgReader interface {
	// ReadMsg returns a message, or an error if none could be read.
	ReadMsg() (Msg, error)
}

type MsgWriter interface {
82 83 84
	// WriteMsg sends a message. It will block until the message's
	// Payload has been consumed by the other end.
	//
85 86
	// Note that messages can be sent only once because their
	// payload reader is drained.
87 88 89 90
	WriteMsg(Msg) error
}

// MsgReadWriter provides reading and writing of encoded messages.
91 92
// Implementations should ensure that ReadMsg and WriteMsg can be
// called simultaneously from multiple goroutines.
93 94 95 96 97
type MsgReadWriter interface {
	MsgReader
	MsgWriter
}

98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
// Send writes an RLP-encoded message with the given code.
// data should encode as an RLP list.
//
// An error is returned if data cannot be encoded, if the encoded payload
// does not fit into the 32-bit Size field of Msg, or if w.WriteMsg fails.
func Send(w MsgWriter, msgcode uint64, data interface{}) error {
	size, r, err := rlp.EncodeToReader(data)
	if err != nil {
		return err
	}
	// Msg.Size is a uint32. Converting a larger size would silently
	// truncate it and announce a wrong payload length to the other end,
	// so refuse such payloads instead.
	if uint64(size) > uint64(^uint32(0)) {
		return fmt.Errorf("p2p: message payload too large (%d bytes)", size)
	}
	return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
}

// SendItems writes an RLP with the given code and data elements.
// For a call such as:
//
//    SendItems(w, code, e1, e2, e3)
//
// the message payload will be an RLP list containing the items:
//
//    [e1, e2, e3]
//
func SendItems(w MsgWriter, msgcode uint64, elems ...interface{}) error {
	return Send(w, msgcode, elems)
119 120
}

121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
// eofSignal wraps a reader with eof signaling. A single value is sent on
// the eof channel when the wrapped reader returns an error or when count
// bytes have been read. At most one value is ever sent, and eofSignal
// never closes the channel.
type eofSignal struct {
	wrapped io.Reader
	count   uint32 // number of bytes left
	eof     chan<- struct{}
}

// Read reads at most the remaining count bytes from the wrapped reader
// into buf. Once count has reached zero it returns io.EOF without touching
// the wrapped reader.
//
// note: when using eofSignal to detect whether a message payload
// has been read, Read might not be called for zero sized messages.
func (r *eofSignal) Read(buf []byte) (int, error) {
	if r.count == 0 {
		r.signal()
		return 0, io.EOF
	}

	// Never read past the remaining byte count. The comparison is done in
	// uint64 so that a count above MaxInt32 cannot wrap to a negative
	// slice bound on platforms where int is 32 bits wide.
	if uint64(len(buf)) > uint64(r.count) {
		buf = buf[:r.count]
	}
	n, err := r.wrapped.Read(buf)
	r.count -= uint32(n)
	if err != nil || r.count == 0 {
		r.signal() // tell Peer that msg has been consumed
	}
	return n, err
}

// signal sends the one-shot notification on the eof channel and then
// drops the channel so that no second value can be sent.
func (r *eofSignal) signal() {
	if r.eof == nil {
		return
	}
	r.eof <- struct{}{}
	r.eof = nil
}

154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
// MsgPipe returns the two endpoints of a new in-memory message pipe.
// A message written on one endpoint is read on the other. The pipe is
// full-duplex and both endpoints implement MsgReadWriter.
func MsgPipe() (*MsgPipeRW, *MsgPipeRW) {
	// one unbuffered channel per direction
	aToB := make(chan Msg)
	bToA := make(chan Msg)

	// the close state is shared by both endpoints
	closing := make(chan struct{})
	closed := new(int32)

	a := &MsgPipeRW{w: aToB, r: bToA, closing: closing, closed: closed}
	b := &MsgPipeRW{w: bToA, r: aToB, closing: closing, closed: closed}
	return a, b
}

// ErrPipeClosed is returned from pipe operations after the
// pipe has been closed. Both MsgPipeRW.ReadMsg and MsgPipeRW.WriteMsg
// report it.
var ErrPipeClosed = errors.New("p2p: read or write on closed message pipe")

// MsgPipeRW is an endpoint of a MsgReadWriter pipe.
type MsgPipeRW struct {
	// w carries messages sent by WriteMsg on this endpoint.
	w       chan<- Msg
	// r delivers the messages returned by ReadMsg on this endpoint.
	r       <-chan Msg
	// closing is closed by Close to unblock pending pipe operations.
	closing chan struct{}
	// closed is non-zero once Close has been called. It is only
	// accessed through sync/atomic.
	closed  *int32
}

180
// WriteMsg sends a message on the pipe.
181 182 183 184
// It blocks until the receiver has consumed the message payload.
func (p *MsgPipeRW) WriteMsg(msg Msg) error {
	if atomic.LoadInt32(p.closed) == 0 {
		consumed := make(chan struct{}, 1)
185
		msg.Payload = &eofSignal{msg.Payload, msg.Size, consumed}
186 187 188 189
		select {
		case p.w <- msg:
			if msg.Size > 0 {
				// wait for payload read or discard
190 191 192 193
				select {
				case <-consumed:
				case <-p.closing:
				}
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
			}
			return nil
		case <-p.closing:
		}
	}
	return ErrPipeClosed
}

// ReadMsg waits for a message written on the other end of the pipe and
// returns it. ErrPipeClosed is returned if the pipe has been closed.
func (p *MsgPipeRW) ReadMsg() (Msg, error) {
	if atomic.LoadInt32(p.closed) != 0 {
		return Msg{}, ErrPipeClosed
	}
	select {
	case <-p.closing:
		return Msg{}, ErrPipeClosed
	case received := <-p.r:
		return received, nil
	}
}

// Close unblocks any pending ReadMsg and WriteMsg calls on both ends
215 216
// of the pipe. They will return ErrPipeClosed. Close also
// interrupts any reads from a message payload.
217 218 219 220 221 222 223 224 225
func (p *MsgPipeRW) Close() error {
	if atomic.AddInt32(p.closed, 1) != 1 {
		// someone else is already closing
		atomic.StoreInt32(p.closed, 1) // avoid overflow
		return nil
	}
	close(p.closing)
	return nil
}
226 227 228 229 230 231 232 233 234 235 236 237 238 239

// ExpectMsg reads a message from r and verifies that its
// code and encoded RLP content match the provided values.
// If content is nil, the payload is discarded and not verified.
func ExpectMsg(r MsgReader, code uint64, content interface{}) error {
	msg, err := r.ReadMsg()
	if err != nil {
		return err
	}
	if msg.Code != code {
		return fmt.Errorf("message code mismatch: got %d, expected %d", msg.Code, code)
	}
	if content == nil {
		return msg.Discard()
240 241 242 243 244 245 246 247 248 249 250 251 252 253
	}
	contentEnc, err := rlp.EncodeToBytes(content)
	if err != nil {
		panic("content encode error: " + err.Error())
	}
	if int(msg.Size) != len(contentEnc) {
		return fmt.Errorf("message size mismatch: got %d, want %d", msg.Size, len(contentEnc))
	}
	actualContent, err := ioutil.ReadAll(msg.Payload)
	if err != nil {
		return err
	}
	if !bytes.Equal(actualContent, contentEnc) {
		return fmt.Errorf("message payload mismatch:\ngot:  %x\nwant: %x", actualContent, contentEnc)
254 255 256
	}
	return nil
}
257 258 259 260 261 262

// msgEventer wraps a MsgReadWriter and sends events whenever a message is sent
// or received
type msgEventer struct {
	MsgReadWriter

263 264 265 266 267
	feed          *event.Feed
	peerID        enode.ID
	Protocol      string
	localAddress  string
	remoteAddress string
268 269 270 271
}

// newMsgEventer returns a msgEventer which sends message events to the given
// feed
272
func newMsgEventer(rw MsgReadWriter, feed *event.Feed, peerID enode.ID, proto, remote, local string) *msgEventer {
273 274 275 276 277
	return &msgEventer{
		MsgReadWriter: rw,
		feed:          feed,
		peerID:        peerID,
		Protocol:      proto,
278 279
		remoteAddress: remote,
		localAddress:  local,
280 281 282 283 284
	}
}

// ReadMsg reads a message from the underlying MsgReadWriter and emits a
// "message received" event
285 286
func (ev *msgEventer) ReadMsg() (Msg, error) {
	msg, err := ev.MsgReadWriter.ReadMsg()
287 288 289
	if err != nil {
		return msg, err
	}
290
	ev.feed.Send(&PeerEvent{
291 292 293 294 295 296 297
		Type:          PeerEventTypeMsgRecv,
		Peer:          ev.peerID,
		Protocol:      ev.Protocol,
		MsgCode:       &msg.Code,
		MsgSize:       &msg.Size,
		LocalAddress:  ev.localAddress,
		RemoteAddress: ev.remoteAddress,
298 299 300 301 302 303
	})
	return msg, nil
}

// WriteMsg writes a message to the underlying MsgReadWriter and emits a
// "message sent" event
304 305
func (ev *msgEventer) WriteMsg(msg Msg) error {
	err := ev.MsgReadWriter.WriteMsg(msg)
306 307 308
	if err != nil {
		return err
	}
309
	ev.feed.Send(&PeerEvent{
310 311 312 313 314 315 316
		Type:          PeerEventTypeMsgSend,
		Peer:          ev.peerID,
		Protocol:      ev.Protocol,
		MsgCode:       &msg.Code,
		MsgSize:       &msg.Size,
		LocalAddress:  ev.localAddress,
		RemoteAddress: ev.remoteAddress,
317 318 319 320 321 322
	})
	return nil
}

// Close closes the underlying MsgReadWriter if it implements the io.Closer
// interface
323 324
func (ev *msgEventer) Close() error {
	if v, ok := ev.MsgReadWriter.(io.Closer); ok {
325 326 327 328
		return v.Close()
	}
	return nil
}