Unverified Commit 10592218 authored by Martin Holst Swende's avatar Martin Holst Swende Committed by GitHub

eth/downloader: refactor downloader + queue (#21263)

* eth/downloader: refactor downloader + queue

downloader, fetcher: throttle-metrics, fetcher filter improvements, standalone resultcache

downloader: more accurate deliverytime calculation, less mem overhead in state requests

downloader/queue: increase underlying buffer of results, new throttle mechanism

eth/downloader: updates to tests

eth/downloader: fix up some review concerns

eth/downloader/queue: minor fixes

eth/downloader: minor fixes after review call

eth/downloader: testcases for queue.go

eth/downloader: minor change, don't set progress unless progress...

eth/downloader: fix flaw which prevented useless peers from being dropped

eth/downloader: try to fix tests

eth/downloader: verify non-deliveries against advertised remote head

eth/downloader: fix flaw with checking closed-status causing hang

eth/downloader: hashing avoidance

eth/downloader: review concerns + simplify resultcache and queue

eth/downloader: add back some locks, address review concerns

downloader/queue: fix remaining lock flaw

* eth/downloader: nitpick fixes

* eth/downloader: remove the *2*3/4 throttling threshold dance

* eth/downloader: print correct throttle threshold in stats
Co-authored-by: 's avatarPéter Szilágyi <peterke@gmail.com>
parent 3a57eecc
......@@ -147,6 +147,17 @@ func rlpHash(x interface{}) (h common.Hash) {
return h
}
// EmptyBody returns true if there is no additional 'body' to complete the header
// that is: no transactions and no uncles.
func (h *Header) EmptyBody() bool {
return h.TxHash == EmptyRootHash && h.UncleHash == EmptyUncleHash
}
// EmptyReceipts returns true if there are no receipts for this header/block.
func (h *Header) EmptyReceipts() bool {
return h.ReceiptHash == EmptyRootHash
}
// Body is a simple (mutable, non-safe) data container for storing and moving
// a block's data contents (transactions and uncles) together.
type Body struct {
......
This diff is collapsed.
......@@ -297,14 +297,13 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) {
} else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil {
return i, fmt.Errorf("InsertChain: unknown parent state %x: %v", parent.Root(), err)
}
if _, ok := dl.ownHeaders[block.Hash()]; !ok {
if hdr := dl.getHeaderByHash(block.Hash()); hdr == nil {
dl.ownHashes = append(dl.ownHashes, block.Hash())
dl.ownHeaders[block.Hash()] = block.Header()
}
dl.ownBlocks[block.Hash()] = block
dl.ownReceipts[block.Hash()] = make(types.Receipts, 0)
dl.stateDb.Put(block.Root().Bytes(), []byte{0x00})
td := dl.getTd(block.ParentHash())
dl.ownChainTd[block.Hash()] = new(big.Int).Add(td, block.Difficulty())
}
......@@ -538,7 +537,6 @@ func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
func testThrottling(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
// Create a long block chain to download and the tester
targetBlocks := testChainBase.len() - 1
......@@ -570,31 +568,32 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
time.Sleep(25 * time.Millisecond)
tester.lock.Lock()
tester.downloader.queue.lock.Lock()
cached = len(tester.downloader.queue.blockDonePool)
if mode == FastSync {
if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
cached = receipts
}
{
tester.downloader.queue.resultCache.lock.Lock()
cached = tester.downloader.queue.resultCache.countCompleted()
tester.downloader.queue.resultCache.lock.Unlock()
frozen = int(atomic.LoadUint32(&blocked))
retrieved = len(tester.ownBlocks)
}
frozen = int(atomic.LoadUint32(&blocked))
retrieved = len(tester.ownBlocks)
tester.downloader.queue.lock.Unlock()
tester.lock.Unlock()
if cached == blockCacheItems || cached == blockCacheItems-reorgProtHeaderDelay || retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
if cached == blockCacheItems ||
cached == blockCacheItems-reorgProtHeaderDelay ||
retrieved+cached+frozen == targetBlocks+1 ||
retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
break
}
}
// Make sure we filled up the cache, then exhaust it
time.Sleep(25 * time.Millisecond) // give it a chance to screw up
tester.lock.RLock()
retrieved = len(tester.ownBlocks)
tester.lock.RUnlock()
if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1)
}
// Permit the blocked blocks to import
if atomic.LoadUint32(&blocked) > 0 {
atomic.StoreUint32(&blocked, uint32(0))
......@@ -606,6 +605,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
if err := <-errc; err != nil {
t.Fatalf("block synchronization failed: %v", err)
}
tester.terminate()
}
// Tests that simple synchronization against a forked chain works correctly. In
......@@ -628,7 +629,6 @@ func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
chainB := testChainForkLightB.shorten(testChainBase.len() + 80)
tester.newPeer("fork A", protocol, chainA)
tester.newPeer("fork B", protocol, chainB)
// Synchronise with the peer and make sure all blocks were retrieved
if err := tester.sync("fork A", nil, mode); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
......@@ -720,15 +720,12 @@ func TestBoundedHeavyForkedSync64Light(t *testing.T) { testBoundedHeavyForkedSyn
func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
// Create a long enough forked chain
chainA := testChainForkLightA
chainB := testChainForkHeavy
tester.newPeer("original", protocol, chainA)
tester.newPeer("heavy-rewriter", protocol, chainB)
// Synchronise with the peer and make sure all blocks were retrieved
if err := tester.sync("original", nil, mode); err != nil {
......@@ -736,10 +733,12 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
}
assertOwnChain(t, tester, chainA.len())
tester.newPeer("heavy-rewriter", protocol, chainB)
// Synchronise with the second peer and ensure that the fork is rejected to being too old
if err := tester.sync("heavy-rewriter", nil, mode); err != errInvalidAncestor {
t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
}
tester.terminate()
}
// Tests that an inactive downloader will not accept incoming block headers and
......@@ -1007,7 +1006,6 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
// Create a small enough block chain to download
targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks
......@@ -1087,6 +1085,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, chain.len())
}
}
tester.terminate()
}
// Tests that a peer advertising a high TD doesn't get to stall the downloader
......@@ -1102,13 +1101,13 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()
tester := newTester()
defer tester.terminate()
chain := testChainBase.shorten(1)
tester.newPeer("attack", protocol, chain)
if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
}
tester.terminate()
}
// Tests that misbehaving peers are disconnected, whilst behaving ones are not.
......
......@@ -40,4 +40,6 @@ var (
stateInMeter = metrics.NewRegisteredMeter("eth/downloader/states/in", nil)
stateDropMeter = metrics.NewRegisteredMeter("eth/downloader/states/drop", nil)
throttleCounter = metrics.NewRegisteredCounter("eth/downloader/throttle", nil)
)
......@@ -117,9 +117,7 @@ func newPeerConnection(id string, version int, peer Peer, logger log.Logger) *pe
return &peerConnection{
id: id,
lacking: make(map[common.Hash]struct{}),
peer: peer,
peer: peer,
version: version,
log: logger,
}
......@@ -173,12 +171,14 @@ func (p *peerConnection) FetchBodies(request *fetchRequest) error {
}
p.blockStarted = time.Now()
// Convert the header set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Headers))
for _, header := range request.Headers {
hashes = append(hashes, header.Hash())
}
go p.peer.RequestBodies(hashes)
go func() {
// Convert the header set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Headers))
for _, header := range request.Headers {
hashes = append(hashes, header.Hash())
}
p.peer.RequestBodies(hashes)
}()
return nil
}
......@@ -195,12 +195,14 @@ func (p *peerConnection) FetchReceipts(request *fetchRequest) error {
}
p.receiptStarted = time.Now()
// Convert the header set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Headers))
for _, header := range request.Headers {
hashes = append(hashes, header.Hash())
}
go p.peer.RequestReceipts(hashes)
go func() {
// Convert the header set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Headers))
for _, header := range request.Headers {
hashes = append(hashes, header.Hash())
}
p.peer.RequestReceipts(hashes)
}()
return nil
}
......@@ -225,34 +227,34 @@ func (p *peerConnection) FetchNodeData(hashes []common.Hash) error {
// SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval
// requests. Its estimated header retrieval throughput is updated with that measured
// just now.
func (p *peerConnection) SetHeadersIdle(delivered int) {
p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle)
func (p *peerConnection) SetHeadersIdle(delivered int, deliveryTime time.Time) {
p.setIdle(deliveryTime.Sub(p.headerStarted), delivered, &p.headerThroughput, &p.headerIdle)
}
// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
// requests. Its estimated body retrieval throughput is updated with that measured
// just now.
func (p *peerConnection) SetBodiesIdle(delivered int) {
p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
func (p *peerConnection) SetBodiesIdle(delivered int, deliveryTime time.Time) {
p.setIdle(deliveryTime.Sub(p.blockStarted), delivered, &p.blockThroughput, &p.blockIdle)
}
// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
// retrieval requests. Its estimated receipt retrieval throughput is updated
// with that measured just now.
func (p *peerConnection) SetReceiptsIdle(delivered int) {
p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle)
func (p *peerConnection) SetReceiptsIdle(delivered int, deliveryTime time.Time) {
p.setIdle(deliveryTime.Sub(p.receiptStarted), delivered, &p.receiptThroughput, &p.receiptIdle)
}
// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
// data retrieval requests. Its estimated state retrieval throughput is updated
// with that measured just now.
func (p *peerConnection) SetNodeDataIdle(delivered int) {
p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle)
func (p *peerConnection) SetNodeDataIdle(delivered int, deliveryTime time.Time) {
p.setIdle(deliveryTime.Sub(p.stateStarted), delivered, &p.stateThroughput, &p.stateIdle)
}
// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its estimated retrieval throughput is updated with that measured just now.
func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
func (p *peerConnection) setIdle(elapsed time.Duration, delivered int, throughput *float64, idle *int32) {
// Irrelevant of the scaling, make sure the peer ends up idle
defer atomic.StoreInt32(idle, 0)
......@@ -265,7 +267,9 @@ func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *f
return
}
// Otherwise update the throughput with a new measurement
elapsed := time.Since(started) + 1 // +1 (ns) to ensure non-zero divisor
if elapsed <= 0 {
elapsed = 1 // +1 (ns) to ensure non-zero divisor
}
measured := float64(delivered) / (float64(elapsed) / float64(time.Second))
*throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured
......@@ -523,22 +527,20 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peerC
defer ps.lock.RUnlock()
idle, total := make([]*peerConnection, 0, len(ps.peers)), 0
tps := make([]float64, 0, len(ps.peers))
for _, p := range ps.peers {
if p.version >= minProtocol && p.version <= maxProtocol {
if idleCheck(p) {
idle = append(idle, p)
tps = append(tps, throughput(p))
}
total++
}
}
for i := 0; i < len(idle); i++ {
for j := i + 1; j < len(idle); j++ {
if throughput(idle[i]) < throughput(idle[j]) {
idle[i], idle[j] = idle[j], idle[i]
}
}
}
return idle, total
// And sort them
sortPeers := &peerThroughputSort{idle, tps}
sort.Sort(sortPeers)
return sortPeers.p, total
}
// medianRTT returns the median RTT of the peerset, considering only the tuning
......@@ -571,3 +573,24 @@ func (ps *peerSet) medianRTT() time.Duration {
}
return median
}
// peerThroughputSort implements the Sort interface, and allows for
// sorting a set of peers by their throughput
// The sorted data is with the _highest_ throughput first
type peerThroughputSort struct {
p []*peerConnection
tp []float64
}
func (ps *peerThroughputSort) Len() int {
return len(ps.p)
}
func (ps *peerThroughputSort) Less(i, j int) bool {
return ps.tp[i] > ps.tp[j]
}
func (ps *peerThroughputSort) Swap(i, j int) {
ps.p[i], ps.p[j] = ps.p[j], ps.p[i]
ps.tp[i], ps.tp[j] = ps.tp[j], ps.tp[i]
}
// Copyright 2020 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"sort"
"testing"
)
func TestPeerThroughputSorting(t *testing.T) {
a := &peerConnection{
id: "a",
headerThroughput: 1.25,
}
b := &peerConnection{
id: "b",
headerThroughput: 1.21,
}
c := &peerConnection{
id: "c",
headerThroughput: 1.23,
}
peers := []*peerConnection{a, b, c}
tps := []float64{a.headerThroughput,
b.headerThroughput, c.headerThroughput}
sortPeers := &peerThroughputSort{peers, tps}
sort.Sort(sortPeers)
if got, exp := sortPeers.p[0].id, "a"; got != exp {
t.Errorf("sort fail, got %v exp %v", got, exp)
}
if got, exp := sortPeers.p[1].id, "c"; got != exp {
t.Errorf("sort fail, got %v exp %v", got, exp)
}
if got, exp := sortPeers.p[2].id, "b"; got != exp {
t.Errorf("sort fail, got %v exp %v", got, exp)
}
}
This diff is collapsed.
This diff is collapsed.
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"fmt"
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/core/types"
)
// resultStore implements a structure for maintaining fetchResults, tracking their
// download-progress and delivering (finished) results.
type resultStore struct {
items []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain
// Internal index of first non-completed entry, updated atomically when needed.
// If all items are complete, this will equal length(items), so
// *important* : is not safe to use for indexing without checking against length
indexIncomplete int32 // atomic access
// throttleThreshold is the limit up to which we _want_ to fill the
// results. If blocks are large, we want to limit the results to less
// than the number of available slots, and maybe only fill 1024 out of
// 8192 possible places. The queue will, at certain times, recalibrate
// this index.
throttleThreshold uint64
lock sync.RWMutex
}
func newResultStore(size int) *resultStore {
return &resultStore{
resultOffset: 0,
items: make([]*fetchResult, size),
throttleThreshold: uint64(size),
}
}
// SetThrottleThreshold updates the throttling threshold based on the requested
// limit and the total queue capacity. It returns the (possibly capped) threshold
func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 {
r.lock.Lock()
defer r.lock.Unlock()
limit := uint64(len(r.items))
if threshold >= limit {
threshold = limit
}
r.throttleThreshold = threshold
return r.throttleThreshold
}
// AddFetch adds a header for body/receipt fetching. This is used when the queue
// wants to reserve headers for fetching.
//
// It returns the following:
// stale - if true, this item is already passed, and should not be requested again
// throttled - if true, the store is at capacity, this particular header is not prio now
// item - the result to store data into
// err - any error that occurred
func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, throttled bool, item *fetchResult, err error) {
r.lock.Lock()
defer r.lock.Unlock()
var index int
item, index, stale, throttled, err = r.getFetchResult(header.Number.Uint64())
if err != nil || stale || throttled {
return stale, throttled, item, err
}
if item == nil {
item = newFetchResult(header, fastSync)
r.items[index] = item
}
return stale, throttled, item, err
}
// GetDeliverySlot returns the fetchResult for the given header. If the 'stale' flag
// is true, that means the header has already been delivered 'upstream'. This method
// does not bubble up the 'throttle' flag, since it's moot at the point in time when
// the item is downloaded and ready for delivery
func (r *resultStore) GetDeliverySlot(headerNumber uint64) (*fetchResult, bool, error) {
r.lock.RLock()
defer r.lock.RUnlock()
res, _, stale, _, err := r.getFetchResult(headerNumber)
return res, stale, err
}
// getFetchResult returns the fetchResult corresponding to the given item, and
// the index where the result is stored.
func (r *resultStore) getFetchResult(headerNumber uint64) (item *fetchResult, index int, stale, throttle bool, err error) {
index = int(int64(headerNumber) - int64(r.resultOffset))
throttle = index >= int(r.throttleThreshold)
stale = index < 0
if index >= len(r.items) {
err = fmt.Errorf("%w: index allocation went beyond available resultStore space "+
"(index [%d] = header [%d] - resultOffset [%d], len(resultStore) = %d", errInvalidChain,
index, headerNumber, r.resultOffset, len(r.items))
return nil, index, stale, throttle, err
}
if stale {
return nil, index, stale, throttle, nil
}
item = r.items[index]
return item, index, stale, throttle, nil
}
// hasCompletedItems returns true if there are processable items available
// this method is cheaper than countCompleted
func (r *resultStore) HasCompletedItems() bool {
r.lock.RLock()
defer r.lock.RUnlock()
if len(r.items) == 0 {
return false
}
if item := r.items[0]; item != nil && item.AllDone() {
return true
}
return false
}
// countCompleted returns the number of items ready for delivery, stopping at
// the first non-complete item.
//
// The mthod assumes (at least) rlock is held.
func (r *resultStore) countCompleted() int {
// We iterate from the already known complete point, and see
// if any more has completed since last count
index := atomic.LoadInt32(&r.indexIncomplete)
for ; ; index++ {
if index >= int32(len(r.items)) {
break
}
result := r.items[index]
if result == nil || !result.AllDone() {
break
}
}
atomic.StoreInt32(&r.indexIncomplete, index)
return int(index)
}
// GetCompleted returns the next batch of completed fetchResults
func (r *resultStore) GetCompleted(limit int) []*fetchResult {
r.lock.Lock()
defer r.lock.Unlock()
completed := r.countCompleted()
if limit > completed {
limit = completed
}
results := make([]*fetchResult, limit)
copy(results, r.items[:limit])
// Delete the results from the cache and clear the tail.
copy(r.items, r.items[limit:])
for i := len(r.items) - limit; i < len(r.items); i++ {
r.items[i] = nil
}
// Advance the expected block number of the first cache entry
r.resultOffset += uint64(limit)
atomic.AddInt32(&r.indexIncomplete, int32(-limit))
return results
}
// Prepare initialises the offset with the given block number
func (r *resultStore) Prepare(offset uint64) {
r.lock.Lock()
defer r.lock.Unlock()
if r.resultOffset < offset {
r.resultOffset = offset
}
}
......@@ -34,7 +34,7 @@ import (
// stateReq represents a batch of state fetch requests grouped together into
// a single data retrieval network packet.
type stateReq struct {
items []common.Hash // Hashes of the state items to download
nItems uint16 // Number of items requested for download (max is 384, so uint16 is sufficient)
tasks map[common.Hash]*stateTask // Download tasks to track previous attempts
timeout time.Duration // Maximum round trip time for this to complete
timer *time.Timer // Timer to fire when the RTT timeout expires
......@@ -99,7 +99,6 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
finished []*stateReq // Completed or failed requests
timeout = make(chan *stateReq) // Timed out active requests
)
// Run the state sync.
log.Trace("State sync starting", "root", s.root)
go s.run()
......@@ -235,16 +234,16 @@ func (d *Downloader) spindownStateSync(active map[string]*stateReq, finished []*
if req == nil {
continue
}
req.peer.log.Trace("State peer marked idle (spindown)", "req.items", len(req.items), "reason", reason)
req.peer.log.Trace("State peer marked idle (spindown)", "req.items", int(req.nItems), "reason", reason)
req.timer.Stop()
delete(active, req.peer.id)
req.peer.SetNodeDataIdle(len(req.items))
req.peer.SetNodeDataIdle(int(req.nItems), time.Now())
}
// The 'finished' set contains deliveries that we were going to pass to processing.
// Those are now moot, but we still need to set those peers as idle, which would
// otherwise have been done after processing
for _, req := range finished {
req.peer.SetNodeDataIdle(len(req.items))
req.peer.SetNodeDataIdle(int(req.nItems), time.Now())
}
}
......@@ -350,9 +349,10 @@ func (s *stateSync) loop() (err error) {
return errCanceled
case req := <-s.deliver:
deliveryTime := time.Now()
// Response, disconnect or timeout triggered, drop the peer if stalling
log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
if len(req.items) <= 2 && !req.dropped && req.timedOut() {
if req.nItems <= 2 && !req.dropped && req.timedOut() {
// 2 items are the minimum requested, if even that times out, we've no use of
// this peer at the moment.
log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
......@@ -376,7 +376,7 @@ func (s *stateSync) loop() (err error) {
}
// Process all the received blobs and check for stale delivery
delivered, err := s.process(req)
req.peer.SetNodeDataIdle(delivered)
req.peer.SetNodeDataIdle(delivered, deliveryTime)
if err != nil {
log.Warn("Node data write error", "err", err)
return err
......@@ -413,14 +413,14 @@ func (s *stateSync) assignTasks() {
// Assign a batch of fetches proportional to the estimated latency/bandwidth
cap := p.NodeDataCapacity(s.d.requestRTT())
req := &stateReq{peer: p, timeout: s.d.requestTTL()}
s.fillTasks(cap, req)
items := s.fillTasks(cap, req)
// If the peer was assigned tasks to fetch, send the network request
if len(req.items) > 0 {
req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items), "root", s.root)
if len(items) > 0 {
req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(items), "root", s.root)
select {
case s.d.trackStateReq <- req:
req.peer.FetchNodeData(req.items)
req.peer.FetchNodeData(items)
case <-s.cancel:
case <-s.d.cancelCh:
}
......@@ -430,7 +430,7 @@ func (s *stateSync) assignTasks() {
// fillTasks fills the given request object with a maximum of n state download
// tasks to send to the remote peer.
func (s *stateSync) fillTasks(n int, req *stateReq) {
func (s *stateSync) fillTasks(n int, req *stateReq) []common.Hash {
// Refill available tasks from the scheduler.
if len(s.tasks) < n {
new := s.sched.Missing(n - len(s.tasks))
......@@ -439,11 +439,11 @@ func (s *stateSync) fillTasks(n int, req *stateReq) {
}
}
// Find tasks that haven't been tried with the request's peer.
req.items = make([]common.Hash, 0, n)
items := make([]common.Hash, 0, n)
req.tasks = make(map[common.Hash]*stateTask, n)
for hash, t := range s.tasks {
// Stop when we've gathered enough requests
if len(req.items) == n {
if len(items) == n {
break
}
// Skip any requests we've already tried from this peer
......@@ -452,10 +452,12 @@ func (s *stateSync) fillTasks(n int, req *stateReq) {
}
// Assign the request to this peer
t.attempts[req.peer.id] = struct{}{}
req.items = append(req.items, hash)
items = append(items, hash)
req.tasks[hash] = t
delete(s.tasks, hash)
}
req.nItems = uint16(len(items))
return items
}
// process iterates over a batch of delivered state data, injecting each item
......
......@@ -538,40 +538,51 @@ func (f *BlockFetcher) loop() {
return
}
bodyFilterInMeter.Mark(int64(len(task.transactions)))
blocks := []*types.Block{}
for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
// Match up a body to any possible completion request
matched := false
for hash, announce := range f.completing {
if f.queued[hash] == nil {
txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
uncleHash := types.CalcUncleHash(task.uncles[i])
if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
// Mark the body matched, reassemble if still unknown
matched = true
if f.getBlock(hash) == nil {
block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
block.ReceivedAt = task.time
blocks = append(blocks, block)
} else {
f.forgetHash(hash)
}
// abort early if there's nothing explicitly requested
if len(f.completing) > 0 {
for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
// Match up a body to any possible completion request
var (
matched = false
uncleHash common.Hash // calculated lazily and reused
txnHash common.Hash // calculated lazily and reused
)
for hash, announce := range f.completing {
if f.queued[hash] != nil || announce.origin != task.peer {
continue
}
if uncleHash == (common.Hash{}) {
uncleHash = types.CalcUncleHash(task.uncles[i])
}
if uncleHash != announce.header.UncleHash {
continue
}
if txnHash == (common.Hash{}) {
txnHash = types.DeriveSha(types.Transactions(task.transactions[i]))
}
if txnHash != announce.header.TxHash {
continue
}
// Mark the body matched, reassemble if still unknown
matched = true
if f.getBlock(hash) == nil {
block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
block.ReceivedAt = task.time
blocks = append(blocks, block)
} else {
f.forgetHash(hash)
}
}
if matched {
task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
i--
continue
}
}
if matched {
task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
i--
continue
}
}
bodyFilterOutMeter.Mark(int64(len(task.transactions)))
select {
case filter <- task:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment