Commit 6c41e675 authored by Felix Lange's avatar Felix Lange

p2p: resolve incomplete dial targets

This change makes it possible to add peers without providing their IP
address. The endpoint of the target node is resolved using the discovery
protocol.
parent 04c6369a
...@@ -36,6 +36,10 @@ const ( ...@@ -36,6 +36,10 @@ const (
// Discovery lookups are throttled and can only run // Discovery lookups are throttled and can only run
// once every few seconds. // once every few seconds.
lookupInterval = 4 * time.Second lookupInterval = 4 * time.Second
// Endpoint resolution is throttled with bounded backoff.
initialResolveDelay = 60 * time.Second
maxResolveDelay = time.Hour
) )
// dialstate schedules dials and discovery lookups. // dialstate schedules dials and discovery lookups.
...@@ -46,17 +50,17 @@ type dialstate struct { ...@@ -46,17 +50,17 @@ type dialstate struct {
ntab discoverTable ntab discoverTable
lookupRunning bool lookupRunning bool
dialing map[discover.NodeID]connFlag dialing map[discover.NodeID]connFlag
lookupBuf []*discover.Node // current discovery lookup results lookupBuf []*discover.Node // current discovery lookup results
randomNodes []*discover.Node // filled from Table randomNodes []*discover.Node // filled from Table
static map[discover.NodeID]*discover.Node static map[discover.NodeID]*dialTask
hist *dialHistory hist *dialHistory
} }
type discoverTable interface { type discoverTable interface {
Self() *discover.Node Self() *discover.Node
Close() Close()
Resolve(target discover.NodeID) *discover.Node
Lookup(target discover.NodeID) []*discover.Node Lookup(target discover.NodeID) []*discover.Node
ReadRandomNodes([]*discover.Node) int ReadRandomNodes([]*discover.Node) int
} }
...@@ -74,10 +78,13 @@ type task interface { ...@@ -74,10 +78,13 @@ type task interface {
Do(*Server) Do(*Server)
} }
// A dialTask is generated for each node that is dialed. // A dialTask is generated for each node that is dialed. Its
// fields cannot be accessed while the task is running.
type dialTask struct { type dialTask struct {
flags connFlag flags connFlag
dest *discover.Node dest *discover.Node
lastResolved time.Time
resolveDelay time.Duration
} }
// discoverTask runs discovery table operations. // discoverTask runs discovery table operations.
...@@ -97,26 +104,31 @@ func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dial ...@@ -97,26 +104,31 @@ func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dial
s := &dialstate{ s := &dialstate{
maxDynDials: maxdyn, maxDynDials: maxdyn,
ntab: ntab, ntab: ntab,
static: make(map[discover.NodeID]*discover.Node), static: make(map[discover.NodeID]*dialTask),
dialing: make(map[discover.NodeID]connFlag), dialing: make(map[discover.NodeID]connFlag),
randomNodes: make([]*discover.Node, maxdyn/2), randomNodes: make([]*discover.Node, maxdyn/2),
hist: new(dialHistory), hist: new(dialHistory),
} }
for _, n := range static { for _, n := range static {
s.static[n.ID] = n s.addStatic(n)
} }
return s return s
} }
func (s *dialstate) addStatic(n *discover.Node) { func (s *dialstate) addStatic(n *discover.Node) {
s.static[n.ID] = n // This overwites the task instead of updating an existing
// entry, giving users the opportunity to force a resolve operation.
s.static[n.ID] = &dialTask{flags: staticDialedConn, dest: n}
} }
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task { func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
var newtasks []task var newtasks []task
isDialing := func(id discover.NodeID) bool {
_, found := s.dialing[id]
return found || peers[id] != nil || s.hist.contains(id)
}
addDial := func(flag connFlag, n *discover.Node) bool { addDial := func(flag connFlag, n *discover.Node) bool {
_, dialing := s.dialing[n.ID] if isDialing(n.ID) {
if dialing || peers[n.ID] != nil || s.hist.contains(n.ID) {
return false return false
} }
s.dialing[n.ID] = flag s.dialing[n.ID] = flag
...@@ -141,8 +153,11 @@ func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now ...@@ -141,8 +153,11 @@ func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now
s.hist.expire(now) s.hist.expire(now)
// Create dials for static nodes if they are not connected. // Create dials for static nodes if they are not connected.
for _, n := range s.static { for id, t := range s.static {
addDial(staticDialedConn, n) if !isDialing(id) {
s.dialing[id] = t.flags
newtasks = append(newtasks, t)
}
} }
// Use random nodes from the table for half of the necessary // Use random nodes from the table for half of the necessary
...@@ -194,17 +209,68 @@ func (s *dialstate) taskDone(t task, now time.Time) { ...@@ -194,17 +209,68 @@ func (s *dialstate) taskDone(t task, now time.Time) {
} }
func (t *dialTask) Do(srv *Server) { func (t *dialTask) Do(srv *Server) {
addr := &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)} if t.dest.Incomplete() {
glog.V(logger.Debug).Infof("dialing %v\n", t.dest) if !t.resolve(srv) {
return
}
}
success := t.dial(srv, t.dest)
// Try resolving the ID of static nodes if dialing failed.
if !success && t.flags&staticDialedConn != 0 {
if t.resolve(srv) {
t.dial(srv, t.dest)
}
}
}
// resolve attempts to find the current endpoint for the destination
// using discovery.
//
// Resolve operations are throttled with backoff to avoid flooding the
// discovery network with useless queries for nodes that don't exist.
// The backoff delay resets when the node is found.
func (t *dialTask) resolve(srv *Server) bool {
if srv.ntab == nil {
glog.V(logger.Debug).Infof("can't resolve node %x: discovery is disabled", t.dest.ID[:6])
return false
}
if t.resolveDelay == 0 {
t.resolveDelay = initialResolveDelay
}
if time.Since(t.lastResolved) < t.resolveDelay {
return false
}
resolved := srv.ntab.Resolve(t.dest.ID)
t.lastResolved = time.Now()
if resolved == nil {
t.resolveDelay *= 2
if t.resolveDelay > maxResolveDelay {
t.resolveDelay = maxResolveDelay
}
glog.V(logger.Debug).Infof("resolving node %x failed (new delay: %v)", t.dest.ID[:6], t.resolveDelay)
return false
}
// The node was found.
t.resolveDelay = initialResolveDelay
t.dest = resolved
glog.V(logger.Debug).Infof("resolved node %x: %v:%d", t.dest.ID[:6], t.dest.IP, t.dest.TCP)
return true
}
// dial performs the actual connection attempt.
func (t *dialTask) dial(srv *Server, dest *discover.Node) bool {
addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
glog.V(logger.Debug).Infof("dial tcp %v (%x)\n", addr, dest.ID[:6])
fd, err := srv.Dialer.Dial("tcp", addr.String()) fd, err := srv.Dialer.Dial("tcp", addr.String())
if err != nil { if err != nil {
glog.V(logger.Detail).Infof("dial error: %v", err) glog.V(logger.Detail).Infof("%v", err)
return return false
} }
mfd := newMeteredConn(fd, false) mfd := newMeteredConn(fd, false)
srv.setupConn(mfd, t.flags, dest)
srv.setupConn(mfd, t.flags, t.dest) return true
} }
func (t *dialTask) String() string { func (t *dialTask) String() string {
return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP) return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP)
} }
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment