Unverified Commit 836c647b authored by Felix Lange's avatar Felix Lange Committed by GitHub

eth: unregister peer only when handler exits (#22908)

This removes the error log message that says 

    Ethereum peer removal failed ... err="peer not registered"

The error happened because removePeer was called multiple
times: once to disconnect the peer, and another time when the
handler exited. With this change, removePeer now has the sole
purpose of disconnecting the peer. Unregistering happens exactly
once, when the handler exits.
parent 4d33de9b
...@@ -287,7 +287,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { ...@@ -287,7 +287,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
peer.Log().Error("Ethereum peer registration failed", "err", err) peer.Log().Error("Ethereum peer registration failed", "err", err)
return err return err
} }
defer h.removePeer(peer.ID()) defer h.unregisterPeer(peer.ID())
p := h.peers.peer(peer.ID()) p := h.peers.peer(peer.ID())
if p == nil { if p == nil {
...@@ -354,9 +354,16 @@ func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error ...@@ -354,9 +354,16 @@ func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error
return handler(peer) return handler(peer)
} }
// removePeer unregisters a peer from the downloader and fetchers, removes it from // removePeer requests disconnection of a peer.
// the set of tracked peers and closes the network connection to it.
func (h *handler) removePeer(id string) { func (h *handler) removePeer(id string) {
peer := h.peers.peer(id)
if peer != nil {
peer.Peer.Disconnect(p2p.DiscUselessPeer)
}
}
// unregisterPeer removes a peer from the downloader, fetchers and main peer set.
func (h *handler) unregisterPeer(id string) {
// Create a custom logger to avoid printing the entire id // Create a custom logger to avoid printing the entire id
var logger log.Logger var logger log.Logger
if len(id) < 16 { if len(id) < 16 {
...@@ -384,8 +391,6 @@ func (h *handler) removePeer(id string) { ...@@ -384,8 +391,6 @@ func (h *handler) removePeer(id string) {
if err := h.peers.unregisterPeer(id); err != nil { if err := h.peers.unregisterPeer(id); err != nil {
logger.Error("Ethereum peer removal failed", "err", err) logger.Error("Ethereum peer removal failed", "err", err)
} }
// Hard disconnect at the networking layer
peer.Peer.Disconnect(p2p.DiscUselessPeer)
} }
func (h *handler) Start(maxPeers int) { func (h *handler) Start(maxPeers int) {
......
...@@ -144,8 +144,8 @@ func testForkIDSplit(t *testing.T, protocol uint) { ...@@ -144,8 +144,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {
defer p2pNoFork.Close() defer p2pNoFork.Close()
defer p2pProFork.Close() defer p2pProFork.Close()
peerNoFork := eth.NewPeer(protocol, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil) peerNoFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil)
peerProFork := eth.NewPeer(protocol, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil) peerProFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil)
defer peerNoFork.Close() defer peerNoFork.Close()
defer peerProFork.Close() defer peerProFork.Close()
...@@ -206,8 +206,8 @@ func testForkIDSplit(t *testing.T, protocol uint) { ...@@ -206,8 +206,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {
defer p2pNoFork.Close() defer p2pNoFork.Close()
defer p2pProFork.Close() defer p2pProFork.Close()
peerNoFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil) peerNoFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil)
peerProFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil) peerProFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil)
defer peerNoFork.Close() defer peerNoFork.Close()
defer peerProFork.Close() defer peerProFork.Close()
...@@ -257,8 +257,8 @@ func testRecvTransactions(t *testing.T, protocol uint) { ...@@ -257,8 +257,8 @@ func testRecvTransactions(t *testing.T, protocol uint) {
defer p2pSrc.Close() defer p2pSrc.Close()
defer p2pSink.Close() defer p2pSink.Close()
src := eth.NewPeer(protocol, p2p.NewPeer(enode.ID{1}, "", nil), p2pSrc, handler.txpool) src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool)
sink := eth.NewPeer(protocol, p2p.NewPeer(enode.ID{2}, "", nil), p2pSink, handler.txpool) sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool)
defer src.Close() defer src.Close()
defer sink.Close() defer sink.Close()
...@@ -319,8 +319,8 @@ func testSendTransactions(t *testing.T, protocol uint) { ...@@ -319,8 +319,8 @@ func testSendTransactions(t *testing.T, protocol uint) {
defer p2pSrc.Close() defer p2pSrc.Close()
defer p2pSink.Close() defer p2pSink.Close()
src := eth.NewPeer(protocol, p2p.NewPeer(enode.ID{1}, "", nil), p2pSrc, handler.txpool) src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool)
sink := eth.NewPeer(protocol, p2p.NewPeer(enode.ID{2}, "", nil), p2pSink, handler.txpool) sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool)
defer src.Close() defer src.Close()
defer sink.Close() defer sink.Close()
...@@ -407,8 +407,8 @@ func testTransactionPropagation(t *testing.T, protocol uint) { ...@@ -407,8 +407,8 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
defer sourcePipe.Close() defer sourcePipe.Close()
defer sinkPipe.Close() defer sinkPipe.Close()
sourcePeer := eth.NewPeer(protocol, p2p.NewPeer(enode.ID{byte(i)}, "", nil), sourcePipe, source.txpool) sourcePeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{byte(i)}, "", nil, sourcePipe), sourcePipe, source.txpool)
sinkPeer := eth.NewPeer(protocol, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, sink.txpool) sinkPeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, sink.txpool)
defer sourcePeer.Close() defer sourcePeer.Close()
defer sinkPeer.Close() defer sinkPeer.Close()
...@@ -490,6 +490,8 @@ func TestCheckpointChallenge(t *testing.T) { ...@@ -490,6 +490,8 @@ func TestCheckpointChallenge(t *testing.T) {
} }
func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpoint bool, timeout bool, empty bool, match bool, drop bool) { func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpoint bool, timeout bool, empty bool, match bool, drop bool) {
t.Parallel()
// Reduce the checkpoint handshake challenge timeout // Reduce the checkpoint handshake challenge timeout
defer func(old time.Duration) { syncChallengeTimeout = old }(syncChallengeTimeout) defer func(old time.Duration) { syncChallengeTimeout = old }(syncChallengeTimeout)
syncChallengeTimeout = 250 * time.Millisecond syncChallengeTimeout = 250 * time.Millisecond
...@@ -513,20 +515,26 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo ...@@ -513,20 +515,26 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
handler.handler.checkpointNumber = number handler.handler.checkpointNumber = number
handler.handler.checkpointHash = response.Hash() handler.handler.checkpointHash = response.Hash()
} }
// Create a challenger peer and a challenged one
// Create a challenger peer and a challenged one.
p2pLocal, p2pRemote := p2p.MsgPipe() p2pLocal, p2pRemote := p2p.MsgPipe()
defer p2pLocal.Close() defer p2pLocal.Close()
defer p2pRemote.Close() defer p2pRemote.Close()
local := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{1}, "", nil), p2pLocal, handler.txpool) local := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pLocal), p2pLocal, handler.txpool)
remote := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{2}, "", nil), p2pRemote, handler.txpool) remote := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pRemote), p2pRemote, handler.txpool)
defer local.Close() defer local.Close()
defer remote.Close() defer remote.Close()
go handler.handler.runEthPeer(local, func(peer *eth.Peer) error { handlerDone := make(chan struct{})
return eth.Handle((*ethHandler)(handler.handler), peer) go func() {
}) defer close(handlerDone)
// Run the handshake locally to avoid spinning up a remote handler handler.handler.runEthPeer(local, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(handler.handler), peer)
})
}()
// Run the handshake locally to avoid spinning up a remote handler.
var ( var (
genesis = handler.chain.Genesis() genesis = handler.chain.Genesis()
head = handler.chain.CurrentBlock() head = handler.chain.CurrentBlock()
...@@ -535,12 +543,13 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo ...@@ -535,12 +543,13 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil { if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
t.Fatalf("failed to run protocol handshake") t.Fatalf("failed to run protocol handshake")
} }
// Connect a new peer and check that we receive the checkpoint challenge
// Connect a new peer and check that we receive the checkpoint challenge.
if checkpoint { if checkpoint {
if err := remote.ExpectRequestHeadersByNumber(response.Number.Uint64(), 1, 0, false); err != nil { if err := remote.ExpectRequestHeadersByNumber(response.Number.Uint64(), 1, 0, false); err != nil {
t.Fatalf("challenge mismatch: %v", err) t.Fatalf("challenge mismatch: %v", err)
} }
// Create a block to reply to the challenge if no timeout is simulated // Create a block to reply to the challenge if no timeout is simulated.
if !timeout { if !timeout {
if empty { if empty {
if err := remote.SendBlockHeaders([]*types.Header{}); err != nil { if err := remote.SendBlockHeaders([]*types.Header{}); err != nil {
...@@ -557,11 +566,13 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo ...@@ -557,11 +566,13 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
} }
} }
} }
// Wait until the test timeout passes to ensure proper cleanup // Wait until the test timeout passes to ensure proper cleanup
time.Sleep(syncChallengeTimeout + 300*time.Millisecond) time.Sleep(syncChallengeTimeout + 300*time.Millisecond)
// Verify that the remote peer is maintained or dropped // Verify that the remote peer is maintained or dropped.
if drop { if drop {
<-handlerDone
if peers := handler.handler.peers.len(); peers != 0 { if peers := handler.handler.peers.len(); peers != 0 {
t.Fatalf("peer count mismatch: have %d, want %d", peers, 0) t.Fatalf("peer count mismatch: have %d, want %d", peers, 0)
} }
...@@ -608,8 +619,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { ...@@ -608,8 +619,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
defer sourcePipe.Close() defer sourcePipe.Close()
defer sinkPipe.Close() defer sinkPipe.Close()
sourcePeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{byte(i)}, "", nil), sourcePipe, nil) sourcePeer := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{byte(i)}, "", nil, sourcePipe), sourcePipe, nil)
sinkPeer := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, nil) sinkPeer := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, nil)
defer sourcePeer.Close() defer sourcePeer.Close()
defer sinkPeer.Close() defer sinkPeer.Close()
...@@ -676,8 +687,8 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) { ...@@ -676,8 +687,8 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
defer p2pSrc.Close() defer p2pSrc.Close()
defer p2pSink.Close() defer p2pSink.Close()
src := eth.NewPeer(protocol, p2p.NewPeer(enode.ID{1}, "", nil), p2pSrc, source.txpool) src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, source.txpool)
sink := eth.NewPeer(protocol, p2p.NewPeer(enode.ID{2}, "", nil), p2pSink, source.txpool) sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, source.txpool)
defer src.Close() defer src.Close()
defer sink.Close() defer sink.Close()
......
...@@ -115,7 +115,8 @@ type Peer struct { ...@@ -115,7 +115,8 @@ type Peer struct {
disc chan DiscReason disc chan DiscReason
// events receives message send / receive events if set // events receives message send / receive events if set
events *event.Feed events *event.Feed
testPipe *MsgPipeRW // for testing
} }
// NewPeer returns a peer for testing purposes. // NewPeer returns a peer for testing purposes.
...@@ -128,6 +129,15 @@ func NewPeer(id enode.ID, name string, caps []Cap) *Peer { ...@@ -128,6 +129,15 @@ func NewPeer(id enode.ID, name string, caps []Cap) *Peer {
return peer return peer
} }
// NewPeerPipe creates a peer for testing purposes.
// The message pipe given as the last parameter is closed when
// Disconnect is called on the peer.
func NewPeerPipe(id enode.ID, name string, caps []Cap, pipe *MsgPipeRW) *Peer {
p := NewPeer(id, name, caps)
p.testPipe = pipe
return p
}
// ID returns the node's public key. // ID returns the node's public key.
func (p *Peer) ID() enode.ID { func (p *Peer) ID() enode.ID {
return p.rw.node.ID() return p.rw.node.ID()
...@@ -185,6 +195,10 @@ func (p *Peer) LocalAddr() net.Addr { ...@@ -185,6 +195,10 @@ func (p *Peer) LocalAddr() net.Addr {
// Disconnect terminates the peer connection with the given reason. // Disconnect terminates the peer connection with the given reason.
// It returns immediately and does not wait until the connection is closed. // It returns immediately and does not wait until the connection is closed.
func (p *Peer) Disconnect(reason DiscReason) { func (p *Peer) Disconnect(reason DiscReason) {
if p.testPipe != nil {
p.testPipe.Close()
}
select { select {
case p.disc <- reason: case p.disc <- reason:
case <-p.closed: case <-p.closed:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment