Commit 44ea0da2 authored by Péter Szilágyi's avatar Péter Szilágyi Committed by GitHub

Merge pull request #2861 from karalabe/track-peer-heads-properly

eth, eth/downloader: better remote head tracking
parents 4f4e1026 1dd27208
...@@ -236,12 +236,12 @@ func (d *Downloader) Synchronising() bool { ...@@ -236,12 +236,12 @@ func (d *Downloader) Synchronising() bool {
// RegisterPeer injects a new download peer into the set of block source to be // RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from. // used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn,
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error { getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
glog.V(logger.Detail).Infoln("Registering peer", id) glog.V(logger.Detail).Infoln("Registering peer", id)
if err := d.peers.Register(newPeer(id, version, head, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil { if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
glog.V(logger.Error).Infoln("Register failed:", err) glog.V(logger.Error).Infoln("Register failed:", err)
return err return err
} }
...@@ -501,7 +501,8 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { ...@@ -501,7 +501,8 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p) glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
// Request the advertised remote head block and wait for the response // Request the advertised remote head block and wait for the response
go p.getRelHeaders(p.head, 1, 0, false) head, _ := p.currentHead()
go p.getRelHeaders(head, 1, 0, false)
timeout := time.After(d.requestTTL()) timeout := time.After(d.requestTTL())
for { for {
......
...@@ -400,11 +400,11 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha ...@@ -400,11 +400,11 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
var err error var err error
switch version { switch version {
case 62: case 62:
err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil) err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil)
case 63: case 63:
err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
case 64: case 64:
err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
} }
if err == nil { if err == nil {
// Assign the owned hashes, headers and blocks to the peer (deep copy) // Assign the owned hashes, headers and blocks to the peer (deep copy)
...@@ -463,6 +463,17 @@ func (dl *downloadTester) dropPeer(id string) { ...@@ -463,6 +463,17 @@ func (dl *downloadTester) dropPeer(id string) {
dl.downloader.UnregisterPeer(id) dl.downloader.UnregisterPeer(id)
} }
// peerCurrentHeadFn constructs a function to retrieve a peer's current head hash
// and total difficulty.
func (dl *downloadTester) peerCurrentHeadFn(id string) func() (common.Hash, *big.Int) {
return func() (common.Hash, *big.Int) {
dl.lock.RLock()
defer dl.lock.RUnlock()
return dl.peerHashes[id][0], nil
}
}
// peerGetRelHeadersFn constructs a GetBlockHeaders function based on a hashed // peerGetRelHeadersFn constructs a GetBlockHeaders function based on a hashed
// origin; associated with a particular peer in the download tester. The returned // origin; associated with a particular peer in the download tester. The returned
// function can be used to retrieve batches of headers from the particular peer. // function can be used to retrieve batches of headers from the particular peer.
......
...@@ -23,6 +23,7 @@ import ( ...@@ -23,6 +23,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"math" "math"
"math/big"
"sort" "sort"
"strings" "strings"
"sync" "sync"
...@@ -37,6 +38,9 @@ const ( ...@@ -37,6 +38,9 @@ const (
measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
) )
// Head hash and total difficulty retriever for
type currentHeadRetrievalFn func() (common.Hash, *big.Int)
// Block header and body fetchers belonging to eth/62 and above // Block header and body fetchers belonging to eth/62 and above
type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
type absoluteHeaderFetcherFn func(uint64, int, int, bool) error type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
...@@ -53,7 +57,6 @@ var ( ...@@ -53,7 +57,6 @@ var (
// peer represents an active peer from which hashes and blocks are retrieved. // peer represents an active peer from which hashes and blocks are retrieved.
type peer struct { type peer struct {
id string // Unique identifier of the peer id string // Unique identifier of the peer
head common.Hash // Hash of the peers latest known block
headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1) headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1)
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
...@@ -74,6 +77,8 @@ type peer struct { ...@@ -74,6 +77,8 @@ type peer struct {
lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
currentHead currentHeadRetrievalFn // Method to fetch the currently known head of the peer
getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash
getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
...@@ -87,14 +92,14 @@ type peer struct { ...@@ -87,14 +92,14 @@ type peer struct {
// newPeer create a new downloader peer, with specific hash and block retrieval // newPeer create a new downloader peer, with specific hash and block retrieval
// mechanisms. // mechanisms.
func newPeer(id string, version int, head common.Hash, func newPeer(id string, version int, currentHead currentHeadRetrievalFn,
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer { getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
return &peer{ return &peer{
id: id, id: id,
head: head,
lacking: make(map[common.Hash]struct{}), lacking: make(map[common.Hash]struct{}),
currentHead: currentHead,
getRelHeaders: getRelHeaders, getRelHeaders: getRelHeaders,
getAbsHeaders: getAbsHeaders, getAbsHeaders: getAbsHeaders,
getBlockBodies: getBlockBodies, getBlockBodies: getBlockBodies,
......
...@@ -272,11 +272,7 @@ func (pm *ProtocolManager) handle(p *peer) error { ...@@ -272,11 +272,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
defer pm.removePeer(p.id) defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect // Register the peer in the downloader. If the downloader considers it banned, we disconnect
err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head, p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil {
p.RequestHeadersByHash, p.RequestHeadersByNumber,
p.RequestBodies, p.RequestReceipts, p.RequestNodeData,
)
if err != nil {
return err return err
} }
// Propagate existing transactions. new transactions appearing // Propagate existing transactions. new transactions appearing
...@@ -413,7 +409,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ...@@ -413,7 +409,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// If we already have a DAO header, we can check the peer's TD against it. If // If we already have a DAO header, we can check the peer's TD against it. If
// the peer's ahead of this, it too must have a reply to the DAO check // the peer's ahead of this, it too must have a reply to the DAO check
if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil { if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
if p.Td().Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 { if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
verifyDAO = false verifyDAO = false
} }
} }
...@@ -619,7 +615,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ...@@ -619,7 +615,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Mark the hashes as present at the remote node // Mark the hashes as present at the remote node
for _, block := range announces { for _, block := range announces {
p.MarkBlock(block.Hash) p.MarkBlock(block.Hash)
p.SetHead(block.Hash)
} }
// Schedule all the unknown hashes for retrieval // Schedule all the unknown hashes for retrieval
unknown := make([]announce, 0, len(announces)) unknown := make([]announce, 0, len(announces))
...@@ -646,16 +641,23 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ...@@ -646,16 +641,23 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Mark the peer as owning the block and schedule it for import // Mark the peer as owning the block and schedule it for import
p.MarkBlock(request.Block.Hash()) p.MarkBlock(request.Block.Hash())
p.SetHead(request.Block.Hash())
pm.fetcher.Enqueue(p.id, request.Block) pm.fetcher.Enqueue(p.id, request.Block)
// Update the peers total difficulty if needed, schedule a download if gapped // Assuming the block is importable by the peer, but possibly not yet done so,
if request.TD.Cmp(p.Td()) > 0 { // calculate the head hash and TD that the peer truly must have.
p.SetTd(request.TD) var (
trueHead = request.Block.ParentHash()
trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty())
)
// Update the peers total difficulty if better than the previous
if _, td := p.Head(); trueTD.Cmp(td) > 0 {
p.SetHead(trueHead, trueTD)
// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
// a singe block (as the true TD is below the propagated block), however this
// scenario should easily be covered by the fetcher.
currentBlock := pm.blockchain.CurrentBlock() currentBlock := pm.blockchain.CurrentBlock()
td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
if request.TD.Cmp(new(big.Int).Add(td, request.Block.Difficulty())) > 0 {
go pm.synchronise(p) go pm.synchronise(p)
} }
} }
......
...@@ -84,43 +84,31 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { ...@@ -84,43 +84,31 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
// Info gathers and returns a collection of metadata known about a peer. // Info gathers and returns a collection of metadata known about a peer.
func (p *peer) Info() *PeerInfo { func (p *peer) Info() *PeerInfo {
hash, td := p.Head()
return &PeerInfo{ return &PeerInfo{
Version: p.version, Version: p.version,
Difficulty: p.Td(), Difficulty: td,
Head: fmt.Sprintf("%x", p.Head()), Head: hash.Hex(),
} }
} }
// Head retrieves a copy of the current head (most recent) hash of the peer. // Head retrieves a copy of the current head hash and total difficulty of the
func (p *peer) Head() (hash common.Hash) { // peer.
func (p *peer) Head() (hash common.Hash, td *big.Int) {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
copy(hash[:], p.head[:]) copy(hash[:], p.head[:])
return hash return hash, new(big.Int).Set(p.td)
} }
// SetHead updates the head (most recent) hash of the peer. // SetHead updates the head hash and total difficulty of the peer.
func (p *peer) SetHead(hash common.Hash) { func (p *peer) SetHead(hash common.Hash, td *big.Int) {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
copy(p.head[:], hash[:]) copy(p.head[:], hash[:])
}
// Td retrieves the current total difficulty of a peer.
func (p *peer) Td() *big.Int {
p.lock.RLock()
defer p.lock.RUnlock()
return new(big.Int).Set(p.td)
}
// SetTd updates the current total difficulty of a peer.
func (p *peer) SetTd(td *big.Int) {
p.lock.Lock()
defer p.lock.Unlock()
p.td.Set(td) p.td.Set(td)
} }
...@@ -411,7 +399,7 @@ func (ps *peerSet) BestPeer() *peer { ...@@ -411,7 +399,7 @@ func (ps *peerSet) BestPeer() *peer {
bestTd *big.Int bestTd *big.Int
) )
for _, p := range ps.peers { for _, p := range ps.peers {
if td := p.Td(); bestPeer == nil || td.Cmp(bestTd) > 0 { if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
bestPeer, bestTd = p, td bestPeer, bestTd = p, td
} }
} }
......
...@@ -161,10 +161,12 @@ func (pm *ProtocolManager) synchronise(peer *peer) { ...@@ -161,10 +161,12 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
if peer == nil { if peer == nil {
return return
} }
// Make sure the peer's TD is higher than our own. If not drop. // Make sure the peer's TD is higher than our own
currentBlock := pm.blockchain.CurrentBlock() currentBlock := pm.blockchain.CurrentBlock()
td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
if peer.Td().Cmp(td) <= 0 {
pHead, pTd := peer.Head()
if pTd.Cmp(td) <= 0 {
return return
} }
// Otherwise try to sync with the downloader // Otherwise try to sync with the downloader
...@@ -172,7 +174,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { ...@@ -172,7 +174,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
if atomic.LoadUint32(&pm.fastSync) == 1 { if atomic.LoadUint32(&pm.fastSync) == 1 {
mode = downloader.FastSync mode = downloader.FastSync
} }
if err := pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), mode); err != nil { if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
return return
} }
atomic.StoreUint32(&pm.synced, 1) // Mark initial sync done atomic.StoreUint32(&pm.synced, 1) // Mark initial sync done
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment