Unverified Commit 90caa2ca authored by Felix Lange's avatar Felix Lange Committed by GitHub

p2p: new dial scheduler (#20592)

* p2p: new dial scheduler

This change replaces the peer-to-peer dial scheduler with a new and
improved implementation. The new code is better than the previous
implementation in two key aspects:

- The time between discovery of a node and dialing that node is
  significantly lower in the new version. The old dialState kept
  a buffer of nodes and launched a task to refill it whenever the buffer
  became empty. This worked well with the discovery interface we used to
  have, but doesn't really work with the new iterator-based discovery
  API.

- Selection of static dial candidates (created by Server.AddPeer or
  through static-nodes.json) performs much better for large amounts of
  static peers. Connections to static nodes are now limited like dynanic
  dials and can no longer overstep MaxPeers or the dial ratio.

* p2p/simulations/adapters: adapt to new NodeDialer interface

* p2p: re-add check for self in checkDial

* p2p: remove peersetCh

* p2p: allow static dials when discovery is disabled

* p2p: add test for dialScheduler.removeStatic

* p2p: remove blank line

* p2p: fix documentation of maxDialPeers

* p2p: change "ok" to "added" in static node log

* p2p: improve dialTask docs

Also increase log level for "Can't resolve node"

* p2p: ensure dial resolver is truly nil without discovery

* p2p: add "looking for peers" log message

* p2p: clean up Server.run comments

* p2p: fix maxDialedConns for maxpeers < dialRatio

Always allocate at least one dial slot unless dialing is disabled using
NoDial or MaxPeers == 0. Most importantly, this fixes MaxPeers == 1 to
dedicate the sole slot to dialing instead of listening.

* p2p: fix RemovePeer to disconnect the peer again

Also make RemovePeer synchronous and add a test.

* p2p: remove "Connection set up" log message

* p2p: clean up connection logging

We previously logged outgoing connection failures up to three times.

- in SetupConn() as "Setting up connection failed addr=..."
- in setupConn() with an error-specific message and "id=... addr=..."
- in dial() as "Dial error task=..."

This commit ensures a single log message is emitted per failure and adds
"id=... addr=... conn=..." everywhere (id= omitted when the ID isn't
known yet).

Also avoid printing a log message when a static dial fails but can't be
resolved because discv4 is disabled. The light client hit this case all
the time, increasing the message count to four lines per failed
connection.

* p2p: document that RemovePeer blocks
parent 5f2002bb
This diff is collapsed.
This diff is collapsed.
......@@ -17,15 +17,20 @@
package p2p
import (
"encoding/binary"
"errors"
"fmt"
"math/rand"
"net"
"reflect"
"strconv"
"strings"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
)
var discard = Protocol{
......@@ -45,10 +50,45 @@ var discard = Protocol{
},
}
// uintID encodes i into a node ID.
func uintID(i uint16) enode.ID {
var id enode.ID
binary.BigEndian.PutUint16(id[:], i)
return id
}
// newNode creates a node record with the given address.
func newNode(id enode.ID, addr string) *enode.Node {
var r enr.Record
if addr != "" {
// Set the port if present.
if strings.Contains(addr, ":") {
hs, ps, err := net.SplitHostPort(addr)
if err != nil {
panic(fmt.Errorf("invalid address %q", addr))
}
port, err := strconv.Atoi(ps)
if err != nil {
panic(fmt.Errorf("invalid port in %q", addr))
}
r.Set(enr.TCP(port))
r.Set(enr.UDP(port))
addr = hs
}
// Set the IP.
ip := net.ParseIP(addr)
if ip == nil {
panic(fmt.Errorf("invalid IP %q", addr))
}
r.Set(enr.IP(ip))
}
return enode.SignNull(&r, id)
}
func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan error) {
fd1, fd2 := net.Pipe()
c1 := &conn{fd: fd1, node: newNode(randomID(), nil), transport: newTestTransport(&newkey().PublicKey, fd1)}
c2 := &conn{fd: fd2, node: newNode(randomID(), nil), transport: newTestTransport(&newkey().PublicKey, fd2)}
c1 := &conn{fd: fd1, node: newNode(randomID(), ""), transport: newTestTransport(&newkey().PublicKey, fd1)}
c2 := &conn{fd: fd2, node: newNode(randomID(), ""), transport: newTestTransport(&newkey().PublicKey, fd2)}
for _, p := range protos {
c1.caps = append(c1.caps, p.cap())
c2.caps = append(c2.caps, p.cap())
......
This diff is collapsed.
......@@ -34,10 +34,6 @@ import (
"golang.org/x/crypto/sha3"
)
// func init() {
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(false))))
// }
type testTransport struct {
rpub *ecdsa.PublicKey
*rlpx
......@@ -75,6 +71,7 @@ func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer)) *
Name: "test",
MaxPeers: 10,
ListenAddr: "127.0.0.1:0",
NoDiscovery: true,
PrivateKey: newkey(),
Logger: testlog.Logger(t, log.LvlTrace),
}
......@@ -131,11 +128,10 @@ func TestServerDial(t *testing.T) {
t.Fatalf("could not setup listener: %v", err)
}
defer listener.Close()
accepted := make(chan net.Conn)
accepted := make(chan net.Conn, 1)
go func() {
conn, err := listener.Accept()
if err != nil {
t.Error("accept error:", err)
return
}
accepted <- conn
......@@ -205,155 +201,38 @@ func TestServerDial(t *testing.T) {
}
}
// This test checks that tasks generated by dialstate are
// actually executed and taskdone is called for them.
func TestServerTaskScheduling(t *testing.T) {
var (
done = make(chan *testTask)
quit, returned = make(chan struct{}), make(chan struct{})
tc = 0
tg = taskgen{
newFunc: func(running int, peers map[enode.ID]*Peer) []task {
tc++
return []task{&testTask{index: tc - 1}}
},
doneFunc: func(t task) {
select {
case done <- t.(*testTask):
case <-quit:
}
},
}
)
// The Server in this test isn't actually running
// because we're only interested in what run does.
db, _ := enode.OpenDB("")
srv := &Server{
Config: Config{MaxPeers: 10},
localnode: enode.NewLocalNode(db, newkey()),
nodedb: db,
discmix: enode.NewFairMix(0),
quit: make(chan struct{}),
running: true,
log: log.New(),
}
srv.loopWG.Add(1)
go func() {
srv.run(tg)
close(returned)
}()
var gotdone []*testTask
for i := 0; i < 100; i++ {
gotdone = append(gotdone, <-done)
}
for i, task := range gotdone {
if task.index != i {
t.Errorf("task %d has wrong index, got %d", i, task.index)
break
}
if !task.called {
t.Errorf("task %d was not called", i)
break
}
}
close(quit)
srv.Stop()
select {
case <-returned:
case <-time.After(500 * time.Millisecond):
t.Error("Server.run did not return within 500ms")
}
}
// This test checks that Server doesn't drop tasks,
// even if newTasks returns more than the maximum number of tasks.
func TestServerManyTasks(t *testing.T) {
alltasks := make([]task, 300)
for i := range alltasks {
alltasks[i] = &testTask{index: i}
}
var (
db, _ = enode.OpenDB("")
srv = &Server{
quit: make(chan struct{}),
localnode: enode.NewLocalNode(db, newkey()),
nodedb: db,
running: true,
log: log.New(),
discmix: enode.NewFairMix(0),
}
done = make(chan *testTask)
start, end = 0, 0
)
defer srv.Stop()
srv.loopWG.Add(1)
go srv.run(taskgen{
newFunc: func(running int, peers map[enode.ID]*Peer) []task {
start, end = end, end+maxActiveDialTasks+10
if end > len(alltasks) {
end = len(alltasks)
}
return alltasks[start:end]
},
doneFunc: func(tt task) {
done <- tt.(*testTask)
},
})
// This test checks that RemovePeer disconnects the peer if it is connected.
func TestServerRemovePeerDisconnect(t *testing.T) {
srv1 := &Server{Config: Config{
PrivateKey: newkey(),
MaxPeers: 1,
NoDiscovery: true,
Logger: testlog.Logger(t, log.LvlTrace).New("server", "1"),
}}
srv2 := &Server{Config: Config{
PrivateKey: newkey(),
MaxPeers: 1,
NoDiscovery: true,
NoDial: true,
ListenAddr: "127.0.0.1:0",
Logger: testlog.Logger(t, log.LvlTrace).New("server", "2"),
}}
srv1.Start()
defer srv1.Stop()
srv2.Start()
defer srv2.Stop()
doneset := make(map[int]bool)
timeout := time.After(2 * time.Second)
for len(doneset) < len(alltasks) {
select {
case tt := <-done:
if doneset[tt.index] {
t.Errorf("task %d got done more than once", tt.index)
} else {
doneset[tt.index] = true
if !syncAddPeer(srv1, srv2.Self()) {
t.Fatal("peer not connected")
}
case <-timeout:
t.Errorf("%d of %d tasks got done within 2s", len(doneset), len(alltasks))
for i := 0; i < len(alltasks); i++ {
if !doneset[i] {
t.Logf("task %d not done", i)
}
}
return
srv1.RemovePeer(srv2.Self())
if srv1.PeerCount() > 0 {
t.Fatal("removed peer still connected")
}
}
}
type taskgen struct {
newFunc func(running int, peers map[enode.ID]*Peer) []task
doneFunc func(task)
}
func (tg taskgen) newTasks(running int, peers map[enode.ID]*Peer, now time.Time) []task {
return tg.newFunc(running, peers)
}
func (tg taskgen) taskDone(t task, now time.Time) {
tg.doneFunc(t)
}
func (tg taskgen) addStatic(*enode.Node) {
}
func (tg taskgen) removeStatic(*enode.Node) {
}
type testTask struct {
index int
called bool
}
func (t *testTask) Do(srv *Server) {
t.called = true
}
// This test checks that connections are disconnected
// just after the encryption handshake when the server is
// at capacity. Trusted connections should still be accepted.
// This test checks that connections are disconnected just after the encryption handshake
// when the server is at capacity. Trusted connections should still be accepted.
func TestServerAtCap(t *testing.T) {
trustedNode := newkey()
trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey)
......@@ -363,7 +242,8 @@ func TestServerAtCap(t *testing.T) {
MaxPeers: 10,
NoDial: true,
NoDiscovery: true,
TrustedNodes: []*enode.Node{newNode(trustedID, nil)},
TrustedNodes: []*enode.Node{newNode(trustedID, "")},
Logger: testlog.Logger(t, log.LvlTrace),
},
}
if err := srv.Start(); err != nil {
......@@ -401,14 +281,14 @@ func TestServerAtCap(t *testing.T) {
}
// Remove from trusted set and try again
srv.RemoveTrustedPeer(newNode(trustedID, nil))
srv.RemoveTrustedPeer(newNode(trustedID, ""))
c = newconn(trustedID)
if err := srv.checkpoint(c, srv.checkpointPostHandshake); err != DiscTooManyPeers {
t.Error("wrong error for insert:", err)
}
// Add anotherID to trusted set and try again
srv.AddTrustedPeer(newNode(anotherID, nil))
srv.AddTrustedPeer(newNode(anotherID, ""))
c = newconn(anotherID)
if err := srv.checkpoint(c, srv.checkpointPostHandshake); err != nil {
t.Error("unexpected error for trusted conn @posthandshake:", err)
......@@ -439,9 +319,9 @@ func TestServerPeerLimits(t *testing.T) {
NoDial: true,
NoDiscovery: true,
Protocols: []Protocol{discard},
Logger: testlog.Logger(t, log.LvlTrace),
},
newTransport: func(fd net.Conn) transport { return tp },
log: log.New(),
}
if err := srv.Start(); err != nil {
t.Fatalf("couldn't start server: %v", err)
......@@ -724,3 +604,23 @@ func (l *fakeAddrListener) Accept() (net.Conn, error) {
func (c *fakeAddrConn) RemoteAddr() net.Addr {
return c.remoteAddr
}
func syncAddPeer(srv *Server, node *enode.Node) bool {
var (
ch = make(chan *PeerEvent)
sub = srv.SubscribeEvents(ch)
timeout = time.After(2 * time.Second)
)
defer sub.Unsubscribe()
srv.AddPeer(node)
for {
select {
case ev := <-ch:
if ev.Type == PeerEventTypeAdd && ev.Peer == node.ID() {
return true
}
case <-timeout:
return false
}
}
}
......@@ -17,6 +17,7 @@
package adapters
import (
"context"
"errors"
"fmt"
"math"
......@@ -126,7 +127,7 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
// Dial implements the p2p.NodeDialer interface by connecting to the node using
// an in-memory net.Pipe
func (s *SimAdapter) Dial(dest *enode.Node) (conn net.Conn, err error) {
func (s *SimAdapter) Dial(ctx context.Context, dest *enode.Node) (conn net.Conn, err error) {
node, ok := s.GetNode(dest.ID())
if !ok {
return nil, fmt.Errorf("unknown node: %s", dest.ID())
......
......@@ -18,7 +18,8 @@ package p2p
import (
"container/heap"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
)
// expHeap tracks strings and their expiry time.
......@@ -27,16 +28,16 @@ type expHeap []expItem
// expItem is an entry in addrHistory.
type expItem struct {
item string
exp time.Time
exp mclock.AbsTime
}
// nextExpiry returns the next expiry time.
func (h *expHeap) nextExpiry() time.Time {
func (h *expHeap) nextExpiry() mclock.AbsTime {
return (*h)[0].exp
}
// add adds an item and sets its expiry time.
func (h *expHeap) add(item string, exp time.Time) {
func (h *expHeap) add(item string, exp mclock.AbsTime) {
heap.Push(h, expItem{item, exp})
}
......@@ -51,15 +52,18 @@ func (h expHeap) contains(item string) bool {
}
// expire removes items with expiry time before 'now'.
func (h *expHeap) expire(now time.Time) {
for h.Len() > 0 && h.nextExpiry().Before(now) {
heap.Pop(h)
func (h *expHeap) expire(now mclock.AbsTime, onExp func(string)) {
for h.Len() > 0 && h.nextExpiry() < now {
item := heap.Pop(h)
if onExp != nil {
onExp(item.(expItem).item)
}
}
}
// heap.Interface boilerplate
func (h expHeap) Len() int { return len(h) }
func (h expHeap) Less(i, j int) bool { return h[i].exp.Before(h[j].exp) }
func (h expHeap) Less(i, j int) bool { return h[i].exp < h[j].exp }
func (h expHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *expHeap) Push(x interface{}) { *h = append(*h, x.(expItem)) }
func (h *expHeap) Pop() interface{} {
......
......@@ -19,30 +19,32 @@ package p2p
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
)
func TestExpHeap(t *testing.T) {
var h expHeap
var (
basetime = time.Unix(4000, 0)
basetime = mclock.AbsTime(10)
exptimeA = basetime.Add(2 * time.Second)
exptimeB = basetime.Add(3 * time.Second)
exptimeC = basetime.Add(4 * time.Second)
)
h.add("a", exptimeA)
h.add("b", exptimeB)
h.add("a", exptimeA)
h.add("c", exptimeC)
if !h.nextExpiry().Equal(exptimeA) {
if h.nextExpiry() != exptimeA {
t.Fatal("wrong nextExpiry")
}
if !h.contains("a") || !h.contains("b") || !h.contains("c") {
t.Fatal("heap doesn't contain all live items")
}
h.expire(exptimeA.Add(1))
if !h.nextExpiry().Equal(exptimeB) {
h.expire(exptimeA.Add(1), nil)
if h.nextExpiry() != exptimeB {
t.Fatal("wrong nextExpiry")
}
if h.contains("a") {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment