Commit 40733908 authored by zelig's avatar zelig

p2p/protocols, p2p/testing: protocol abstraction and testing

parent 02aeb3d7
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
/*
Package protocols is an extension to p2p. It offers a user friendly simple way to define
devp2p subprotocols by abstracting away code standardly shared by protocols.
* automate assigments of code indexes to messages
* automate RLP decoding/encoding based on reflecting
* provide the forever loop to read incoming messages
* standardise error handling related to communication
* standardised handshake negotiation
* TODO: automatic generation of wire protocol specification for peers
*/
package protocols
import (
"context"
"fmt"
"reflect"
"sync"
"github.com/ethereum/go-ethereum/p2p"
)
// error codes used by this protocol scheme
const (
ErrMsgTooLong = iota
ErrDecode
ErrWrite
ErrInvalidMsgCode
ErrInvalidMsgType
ErrHandshake
ErrNoHandler
ErrHandler
)
// error description strings associated with the codes
var errorToString = map[int]string{
ErrMsgTooLong: "Message too long",
ErrDecode: "Invalid message (RLP error)",
ErrWrite: "Error sending message",
ErrInvalidMsgCode: "Invalid message code",
ErrInvalidMsgType: "Invalid message type",
ErrHandshake: "Handshake error",
ErrNoHandler: "No handler registered error",
ErrHandler: "Message handler error",
}
/*
Error implements the standard go error interface.
Use:
errorf(code, format, params ...interface{})
Prints as:
<description>: <details>
where description is given by code in errorToString
and details is fmt.Sprintf(format, params...)
exported field Code can be checked
*/
type Error struct {
Code int
message string
format string
params []interface{}
}
func (e Error) Error() (message string) {
if len(e.message) == 0 {
name, ok := errorToString[e.Code]
if !ok {
panic("invalid message code")
}
e.message = name
if e.format != "" {
e.message += ": " + fmt.Sprintf(e.format, e.params...)
}
}
return e.message
}
func errorf(code int, format string, params ...interface{}) *Error {
return &Error{
Code: code,
format: format,
params: params,
}
}
// Spec is a protocol specification including its name and version as well as
// the types of messages which are exchanged
type Spec struct {
// Name is the name of the protocol, often a three-letter word
Name string
// Version is the version number of the protocol
Version uint
// MaxMsgSize is the maximum accepted length of the message payload
MaxMsgSize uint32
// Messages is a list of message data types which this protocol uses, with
// each message type being sent with its array index as the code (so
// [&foo{}, &bar{}, &baz{}] would send foo, bar and baz with codes
// 0, 1 and 2 respectively)
// each message must have a single unique data type
Messages []interface{}
initOnce sync.Once
codes map[reflect.Type]uint64
types map[uint64]reflect.Type
}
func (s *Spec) init() {
s.initOnce.Do(func() {
s.codes = make(map[reflect.Type]uint64, len(s.Messages))
s.types = make(map[uint64]reflect.Type, len(s.Messages))
for i, msg := range s.Messages {
code := uint64(i)
typ := reflect.TypeOf(msg)
if typ.Kind() == reflect.Ptr {
typ = typ.Elem()
}
s.codes[typ] = code
s.types[code] = typ
}
})
}
// Length returns the number of message types in the protocol
func (s *Spec) Length() uint64 {
return uint64(len(s.Messages))
}
// GetCode returns the message code of a type, and boolean second argument is
// false if the message type is not found
func (s *Spec) GetCode(msg interface{}) (uint64, bool) {
s.init()
typ := reflect.TypeOf(msg)
if typ.Kind() == reflect.Ptr {
typ = typ.Elem()
}
code, ok := s.codes[typ]
return code, ok
}
// NewMsg construct a new message type given the code
func (s *Spec) NewMsg(code uint64) (interface{}, bool) {
s.init()
typ, ok := s.types[code]
if !ok {
return nil, false
}
return reflect.New(typ).Interface(), true
}
// Peer represents a remote peer or protocol instance that is running on a peer connection with
// a remote peer
type Peer struct {
*p2p.Peer // the p2p.Peer object representing the remote
rw p2p.MsgReadWriter // p2p.MsgReadWriter to send messages to and read messages from
spec *Spec
}
// NewPeer constructs a new peer
// this constructor is called by the p2p.Protocol#Run function
// the first two arguments are the arguments passed to p2p.Protocol.Run function
// the third argument is the Spec describing the protocol
func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *Spec) *Peer {
return &Peer{
Peer: p,
rw: rw,
spec: spec,
}
}
// Run starts the forever loop that handles incoming messages
// called within the p2p.Protocol#Run function
// the handler argument is a function which is called for each message received
// from the remote peer, a returned error causes the loop to exit
// resulting in disconnection
func (p *Peer) Run(handler func(msg interface{}) error) error {
for {
if err := p.handleIncoming(handler); err != nil {
return err
}
}
}
// Drop disconnects a peer.
// TODO: may need to implement protocol drop only? don't want to kick off the peer
// if they are useful for other protocols
func (p *Peer) Drop(err error) {
p.Disconnect(p2p.DiscSubprotocolError)
}
// Send takes a message, encodes it in RLP, finds the right message code and sends the
// message off to the peer
// this low level call will be wrapped by libraries providing routed or broadcast sends
// but often just used to forward and push messages to directly connected peers
func (p *Peer) Send(msg interface{}) error {
code, found := p.spec.GetCode(msg)
if !found {
return errorf(ErrInvalidMsgType, "%v", code)
}
return p2p.Send(p.rw, code, msg)
}
// handleIncoming(code)
// is called each cycle of the main forever loop that dispatches incoming messages
// if this returns an error the loop returns and the peer is disconnected with the error
// this generic handler
// * checks message size,
// * checks for out-of-range message codes,
// * handles decoding with reflection,
// * call handlers as callbacks
func (p *Peer) handleIncoming(handle func(msg interface{}) error) error {
msg, err := p.rw.ReadMsg()
if err != nil {
return err
}
// make sure that the payload has been fully consumed
defer msg.Discard()
if msg.Size > p.spec.MaxMsgSize {
return errorf(ErrMsgTooLong, "%v > %v", msg.Size, p.spec.MaxMsgSize)
}
val, ok := p.spec.NewMsg(msg.Code)
if !ok {
return errorf(ErrInvalidMsgCode, "%v", msg.Code)
}
if err := msg.Decode(val); err != nil {
return errorf(ErrDecode, "<= %v: %v", msg, err)
}
// call the registered handler callbacks
// a registered callback take the decoded message as argument as an interface
// which the handler is supposed to cast to the appropriate type
// it is entirely safe not to check the cast in the handler since the handler is
// chosen based on the proper type in the first place
if err := handle(val); err != nil {
return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err)
}
return nil
}
// Handshake negotiates a handshake on the peer connection
// * arguments
// * context
// * the local handshake to be sent to the remote peer
// * funcion to be called on the remote handshake (can be nil)
// * expects a remote handshake back of the same type
// * the dialing peer needs to send the handshake first and then waits for remote
// * the listening peer waits for the remote handshake and then sends it
// returns the remote handshake and an error
func (p *Peer) Handshake(ctx context.Context, hs interface{}, verify func(interface{}) error) (rhs interface{}, err error) {
if _, ok := p.spec.GetCode(hs); !ok {
return nil, errorf(ErrHandshake, "unknown handshake message type: %T", hs)
}
errc := make(chan error, 2)
handle := func(msg interface{}) error {
rhs = msg
if verify != nil {
return verify(rhs)
}
return nil
}
send := func() { errc <- p.Send(hs) }
receive := func() { errc <- p.handleIncoming(handle) }
go func() {
if p.Inbound() {
receive()
send()
} else {
send()
receive()
}
}()
for i := 0; i < 2; i++ {
select {
case err = <-errc:
case <-ctx.Done():
err = ctx.Err()
}
if err != nil {
return nil, errorf(ErrHandshake, err.Error())
}
}
return rhs, nil
}
This diff is collapsed.
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package testing
import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
)
type TestPeer interface {
ID() discover.NodeID
Drop(error)
}
// TestPeerPool is an example peerPool to demonstrate registration of peer connections
type TestPeerPool struct {
lock sync.Mutex
peers map[discover.NodeID]TestPeer
}
func NewTestPeerPool() *TestPeerPool {
return &TestPeerPool{peers: make(map[discover.NodeID]TestPeer)}
}
func (self *TestPeerPool) Add(p TestPeer) {
self.lock.Lock()
defer self.lock.Unlock()
log.Trace(fmt.Sprintf("pp add peer %v", p.ID()))
self.peers[p.ID()] = p
}
func (self *TestPeerPool) Remove(p TestPeer) {
self.lock.Lock()
defer self.lock.Unlock()
delete(self.peers, p.ID())
}
func (self *TestPeerPool) Has(id discover.NodeID) bool {
self.lock.Lock()
defer self.lock.Unlock()
_, ok := self.peers[id]
return ok
}
func (self *TestPeerPool) Get(id discover.NodeID) TestPeer {
self.lock.Lock()
defer self.lock.Unlock()
return self.peers[id]
}
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package testing
import (
"errors"
"fmt"
"time"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
)
// ProtocolSession is a quasi simulation of a pivot node running
// a service and a number of dummy peers that can send (trigger) or
// receive (expect) messages
type ProtocolSession struct {
Server *p2p.Server
IDs []discover.NodeID
adapter *adapters.SimAdapter
events chan *p2p.PeerEvent
}
// Exchange is the basic units of protocol tests
// the triggers and expects in the arrays are run immediately and asynchronously
// thus one cannot have multiple expects for the SAME peer with DIFFERENT message types
// because it's unpredictable which expect will receive which message
// (with expect #1 and #2, messages might be sent #2 and #1, and both expects will complain about wrong message code)
// an exchange is defined on a session
type Exchange struct {
Label string
Triggers []Trigger
Expects []Expect
}
// Trigger is part of the exchange, incoming message for the pivot node
// sent by a peer
type Trigger struct {
Msg interface{} // type of message to be sent
Code uint64 // code of message is given
Peer discover.NodeID // the peer to send the message to
Timeout time.Duration // timeout duration for the sending
}
// Expect is part of an exchange, outgoing message from the pivot node
// received by a peer
type Expect struct {
Msg interface{} // type of message to expect
Code uint64 // code of message is now given
Peer discover.NodeID // the peer that expects the message
Timeout time.Duration // timeout duration for receiving
}
// Disconnect represents a disconnect event, used and checked by TestDisconnected
type Disconnect struct {
Peer discover.NodeID // discconnected peer
Error error // disconnect reason
}
// trigger sends messages from peers
func (self *ProtocolSession) trigger(trig Trigger) error {
simNode, ok := self.adapter.GetNode(trig.Peer)
if !ok {
return fmt.Errorf("trigger: peer %v does not exist (1- %v)", trig.Peer, len(self.IDs))
}
mockNode, ok := simNode.Services()[0].(*mockNode)
if !ok {
return fmt.Errorf("trigger: peer %v is not a mock", trig.Peer)
}
errc := make(chan error)
go func() {
errc <- mockNode.Trigger(&trig)
}()
t := trig.Timeout
if t == time.Duration(0) {
t = 1000 * time.Millisecond
}
select {
case err := <-errc:
return err
case <-time.After(t):
return fmt.Errorf("timout expecting %v to send to peer %v", trig.Msg, trig.Peer)
}
}
// expect checks an expectation of a message sent out by the pivot node
func (self *ProtocolSession) expect(exp Expect) error {
if exp.Msg == nil {
return errors.New("no message to expect")
}
simNode, ok := self.adapter.GetNode(exp.Peer)
if !ok {
return fmt.Errorf("trigger: peer %v does not exist (1- %v)", exp.Peer, len(self.IDs))
}
mockNode, ok := simNode.Services()[0].(*mockNode)
if !ok {
return fmt.Errorf("trigger: peer %v is not a mock", exp.Peer)
}
errc := make(chan error)
go func() {
errc <- mockNode.Expect(&exp)
}()
t := exp.Timeout
if t == time.Duration(0) {
t = 2000 * time.Millisecond
}
select {
case err := <-errc:
return err
case <-time.After(t):
return fmt.Errorf("timout expecting %v sent to peer %v", exp.Msg, exp.Peer)
}
}
// TestExchanges tests a series of exchanges against the session
func (self *ProtocolSession) TestExchanges(exchanges ...Exchange) error {
// launch all triggers of this exchanges
for _, e := range exchanges {
errc := make(chan error, len(e.Triggers)+len(e.Expects))
for _, trig := range e.Triggers {
errc <- self.trigger(trig)
}
// each expectation is spawned in separate go-routine
// expectations of an exchange are conjunctive but unordered, i.e.,
// only all of them arriving constitutes a pass
// each expectation is meant to be for a different peer, otherwise they are expected to panic
// testing of an exchange blocks until all expectations are decided
// an expectation is decided if
// expected message arrives OR
// an unexpected message arrives (panic)
// times out on their individual timeout
for _, ex := range e.Expects {
// expect msg spawned to separate go routine
go func(exp Expect) {
errc <- self.expect(exp)
}(ex)
}
// time out globally or finish when all expectations satisfied
timeout := time.After(5 * time.Second)
for i := 0; i < len(e.Triggers)+len(e.Expects); i++ {
select {
case err := <-errc:
if err != nil {
return fmt.Errorf("exchange failed with: %v", err)
}
case <-timeout:
return fmt.Errorf("exchange %v: '%v' timed out", i, e.Label)
}
}
}
return nil
}
// TestDisconnected tests the disconnections given as arguments
// the disconnect structs describe what disconnect error is expected on which peer
func (self *ProtocolSession) TestDisconnected(disconnects ...*Disconnect) error {
expects := make(map[discover.NodeID]error)
for _, disconnect := range disconnects {
expects[disconnect.Peer] = disconnect.Error
}
timeout := time.After(time.Second)
for len(expects) > 0 {
select {
case event := <-self.events:
if event.Type != p2p.PeerEventTypeDrop {
continue
}
expectErr, ok := expects[event.Peer]
if !ok {
continue
}
if !(expectErr == nil && event.Error == "" || expectErr != nil && expectErr.Error() == event.Error) {
return fmt.Errorf("unexpected error on peer %v. expected '%v', got '%v'", event.Peer, expectErr, event.Error)
}
delete(expects, event.Peer)
case <-timeout:
return fmt.Errorf("timed out waiting for peers to disconnect")
}
}
return nil
}
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
/*
the p2p/testing package provides a unit test scheme to check simple
protocol message exchanges with one pivot node and a number of dummy peers
The pivot test node runs a node.Service, the dummy peers run a mock node
that can be used to send and receive messages
*/
package testing
import (
"fmt"
"sync"
"testing"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc"
)
// ProtocolTester is the tester environment used for unit testing protocol
// message exchanges. It uses p2p/simulations framework
type ProtocolTester struct {
*ProtocolSession
network *simulations.Network
}
// NewProtocolTester constructs a new ProtocolTester
// it takes as argument the pivot node id, the number of dummy peers and the
// protocol run function called on a peer connection by the p2p server
func NewProtocolTester(t *testing.T, id discover.NodeID, n int, run func(*p2p.Peer, p2p.MsgReadWriter) error) *ProtocolTester {
services := adapters.Services{
"test": func(ctx *adapters.ServiceContext) (node.Service, error) {
return &testNode{run}, nil
},
"mock": func(ctx *adapters.ServiceContext) (node.Service, error) {
return newMockNode(), nil
},
}
adapter := adapters.NewSimAdapter(services)
net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{})
if _, err := net.NewNodeWithConfig(&adapters.NodeConfig{
ID: id,
EnableMsgEvents: true,
Services: []string{"test"},
}); err != nil {
panic(err.Error())
}
if err := net.Start(id); err != nil {
panic(err.Error())
}
node := net.GetNode(id).Node.(*adapters.SimNode)
peers := make([]*adapters.NodeConfig, n)
peerIDs := make([]discover.NodeID, n)
for i := 0; i < n; i++ {
peers[i] = adapters.RandomNodeConfig()
peers[i].Services = []string{"mock"}
peerIDs[i] = peers[i].ID
}
events := make(chan *p2p.PeerEvent, 1000)
node.SubscribeEvents(events)
ps := &ProtocolSession{
Server: node.Server(),
IDs: peerIDs,
adapter: adapter,
events: events,
}
self := &ProtocolTester{
ProtocolSession: ps,
network: net,
}
self.Connect(id, peers...)
return self
}
// Stop stops the p2p server
func (self *ProtocolTester) Stop() error {
self.Server.Stop()
return nil
}
// Connect brings up the remote peer node and connects it using the
// p2p/simulations network connection with the in memory network adapter
func (self *ProtocolTester) Connect(selfID discover.NodeID, peers ...*adapters.NodeConfig) {
for _, peer := range peers {
log.Trace(fmt.Sprintf("start node %v", peer.ID))
if _, err := self.network.NewNodeWithConfig(peer); err != nil {
panic(fmt.Sprintf("error starting peer %v: %v", peer.ID, err))
}
if err := self.network.Start(peer.ID); err != nil {
panic(fmt.Sprintf("error starting peer %v: %v", peer.ID, err))
}
log.Trace(fmt.Sprintf("connect to %v", peer.ID))
if err := self.network.Connect(selfID, peer.ID); err != nil {
panic(fmt.Sprintf("error connecting to peer %v: %v", peer.ID, err))
}
}
}
// testNode wraps a protocol run function and implements the node.Service
// interface
type testNode struct {
run func(*p2p.Peer, p2p.MsgReadWriter) error
}
func (t *testNode) Protocols() []p2p.Protocol {
return []p2p.Protocol{{
Length: 100,
Run: t.run,
}}
}
func (t *testNode) APIs() []rpc.API {
return nil
}
func (t *testNode) Start(server *p2p.Server) error {
return nil
}
func (t *testNode) Stop() error {
return nil
}
// mockNode is a testNode which doesn't actually run a protocol, instead
// exposing channels so that tests can manually trigger and expect certain
// messages
type mockNode struct {
testNode
trigger chan *Trigger
expect chan *Expect
err chan error
stop chan struct{}
stopOnce sync.Once
}
func newMockNode() *mockNode {
mock := &mockNode{
trigger: make(chan *Trigger),
expect: make(chan *Expect),
err: make(chan error),
stop: make(chan struct{}),
}
mock.testNode.run = mock.Run
return mock
}
// Run is a protocol run function which just loops waiting for tests to
// instruct it to either trigger or expect a message from the peer
func (m *mockNode) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
for {
select {
case trig := <-m.trigger:
m.err <- p2p.Send(rw, trig.Code, trig.Msg)
case exp := <-m.expect:
m.err <- p2p.ExpectMsg(rw, exp.Code, exp.Msg)
case <-m.stop:
return nil
}
}
}
func (m *mockNode) Trigger(trig *Trigger) error {
m.trigger <- trig
return <-m.err
}
func (m *mockNode) Expect(exp *Expect) error {
m.expect <- exp
return <-m.err
}
func (m *mockNode) Stop() error {
m.stopOnce.Do(func() { close(m.stop) })
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment