Commit 96ae35e2 authored by Felix Lange's avatar Felix Lange

p2p, p2p/discover, p2p/nat: rework logging using context keys

parent 35e8308b
...@@ -133,7 +133,7 @@ func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now ...@@ -133,7 +133,7 @@ func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now
var newtasks []task var newtasks []task
addDial := func(flag connFlag, n *discover.Node) bool { addDial := func(flag connFlag, n *discover.Node) bool {
if err := s.checkDial(n, peers); err != nil { if err := s.checkDial(n, peers); err != nil {
log.Debug(fmt.Sprintf("skipping dial candidate %x@%v:%d: %v", n.ID[:8], n.IP, n.TCP, err)) log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err)
return false return false
} }
s.dialing[n.ID] = flag s.dialing[n.ID] = flag
...@@ -162,7 +162,7 @@ func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now ...@@ -162,7 +162,7 @@ func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now
err := s.checkDial(t.dest, peers) err := s.checkDial(t.dest, peers)
switch err { switch err {
case errNotWhitelisted, errSelf: case errNotWhitelisted, errSelf:
log.Debug(fmt.Sprintf("removing static dial candidate %x@%v:%d: %v", t.dest.ID[:8], t.dest.IP, t.dest.TCP, err)) log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}, "err", err)
delete(s.static, t.dest.ID) delete(s.static, t.dest.ID)
case nil: case nil:
s.dialing[id] = t.flags s.dialing[id] = t.flags
...@@ -266,7 +266,7 @@ func (t *dialTask) Do(srv *Server) { ...@@ -266,7 +266,7 @@ func (t *dialTask) Do(srv *Server) {
// The backoff delay resets when the node is found. // The backoff delay resets when the node is found.
func (t *dialTask) resolve(srv *Server) bool { func (t *dialTask) resolve(srv *Server) bool {
if srv.ntab == nil { if srv.ntab == nil {
log.Debug(fmt.Sprintf("can't resolve node %x: discovery is disabled", t.dest.ID[:6])) log.Debug("Can't resolve node", "id", t.dest.ID, "err", "discovery is disabled")
return false return false
} }
if t.resolveDelay == 0 { if t.resolveDelay == 0 {
...@@ -282,23 +282,22 @@ func (t *dialTask) resolve(srv *Server) bool { ...@@ -282,23 +282,22 @@ func (t *dialTask) resolve(srv *Server) bool {
if t.resolveDelay > maxResolveDelay { if t.resolveDelay > maxResolveDelay {
t.resolveDelay = maxResolveDelay t.resolveDelay = maxResolveDelay
} }
log.Debug(fmt.Sprintf("resolving node %x failed (new delay: %v)", t.dest.ID[:6], t.resolveDelay)) log.Debug("Resolving node failed", "id", t.dest.ID, "newdelay", t.resolveDelay)
return false return false
} }
// The node was found. // The node was found.
t.resolveDelay = initialResolveDelay t.resolveDelay = initialResolveDelay
t.dest = resolved t.dest = resolved
log.Debug(fmt.Sprintf("resolved node %x: %v:%d", t.dest.ID[:6], t.dest.IP, t.dest.TCP)) log.Debug("Resolved node", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)})
return true return true
} }
// dial performs the actual connection attempt. // dial performs the actual connection attempt.
func (t *dialTask) dial(srv *Server, dest *discover.Node) bool { func (t *dialTask) dial(srv *Server, dest *discover.Node) bool {
addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)} addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
log.Debug(fmt.Sprintf("dial tcp %v (%x)", addr, dest.ID[:6]))
fd, err := srv.Dialer.Dial("tcp", addr.String()) fd, err := srv.Dialer.Dial("tcp", addr.String())
if err != nil { if err != nil {
log.Trace(fmt.Sprintf("%v", err)) log.Trace("Dial error", "task", t, "err", err)
return false return false
} }
mfd := newMeteredConn(fd, false) mfd := newMeteredConn(fd, false)
......
...@@ -23,7 +23,6 @@ import ( ...@@ -23,7 +23,6 @@ import (
"bytes" "bytes"
"crypto/rand" "crypto/rand"
"encoding/binary" "encoding/binary"
"fmt"
"os" "os"
"sync" "sync"
"time" "time"
...@@ -180,12 +179,11 @@ func (db *nodeDB) storeInt64(key []byte, n int64) error { ...@@ -180,12 +179,11 @@ func (db *nodeDB) storeInt64(key []byte, n int64) error {
func (db *nodeDB) node(id NodeID) *Node { func (db *nodeDB) node(id NodeID) *Node {
blob, err := db.lvl.Get(makeKey(id, nodeDBDiscoverRoot), nil) blob, err := db.lvl.Get(makeKey(id, nodeDBDiscoverRoot), nil)
if err != nil { if err != nil {
log.Trace(fmt.Sprintf("failed to retrieve node %v: %v", id, err))
return nil return nil
} }
node := new(Node) node := new(Node)
if err := rlp.DecodeBytes(blob, node); err != nil { if err := rlp.DecodeBytes(blob, node); err != nil {
log.Warn(fmt.Sprintf("failed to decode node RLP: %v", err)) log.Error("Failed to decode node RLP", "err", err)
return nil return nil
} }
node.sha = crypto.Keccak256Hash(node.ID[:]) node.sha = crypto.Keccak256Hash(node.ID[:])
...@@ -233,7 +231,7 @@ func (db *nodeDB) expirer() { ...@@ -233,7 +231,7 @@ func (db *nodeDB) expirer() {
select { select {
case <-tick: case <-tick:
if err := db.expireNodes(); err != nil { if err := db.expireNodes(); err != nil {
log.Error(fmt.Sprintf("Failed to expire nodedb items: %v", err)) log.Error("Failed to expire nodedb items", "err", err)
} }
case <-db.quit: case <-db.quit:
...@@ -352,7 +350,7 @@ func nextNode(it iterator.Iterator) *Node { ...@@ -352,7 +350,7 @@ func nextNode(it iterator.Iterator) *Node {
} }
var n Node var n Node
if err := rlp.DecodeBytes(it.Value(), &n); err != nil { if err := rlp.DecodeBytes(it.Value(), &n); err != nil {
log.Warn(fmt.Sprintf("invalid node %x: %v", id, err)) log.Warn("Failed to decode node RLP", "id", id, "err", err)
continue continue
} }
return &n return &n
......
...@@ -221,6 +221,11 @@ func (n NodeID) GoString() string { ...@@ -221,6 +221,11 @@ func (n NodeID) GoString() string {
return fmt.Sprintf("discover.HexID(\"%x\")", n[:]) return fmt.Sprintf("discover.HexID(\"%x\")", n[:])
} }
// TerminalString returns a shortened hex string for terminal logging.
func (n NodeID) TerminalString() string {
return hex.EncodeToString(n[:8])
}
// HexID converts a hex string to a NodeID. // HexID converts a hex string to a NodeID.
// The string may be prefixed with 0x. // The string may be prefixed with 0x.
func HexID(in string) (NodeID, error) { func HexID(in string) (NodeID, error) {
......
...@@ -23,7 +23,6 @@ import ( ...@@ -23,7 +23,6 @@ import (
"fmt" "fmt"
"net" "net"
"sort" "sort"
"strings"
"time" "time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -50,16 +49,10 @@ func checkClockDrift() { ...@@ -50,16 +49,10 @@ func checkClockDrift() {
return return
} }
if drift < -driftThreshold || drift > driftThreshold { if drift < -driftThreshold || drift > driftThreshold {
warning := fmt.Sprintf("System clock seems off by %v, which can prevent network connectivity", drift) log.Warn(fmt.Sprintf("System clock seems off by %v, which can prevent network connectivity", drift))
howtofix := fmt.Sprintf("Please enable network time synchronisation in system settings") log.Warn("Please enable network time synchronisation in system settings.")
separator := strings.Repeat("-", len(warning))
log.Warn(fmt.Sprint(separator))
log.Warn(fmt.Sprint(warning))
log.Warn(fmt.Sprint(howtofix))
log.Warn(fmt.Sprint(separator))
} else { } else {
log.Debug(fmt.Sprintf("Sanity NTP check reported %v drift, all ok", drift)) log.Debug("NTP sanity check done", "drift", drift)
} }
} }
......
...@@ -277,10 +277,10 @@ func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node { ...@@ -277,10 +277,10 @@ func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
// Bump the failure counter to detect and evacuate non-bonded entries // Bump the failure counter to detect and evacuate non-bonded entries
fails := tab.db.findFails(n.ID) + 1 fails := tab.db.findFails(n.ID) + 1
tab.db.updateFindFails(n.ID, fails) tab.db.updateFindFails(n.ID, fails)
log.Trace(fmt.Sprintf("Bumping failures for %x: %d", n.ID[:8], fails)) log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)
if fails >= maxFindnodeFailures { if fails >= maxFindnodeFailures {
log.Trace(fmt.Sprintf("Evacuating node %x: %d findnode failures", n.ID[:8], fails)) log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
tab.delete(n) tab.delete(n)
} }
} }
...@@ -385,13 +385,11 @@ func (tab *Table) doRefresh(done chan struct{}) { ...@@ -385,13 +385,11 @@ func (tab *Table) doRefresh(done chan struct{}) {
seeds = tab.bondall(append(seeds, tab.nursery...)) seeds = tab.bondall(append(seeds, tab.nursery...))
if len(seeds) == 0 { if len(seeds) == 0 {
log.Debug(fmt.Sprintf("no seed nodes found")) log.Debug("No discv4 seed nodes found")
} }
for _, n := range seeds { for _, n := range seeds {
log.Debug("", "msg", log.Lazy{Fn: func() string { age := log.Lazy{Fn: func() time.Duration { return time.Since(tab.db.lastPong(n.ID)) }}
age := time.Since(tab.db.lastPong(n.ID)) log.Trace("Found seed node in database", "id", n.ID, "addr", n.addr(), "age", age)
return fmt.Sprintf("seed node (age %v): %v", age, n)
}})
} }
tab.mutex.Lock() tab.mutex.Lock()
tab.stuff(seeds) tab.stuff(seeds)
...@@ -470,7 +468,7 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16 ...@@ -470,7 +468,7 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
var result error var result error
age := time.Since(tab.db.lastPong(id)) age := time.Since(tab.db.lastPong(id))
if node == nil || fails > 0 || age > nodeDBNodeExpiration { if node == nil || fails > 0 || age > nodeDBNodeExpiration {
log.Trace(fmt.Sprintf("Bonding %x: known=%t, fails=%d age=%v", id[:8], node != nil, fails, age)) log.Trace("Starting bonding ping/pong", "id", id, "known", node != nil, "failcount", fails, "age", age)
tab.bondmu.Lock() tab.bondmu.Lock()
w := tab.bonding[id] w := tab.bonding[id]
......
...@@ -147,6 +147,7 @@ func nodeToRPC(n *Node) rpcNode { ...@@ -147,6 +147,7 @@ func nodeToRPC(n *Node) rpcNode {
type packet interface { type packet interface {
handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
name() string
} }
type conn interface { type conn interface {
...@@ -223,7 +224,7 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBP ...@@ -223,7 +224,7 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBP
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.Info(fmt.Sprint("Listening,", tab.self)) log.Debug("UDP listener up", "self", tab.self)
return tab, nil return tab, nil
} }
...@@ -269,7 +270,7 @@ func (t *udp) close() { ...@@ -269,7 +270,7 @@ func (t *udp) close() {
func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error { func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
// TODO: maybe check for ReplyTo field in callback to measure RTT // TODO: maybe check for ReplyTo field in callback to measure RTT
errc := t.pending(toid, pongPacket, func(interface{}) bool { return true }) errc := t.pending(toid, pongPacket, func(interface{}) bool { return true })
t.send(toaddr, pingPacket, ping{ t.send(toaddr, pingPacket, &ping{
Version: Version, Version: Version,
From: t.ourEndpoint, From: t.ourEndpoint,
To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB
...@@ -293,14 +294,14 @@ func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node ...@@ -293,14 +294,14 @@ func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node
nreceived++ nreceived++
n, err := t.nodeFromRPC(toaddr, rn) n, err := t.nodeFromRPC(toaddr, rn)
if err != nil { if err != nil {
log.Trace(fmt.Sprintf("invalid neighbor node (%v) from %v: %v", rn.IP, toaddr, err)) log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err)
continue continue
} }
nodes = append(nodes, n) nodes = append(nodes, n)
} }
return nreceived >= bucketSize return nreceived >= bucketSize
}) })
t.send(toaddr, findnodePacket, findnode{ t.send(toaddr, findnodePacket, &findnode{
Target: target, Target: target,
Expiration: uint64(time.Now().Add(expiration).Unix()), Expiration: uint64(time.Now().Add(expiration).Unix()),
}) })
...@@ -458,15 +459,13 @@ func init() { ...@@ -458,15 +459,13 @@ func init() {
} }
} }
func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req interface{}) error { func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req packet) error {
packet, err := encodePacket(t.priv, ptype, req) packet, err := encodePacket(t.priv, ptype, req)
if err != nil { if err != nil {
return err return err
} }
log.Trace(fmt.Sprintf(">>> %v %T", toaddr, req)) _, err = t.conn.WriteToUDP(packet, toaddr)
if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil { log.Trace(">> "+req.name(), "addr", toaddr, "err", err)
log.Trace(fmt.Sprint("UDP send failed:", err))
}
return err return err
} }
...@@ -475,13 +474,13 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte, ...@@ -475,13 +474,13 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte,
b.Write(headSpace) b.Write(headSpace)
b.WriteByte(ptype) b.WriteByte(ptype)
if err := rlp.Encode(b, req); err != nil { if err := rlp.Encode(b, req); err != nil {
log.Error(fmt.Sprint("error encoding packet:", err)) log.Error("Can't encode discv4 packet", "err", err)
return nil, err return nil, err
} }
packet := b.Bytes() packet := b.Bytes()
sig, err := crypto.Sign(crypto.Keccak256(packet[headSize:]), priv) sig, err := crypto.Sign(crypto.Keccak256(packet[headSize:]), priv)
if err != nil { if err != nil {
log.Error(fmt.Sprint("could not sign packet:", err)) log.Error("Can't sign discv4 packet", "err", err)
return nil, err return nil, err
} }
copy(packet[macSize:], sig) copy(packet[macSize:], sig)
...@@ -503,11 +502,11 @@ func (t *udp) readLoop() { ...@@ -503,11 +502,11 @@ func (t *udp) readLoop() {
nbytes, from, err := t.conn.ReadFromUDP(buf) nbytes, from, err := t.conn.ReadFromUDP(buf)
if netutil.IsTemporaryError(err) { if netutil.IsTemporaryError(err) {
// Ignore temporary read errors. // Ignore temporary read errors.
log.Debug(fmt.Sprintf("Temporary read error: %v", err)) log.Debug("Temporary UDP read error", "err", err)
continue continue
} else if err != nil { } else if err != nil {
// Shut down the loop for permament errors. // Shut down the loop for permament errors.
log.Debug(fmt.Sprintf("Read error: %v", err)) log.Debug("UDP read error", "err", err)
return return
} }
t.handlePacket(from, buf[:nbytes]) t.handlePacket(from, buf[:nbytes])
...@@ -517,14 +516,11 @@ func (t *udp) readLoop() { ...@@ -517,14 +516,11 @@ func (t *udp) readLoop() {
func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error { func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
packet, fromID, hash, err := decodePacket(buf) packet, fromID, hash, err := decodePacket(buf)
if err != nil { if err != nil {
log.Debug(fmt.Sprintf("Bad packet from %v: %v", from, err)) log.Debug("Bad discv4 packet", "addr", from, "err", err)
return err return err
} }
status := "ok" err = packet.handle(t, from, fromID, hash)
if err = packet.handle(t, from, fromID, hash); err != nil { log.Trace("<< "+packet.name(), "addr", from, "err", err)
status = err.Error()
}
log.Trace(fmt.Sprintf("<<< %v %T: %s", from, packet, status))
return err return err
} }
...@@ -563,7 +559,7 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er ...@@ -563,7 +559,7 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er
if expired(req.Expiration) { if expired(req.Expiration) {
return errExpired return errExpired
} }
t.send(from, pongPacket, pong{ t.send(from, pongPacket, &pong{
To: makeEndpoint(from, req.From.TCP), To: makeEndpoint(from, req.From.TCP),
ReplyTok: mac, ReplyTok: mac,
Expiration: uint64(time.Now().Add(expiration).Unix()), Expiration: uint64(time.Now().Add(expiration).Unix()),
...@@ -575,6 +571,8 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er ...@@ -575,6 +571,8 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er
return nil return nil
} }
func (req *ping) name() string { return "PING/v4" }
func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error { func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
if expired(req.Expiration) { if expired(req.Expiration) {
return errExpired return errExpired
...@@ -585,6 +583,8 @@ func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er ...@@ -585,6 +583,8 @@ func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er
return nil return nil
} }
func (req *pong) name() string { return "PONG/v4" }
func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error { func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
if expired(req.Expiration) { if expired(req.Expiration) {
return errExpired return errExpired
...@@ -613,13 +613,15 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte ...@@ -613,13 +613,15 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte
} }
p.Nodes = append(p.Nodes, nodeToRPC(n)) p.Nodes = append(p.Nodes, nodeToRPC(n))
if len(p.Nodes) == maxNeighbors || i == len(closest)-1 { if len(p.Nodes) == maxNeighbors || i == len(closest)-1 {
t.send(from, neighborsPacket, p) t.send(from, neighborsPacket, &p)
p.Nodes = p.Nodes[:0] p.Nodes = p.Nodes[:0]
} }
} }
return nil return nil
} }
func (req *findnode) name() string { return "FINDNODE/v4" }
func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error { func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
if expired(req.Expiration) { if expired(req.Expiration) {
return errExpired return errExpired
...@@ -630,6 +632,8 @@ func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byt ...@@ -630,6 +632,8 @@ func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byt
return nil return nil
} }
func (req *neighbors) name() string { return "NEIGHBORS/v4" }
func expired(ts uint64) bool { func expired(ts uint64) bool {
return time.Unix(int64(ts), 0).Before(time.Now()) return time.Unix(int64(ts), 0).Before(time.Now())
} }
...@@ -98,16 +98,17 @@ const ( ...@@ -98,16 +98,17 @@ const (
// Map adds a port mapping on m and keeps it alive until c is closed. // Map adds a port mapping on m and keeps it alive until c is closed.
// This function is typically invoked in its own goroutine. // This function is typically invoked in its own goroutine.
func Map(m Interface, c chan struct{}, protocol string, extport, intport int, name string) { func Map(m Interface, c chan struct{}, protocol string, extport, intport int, name string) {
log := log.New("proto", protocol, "extport", extport, "intport", intport, "interface", m)
refresh := time.NewTimer(mapUpdateInterval) refresh := time.NewTimer(mapUpdateInterval)
defer func() { defer func() {
refresh.Stop() refresh.Stop()
log.Debug(fmt.Sprintf("deleting port mapping: %s %d -> %d (%s) using %s", protocol, extport, intport, name, m)) log.Debug("Deleting port mapping")
m.DeleteMapping(protocol, extport, intport) m.DeleteMapping(protocol, extport, intport)
}() }()
if err := m.AddMapping(protocol, extport, intport, name, mapTimeout); err != nil { if err := m.AddMapping(protocol, extport, intport, name, mapTimeout); err != nil {
log.Debug(fmt.Sprintf("network port %s:%d could not be mapped: %v", protocol, intport, err)) log.Debug("Couldn't add port mapping", "err", err)
} else { } else {
log.Info(fmt.Sprintf("mapped network port %s:%d -> %d (%s) using %s", protocol, extport, intport, name, m)) log.Info("Mapped network port")
} }
for { for {
select { select {
...@@ -116,9 +117,9 @@ func Map(m Interface, c chan struct{}, protocol string, extport, intport int, na ...@@ -116,9 +117,9 @@ func Map(m Interface, c chan struct{}, protocol string, extport, intport int, na
return return
} }
case <-refresh.C: case <-refresh.C:
log.Trace(fmt.Sprintf("refresh port mapping %s:%d -> %d (%s) using %s", protocol, extport, intport, name, m)) log.Trace("Refreshing port mapping")
if err := m.AddMapping(protocol, extport, intport, name, mapTimeout); err != nil { if err := m.AddMapping(protocol, extport, intport, name, mapTimeout); err != nil {
log.Debug(fmt.Sprintf("network port %s:%d could not be mapped: %v", protocol, intport, err)) log.Debug("Couldn't add port mapping", "err", err)
} }
refresh.Reset(mapUpdateInterval) refresh.Reset(mapUpdateInterval)
} }
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package p2p package p2p
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
...@@ -25,6 +24,7 @@ import ( ...@@ -25,6 +24,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
...@@ -64,6 +64,8 @@ type protoHandshake struct { ...@@ -64,6 +64,8 @@ type protoHandshake struct {
type Peer struct { type Peer struct {
rw *conn rw *conn
running map[string]*protoRW running map[string]*protoRW
log log.Logger
created mclock.AbsTime
wg sync.WaitGroup wg sync.WaitGroup
protoErr chan error protoErr chan error
...@@ -125,20 +127,25 @@ func newPeer(conn *conn, protocols []Protocol) *Peer { ...@@ -125,20 +127,25 @@ func newPeer(conn *conn, protocols []Protocol) *Peer {
p := &Peer{ p := &Peer{
rw: conn, rw: conn,
running: protomap, running: protomap,
created: mclock.Now(),
disc: make(chan DiscReason), disc: make(chan DiscReason),
protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
closed: make(chan struct{}), closed: make(chan struct{}),
log: log.New("id", conn.id, "conn", conn.flags),
} }
return p return p
} }
func (p *Peer) run() DiscReason { func (p *Peer) Log() log.Logger {
return p.log
}
func (p *Peer) run() (remoteRequested bool, err error) {
var ( var (
writeStart = make(chan struct{}, 1) writeStart = make(chan struct{}, 1)
writeErr = make(chan error, 1) writeErr = make(chan error, 1)
readErr = make(chan error, 1) readErr = make(chan error, 1)
reason DiscReason reason DiscReason // sent to the peer
requested bool
) )
p.wg.Add(2) p.wg.Add(2)
go p.readLoop(readErr) go p.readLoop(readErr)
...@@ -152,31 +159,26 @@ func (p *Peer) run() DiscReason { ...@@ -152,31 +159,26 @@ func (p *Peer) run() DiscReason {
loop: loop:
for { for {
select { select {
case err := <-writeErr: case err = <-writeErr:
// A write finished. Allow the next write to start if // A write finished. Allow the next write to start if
// there was no error. // there was no error.
if err != nil { if err != nil {
log.Trace(fmt.Sprintf("%v: write error: %v", p, err))
reason = DiscNetworkError reason = DiscNetworkError
break loop break loop
} }
writeStart <- struct{}{} writeStart <- struct{}{}
case err := <-readErr: case err = <-readErr:
if r, ok := err.(DiscReason); ok { if r, ok := err.(DiscReason); ok {
log.Debug(fmt.Sprintf("%v: remote requested disconnect: %v", p, r)) remoteRequested = true
requested = true
reason = r reason = r
} else { } else {
log.Trace(fmt.Sprintf("%v: read error: %v", p, err))
reason = DiscNetworkError reason = DiscNetworkError
} }
break loop break loop
case err := <-p.protoErr: case err = <-p.protoErr:
reason = discReasonForError(err) reason = discReasonForError(err)
log.Debug(fmt.Sprintf("%v: protocol error: %v (%v)", p, err, reason))
break loop break loop
case reason = <-p.disc: case err = <-p.disc:
log.Debug(fmt.Sprintf("%v: locally requested disconnect: %v", p, reason))
break loop break loop
} }
} }
...@@ -184,10 +186,7 @@ loop: ...@@ -184,10 +186,7 @@ loop:
close(p.closed) close(p.closed)
p.rw.close(reason) p.rw.close(reason)
p.wg.Wait() p.wg.Wait()
if requested { return remoteRequested, err
reason = DiscRequested
}
return reason
} }
func (p *Peer) pingLoop() { func (p *Peer) pingLoop() {
...@@ -297,14 +296,14 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) ...@@ -297,14 +296,14 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error)
proto.closed = p.closed proto.closed = p.closed
proto.wstart = writeStart proto.wstart = writeStart
proto.werr = writeErr proto.werr = writeErr
log.Trace(fmt.Sprintf("%v: Starting protocol %s/%d", p, proto.Name, proto.Version)) p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
go func() { go func() {
err := proto.Run(p, proto) err := proto.Run(p, proto)
if err == nil { if err == nil {
log.Trace(fmt.Sprintf("%v: Protocol %s/%d returned", p, proto.Name, proto.Version)) p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
err = errors.New("protocol returned") err = errProtocolReturned
} else if err != io.EOF { } else if err != io.EOF {
log.Trace(fmt.Sprintf("%v: Protocol %s/%d error: %v", p, proto.Name, proto.Version, err)) p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
} }
p.protoErr <- err p.protoErr <- err
p.wg.Done() p.wg.Done()
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package p2p package p2p
import ( import (
"errors"
"fmt" "fmt"
) )
...@@ -51,6 +52,8 @@ func (self *peerError) Error() string { ...@@ -51,6 +52,8 @@ func (self *peerError) Error() string {
return self.message return self.message
} }
var errProtocolReturned = errors.New("protocol returned")
type DiscReason uint type DiscReason uint
const ( const (
...@@ -70,24 +73,24 @@ const ( ...@@ -70,24 +73,24 @@ const (
) )
var discReasonToString = [...]string{ var discReasonToString = [...]string{
DiscRequested: "Disconnect requested", DiscRequested: "disconnect requested",
DiscNetworkError: "Network error", DiscNetworkError: "network error",
DiscProtocolError: "Breach of protocol", DiscProtocolError: "breach of protocol",
DiscUselessPeer: "Useless peer", DiscUselessPeer: "useless peer",
DiscTooManyPeers: "Too many peers", DiscTooManyPeers: "too many peers",
DiscAlreadyConnected: "Already connected", DiscAlreadyConnected: "already connected",
DiscIncompatibleVersion: "Incompatible P2P protocol version", DiscIncompatibleVersion: "incompatible p2p protocol version",
DiscInvalidIdentity: "Invalid node identity", DiscInvalidIdentity: "invalid node identity",
DiscQuitting: "Client quitting", DiscQuitting: "client quitting",
DiscUnexpectedIdentity: "Unexpected identity", DiscUnexpectedIdentity: "unexpected identity",
DiscSelf: "Connected to self", DiscSelf: "connected to self",
DiscReadTimeout: "Read timeout", DiscReadTimeout: "read timeout",
DiscSubprotocolError: "Subprotocol error", DiscSubprotocolError: "subprotocol error",
} }
func (d DiscReason) String() string { func (d DiscReason) String() string {
if len(discReasonToString) < int(d) { if len(discReasonToString) < int(d) {
return fmt.Sprintf("Unknown Reason(%d)", d) return fmt.Sprintf("unknown disconnect reason %d", d)
} }
return discReasonToString[d] return discReasonToString[d]
} }
...@@ -100,6 +103,9 @@ func discReasonForError(err error) DiscReason { ...@@ -100,6 +103,9 @@ func discReasonForError(err error) DiscReason {
if reason, ok := err.(DiscReason); ok { if reason, ok := err.(DiscReason); ok {
return reason return reason
} }
if err == errProtocolReturned {
return DiscQuitting
}
peerError, ok := err.(*peerError) peerError, ok := err.(*peerError)
if ok { if ok {
switch peerError.code { switch peerError.code {
......
...@@ -43,7 +43,7 @@ var discard = Protocol{ ...@@ -43,7 +43,7 @@ var discard = Protocol{
}, },
} }
func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan DiscReason) { func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan error) {
fd1, fd2 := net.Pipe() fd1, fd2 := net.Pipe()
c1 := &conn{fd: fd1, transport: newTestTransport(randomID(), fd1)} c1 := &conn{fd: fd1, transport: newTestTransport(randomID(), fd1)}
c2 := &conn{fd: fd2, transport: newTestTransport(randomID(), fd2)} c2 := &conn{fd: fd2, transport: newTestTransport(randomID(), fd2)}
...@@ -53,15 +53,17 @@ func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan DiscReason) { ...@@ -53,15 +53,17 @@ func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan DiscReason) {
} }
peer := newPeer(c1, protos) peer := newPeer(c1, protos)
errc := make(chan DiscReason, 1) errc := make(chan error, 1)
go func() { errc <- peer.run() }() go func() {
_, err := peer.run()
errc <- err
}()
closer := func() { c2.close(errors.New("close func called")) } closer := func() { c2.close(errors.New("close func called")) }
return closer, c2, peer, errc return closer, c2, peer, errc
} }
func TestPeerProtoReadMsg(t *testing.T) { func TestPeerProtoReadMsg(t *testing.T) {
done := make(chan struct{})
proto := Protocol{ proto := Protocol{
Name: "a", Name: "a",
Length: 5, Length: 5,
...@@ -75,7 +77,6 @@ func TestPeerProtoReadMsg(t *testing.T) { ...@@ -75,7 +77,6 @@ func TestPeerProtoReadMsg(t *testing.T) {
if err := ExpectMsg(rw, 4, []uint{3}); err != nil { if err := ExpectMsg(rw, 4, []uint{3}); err != nil {
t.Error(err) t.Error(err)
} }
close(done)
return nil return nil
}, },
} }
...@@ -88,9 +89,10 @@ func TestPeerProtoReadMsg(t *testing.T) { ...@@ -88,9 +89,10 @@ func TestPeerProtoReadMsg(t *testing.T) {
Send(rw, baseProtocolLength+4, []uint{3}) Send(rw, baseProtocolLength+4, []uint{3})
select { select {
case <-done:
case err := <-errc: case err := <-errc:
t.Errorf("peer returned: %v", err) if err != errProtocolReturned {
t.Errorf("peer returned error: %v", err)
}
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Errorf("receive timeout") t.Errorf("receive timeout")
} }
...@@ -137,8 +139,8 @@ func TestPeerDisconnect(t *testing.T) { ...@@ -137,8 +139,8 @@ func TestPeerDisconnect(t *testing.T) {
} }
select { select {
case reason := <-disc: case reason := <-disc:
if reason != DiscRequested { if reason != DiscQuitting {
t.Errorf("run returned wrong reason: got %v, want %v", reason, DiscRequested) t.Errorf("run returned wrong reason: got %v, want %v", reason, DiscQuitting)
} }
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
t.Error("peer did not return") t.Error("peer did not return")
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment