Commit 4b8860a7 authored by Nick Johnson's avatar Nick Johnson Committed by GitHub

Merge pull request #14723 from Arachnid/downloadrefactor

Refactor downloader to use interfaces instead of callbacks
parents f25486c3 fe13949d
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -41,7 +41,7 @@ var ( ...@@ -41,7 +41,7 @@ var (
// fetchRequest is a currently running data retrieval operation. // fetchRequest is a currently running data retrieval operation.
type fetchRequest struct { type fetchRequest struct {
Peer *peer // Peer to which the request was sent Peer *peerConnection // Peer to which the request was sent
From uint64 // [eth/62] Requested chain element index (used for skeleton fills only) From uint64 // [eth/62] Requested chain element index (used for skeleton fills only)
Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority) Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
Headers []*types.Header // [eth/62] Requested headers, sorted by request order Headers []*types.Header // [eth/62] Requested headers, sorted by request order
...@@ -391,7 +391,7 @@ func (q *queue) countProcessableItems() int { ...@@ -391,7 +391,7 @@ func (q *queue) countProcessableItems() int {
// ReserveHeaders reserves a set of headers for the given peer, skipping any // ReserveHeaders reserves a set of headers for the given peer, skipping any
// previously failed batches. // previously failed batches.
func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest { func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
...@@ -432,7 +432,7 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest { ...@@ -432,7 +432,7 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
// ReserveBodies reserves a set of body fetches for the given peer, skipping any // ReserveBodies reserves a set of body fetches for the given peer, skipping any
// previously failed downloads. Beside the next batch of needed fetches, it also // previously failed downloads. Beside the next batch of needed fetches, it also
// returns a flag whether empty blocks were queued requiring processing. // returns a flag whether empty blocks were queued requiring processing.
func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) { func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) {
isNoop := func(header *types.Header) bool { isNoop := func(header *types.Header) bool {
return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
} }
...@@ -445,7 +445,7 @@ func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) { ...@@ -445,7 +445,7 @@ func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) {
// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping // ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
// any previously failed downloads. Beside the next batch of needed fetches, it // any previously failed downloads. Beside the next batch of needed fetches, it
// also returns a flag whether empty receipts were queued requiring importing. // also returns a flag whether empty receipts were queued requiring importing.
func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) { func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, error) {
isNoop := func(header *types.Header) bool { isNoop := func(header *types.Header) bool {
return header.ReceiptHash == types.EmptyRootHash return header.ReceiptHash == types.EmptyRootHash
} }
...@@ -462,7 +462,7 @@ func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) ...@@ -462,7 +462,7 @@ func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error)
// Note, this method expects the queue lock to be already held for writing. The // Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need // reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway. // to access the queue, so they already need a lock anyway.
func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) { pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) {
// Short circuit if the pool has been depleted, or if the peer's already // Short circuit if the pool has been depleted, or if the peer's already
// downloading something (sanity check not to corrupt state) // downloading something (sanity check not to corrupt state)
......
...@@ -37,7 +37,7 @@ type stateReq struct { ...@@ -37,7 +37,7 @@ type stateReq struct {
tasks map[common.Hash]*stateTask // Download tasks to track previous attempts tasks map[common.Hash]*stateTask // Download tasks to track previous attempts
timeout time.Duration // Maximum round trip time for this to complete timeout time.Duration // Maximum round trip time for this to complete
timer *time.Timer // Timer to fire when the RTT timeout expires timer *time.Timer // Timer to fire when the RTT timeout expires
peer *peer // Peer that we're requesting from peer *peerConnection // Peer that we're requesting from
response [][]byte // Response data of the peer (nil for timeouts) response [][]byte // Response data of the peer (nil for timeouts)
} }
...@@ -246,7 +246,7 @@ func (s *stateSync) Cancel() error { ...@@ -246,7 +246,7 @@ func (s *stateSync) Cancel() error {
// and timeouts. // and timeouts.
func (s *stateSync) loop() error { func (s *stateSync) loop() error {
// Listen for new peer events to assign tasks to them // Listen for new peer events to assign tasks to them
newPeer := make(chan *peer, 1024) newPeer := make(chan *peerConnection, 1024)
peerSub := s.d.peers.SubscribeNewPeers(newPeer) peerSub := s.d.peers.SubscribeNewPeers(newPeer)
defer peerSub.Unsubscribe() defer peerSub.Unsubscribe()
......
...@@ -18,51 +18,10 @@ package downloader ...@@ -18,51 +18,10 @@ package downloader
import ( import (
"fmt" "fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
// headerCheckFn is a callback type for verifying a header's presence in the local chain.
type headerCheckFn func(common.Hash) bool
// blockAndStateCheckFn is a callback type for verifying block and associated states' presence in the local chain.
type blockAndStateCheckFn func(common.Hash) bool
// headerRetrievalFn is a callback type for retrieving a header from the local chain.
type headerRetrievalFn func(common.Hash) *types.Header
// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block
// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain.
type headHeaderRetrievalFn func() *types.Header
// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain.
type headBlockRetrievalFn func() *types.Block
// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain.
type headFastBlockRetrievalFn func() *types.Block
// headBlockCommitterFn is a callback for directly committing the head block to a certain entity.
type headBlockCommitterFn func(common.Hash) error
// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block.
type tdRetrievalFn func(common.Hash) *big.Int
// headerChainInsertFn is a callback type to insert a batch of headers into the local chain.
type headerChainInsertFn func([]*types.Header, int) (int, error)
// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain.
type blockChainInsertFn func(types.Blocks) (int, error)
// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain.
type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error)
// chainRollbackFn is a callback type to remove a few recently added elements from the local chain.
type chainRollbackFn func([]common.Hash)
// peerDropFn is a callback type for dropping a peer detected as malicious. // peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string) type peerDropFn func(id string)
......
...@@ -157,10 +157,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne ...@@ -157,10 +157,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
return nil, errIncompatibleConfig return nil, errIncompatibleConfig
} }
// Construct the different synchronisation mechanisms // Construct the different synchronisation mechanisms
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlockAndState, blockchain.GetHeaderByHash, manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
blockchain.GetBlockByHash, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead,
blockchain.GetTdByHash, blockchain.InsertHeaderChain, manager.blockchain.InsertChain, blockchain.InsertReceiptChain, blockchain.Rollback,
manager.removePeer)
validator := func(header *types.Header) error { validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true) return engine.VerifyHeader(blockchain, header, true)
...@@ -268,7 +265,7 @@ func (pm *ProtocolManager) handle(p *peer) error { ...@@ -268,7 +265,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
defer pm.removePeer(p.id) defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect // Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head, p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil { if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err return err
} }
// Propagate existing transactions. new transactions appearing // Propagate existing transactions. new transactions appearing
......
...@@ -206,9 +206,7 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network ...@@ -206,9 +206,7 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
} }
if lightSync { if lightSync {
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash, manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer)
nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
manager.peers.notify((*downloaderPeerNotify)(manager)) manager.peers.notify((*downloaderPeerNotify)(manager))
manager.fetcher = newLightFetcher(manager) manager.fetcher = newLightFetcher(manager)
} }
...@@ -840,10 +838,16 @@ func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo { ...@@ -840,10 +838,16 @@ func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
// downloaderPeerNotify implements peerSetNotify // downloaderPeerNotify implements peerSetNotify
type downloaderPeerNotify ProtocolManager type downloaderPeerNotify ProtocolManager
func (d *downloaderPeerNotify) registerPeer(p *peer) { type peerConnection struct {
pm := (*ProtocolManager)(d) manager *ProtocolManager
peer *peer
}
requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { func (pc *peerConnection) Head() (common.Hash, *big.Int) {
return pc.peer.HeadAndTd()
}
func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
reqID := genReqID() reqID := genReqID()
rq := &distReq{ rq := &distReq{
getCost: func(dp distPeer) uint64 { getCost: func(dp distPeer) uint64 {
...@@ -851,7 +855,7 @@ func (d *downloaderPeerNotify) registerPeer(p *peer) { ...@@ -851,7 +855,7 @@ func (d *downloaderPeerNotify) registerPeer(p *peer) {
return peer.GetRequestCost(GetBlockHeadersMsg, amount) return peer.GetRequestCost(GetBlockHeadersMsg, amount)
}, },
canSend: func(dp distPeer) bool { canSend: func(dp distPeer) bool {
return dp.(*peer) == p return dp.(*peer) == pc.peer
}, },
request: func(dp distPeer) func() { request: func(dp distPeer) func() {
peer := dp.(*peer) peer := dp.(*peer)
...@@ -860,13 +864,14 @@ func (d *downloaderPeerNotify) registerPeer(p *peer) { ...@@ -860,13 +864,14 @@ func (d *downloaderPeerNotify) registerPeer(p *peer) {
return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
}, },
} }
_, ok := <-pm.reqDist.queue(rq) _, ok := <-pc.manager.reqDist.queue(rq)
if !ok { if !ok {
return ErrNoPeers return ErrNoPeers
} }
return nil return nil
} }
requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
reqID := genReqID() reqID := genReqID()
rq := &distReq{ rq := &distReq{
getCost: func(dp distPeer) uint64 { getCost: func(dp distPeer) uint64 {
...@@ -874,7 +879,7 @@ func (d *downloaderPeerNotify) registerPeer(p *peer) { ...@@ -874,7 +879,7 @@ func (d *downloaderPeerNotify) registerPeer(p *peer) {
return peer.GetRequestCost(GetBlockHeadersMsg, amount) return peer.GetRequestCost(GetBlockHeadersMsg, amount)
}, },
canSend: func(dp distPeer) bool { canSend: func(dp distPeer) bool {
return dp.(*peer) == p return dp.(*peer) == pc.peer
}, },
request: func(dp distPeer) func() { request: func(dp distPeer) func() {
peer := dp.(*peer) peer := dp.(*peer)
...@@ -883,14 +888,20 @@ func (d *downloaderPeerNotify) registerPeer(p *peer) { ...@@ -883,14 +888,20 @@ func (d *downloaderPeerNotify) registerPeer(p *peer) {
return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
}, },
} }
_, ok := <-pm.reqDist.queue(rq) _, ok := <-pc.manager.reqDist.queue(rq)
if !ok { if !ok {
return ErrNoPeers return ErrNoPeers
} }
return nil return nil
} }
pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd, requestHeadersByHash, requestHeadersByNumber, nil, nil, nil) func (d *downloaderPeerNotify) registerPeer(p *peer) {
pm := (*ProtocolManager)(d)
pc := &peerConnection{
manager: pm,
peer: p,
}
pm.downloader.RegisterLightPeer(p.id, ethVersion, pc)
} }
func (d *downloaderPeerNotify) unregisterPeer(p *peer) { func (d *downloaderPeerNotify) unregisterPeer(p *peer) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment