Unverified Commit 3010f9fc authored by Martin Holst Swende's avatar Martin Holst Swende Committed by GitHub

eth/downloader: change intial download size (#21366)

This changes how the downloader works, a little bit. Previously, when block sync started,
we immediately started filling up to 8192 blocks. Usually this is fine, blocks are small
in the early numbers. The threshold then is lowered as we measure the size of the blocks
that are filled.

However, if the node is shut down and restarts syncing while we're in a heavy segment,
that might be bad. This PR introduces a more conservative initial threshold of 2K blocks
instead.
parent d90bbce9
...@@ -219,7 +219,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, ...@@ -219,7 +219,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
stateBloom: stateBloom, stateBloom: stateBloom,
mux: mux, mux: mux,
checkpoint: checkpoint, checkpoint: checkpoint,
queue: newQueue(blockCacheItems), queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(), peers: newPeerSet(),
rttEstimate: uint64(rttMaxEstimate), rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000), rttConfidence: uint64(1000000),
...@@ -379,7 +379,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode ...@@ -379,7 +379,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
d.stateBloom.Close() d.stateBloom.Close()
} }
// Reset the queue, peer set and wake channels to clean any internal leftover state // Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset(blockCacheItems) d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems)
d.peers.Reset() d.peers.Reset()
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
......
...@@ -39,7 +39,7 @@ import ( ...@@ -39,7 +39,7 @@ import (
func init() { func init() {
fullMaxForkAncestry = 10000 fullMaxForkAncestry = 10000
lightMaxForkAncestry = 10000 lightMaxForkAncestry = 10000
blockCacheItems = 1024 blockCacheMaxItems = 1024
fsHeaderContCheck = 500 * time.Millisecond fsHeaderContCheck = 500 * time.Millisecond
} }
...@@ -544,7 +544,7 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { ...@@ -544,7 +544,7 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate() defer tester.terminate()
// Create a small enough block chain to download // Create a small enough block chain to download
chain := testChainBase.shorten(blockCacheItems - 15) chain := testChainBase.shorten(blockCacheMaxItems - 15)
tester.newPeer("peer", protocol, chain) tester.newPeer("peer", protocol, chain)
// Synchronise with the peer and make sure all relevant data was retrieved // Synchronise with the peer and make sure all relevant data was retrieved
...@@ -607,8 +607,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { ...@@ -607,8 +607,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
} }
tester.lock.Unlock() tester.lock.Unlock()
if cached == blockCacheItems || if cached == blockCacheMaxItems ||
cached == blockCacheItems-reorgProtHeaderDelay || cached == blockCacheMaxItems-reorgProtHeaderDelay ||
retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1 ||
retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay { retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
break break
...@@ -619,8 +619,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { ...@@ -619,8 +619,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
tester.lock.RLock() tester.lock.RLock()
retrieved = len(tester.ownBlocks) retrieved = len(tester.ownBlocks)
tester.lock.RUnlock() tester.lock.RUnlock()
if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay { if cached != blockCacheMaxItems && cached != blockCacheMaxItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1) t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheMaxItems, retrieved, frozen, targetBlocks+1)
} }
// Permit the blocked blocks to import // Permit the blocked blocks to import
...@@ -873,7 +873,7 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { ...@@ -873,7 +873,7 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate() defer tester.terminate()
// Create a small enough block chain to download // Create a small enough block chain to download
chain := testChainBase.shorten(blockCacheItems - 15) chain := testChainBase.shorten(blockCacheMaxItems - 15)
// Create peers of every type // Create peers of every type
tester.newPeer("peer 63", 63, chain) tester.newPeer("peer 63", 63, chain)
...@@ -965,7 +965,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { ...@@ -965,7 +965,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
chain := testChainBase.shorten(blockCacheItems - 15) chain := testChainBase.shorten(blockCacheMaxItems - 15)
brokenChain := chain.shorten(chain.len()) brokenChain := chain.shorten(chain.len())
delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2]) delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2])
tester.newPeer("attack", protocol, brokenChain) tester.newPeer("attack", protocol, brokenChain)
...@@ -997,7 +997,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) { ...@@ -997,7 +997,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
chain := testChainBase.shorten(blockCacheItems - 15) chain := testChainBase.shorten(blockCacheMaxItems - 15)
// Attempt a full sync with an attacker feeding shifted headers // Attempt a full sync with an attacker feeding shifted headers
brokenChain := chain.shorten(chain.len()) brokenChain := chain.shorten(chain.len())
...@@ -1202,7 +1202,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { ...@@ -1202,7 +1202,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
chain := testChainBase.shorten(blockCacheItems - 15) chain := testChainBase.shorten(blockCacheMaxItems - 15)
// Set a sync init hook to catch progress changes // Set a sync init hook to catch progress changes
starting := make(chan struct{}) starting := make(chan struct{})
...@@ -1362,7 +1362,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { ...@@ -1362,7 +1362,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
chain := testChainBase.shorten(blockCacheItems - 15) chain := testChainBase.shorten(blockCacheMaxItems - 15)
// Set a sync init hook to catch progress changes // Set a sync init hook to catch progress changes
starting := make(chan struct{}) starting := make(chan struct{})
...@@ -1435,7 +1435,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { ...@@ -1435,7 +1435,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
chain := testChainBase.shorten(blockCacheItems - 15) chain := testChainBase.shorten(blockCacheMaxItems - 15)
// Set a sync init hook to catch progress changes // Set a sync init hook to catch progress changes
starting := make(chan struct{}) starting := make(chan struct{})
......
...@@ -40,9 +40,10 @@ const ( ...@@ -40,9 +40,10 @@ const (
) )
var ( var (
blockCacheItems = 8192 // Maximum number of blocks to cache before throttling the download blockCacheMaxItems = 8192 // Maximum number of blocks to cache before throttling the download
blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching blockCacheInitialItems = 2048 // Initial number of blocks to start fetching, before we know the sizes of the blocks
blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones
) )
var ( var (
...@@ -142,7 +143,7 @@ type queue struct { ...@@ -142,7 +143,7 @@ type queue struct {
} }
// newQueue creates a new download queue for scheduling block retrieval. // newQueue creates a new download queue for scheduling block retrieval.
func newQueue(blockCacheLimit int) *queue { func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue {
lock := new(sync.RWMutex) lock := new(sync.RWMutex)
q := &queue{ q := &queue{
headerContCh: make(chan bool), headerContCh: make(chan bool),
...@@ -151,12 +152,12 @@ func newQueue(blockCacheLimit int) *queue { ...@@ -151,12 +152,12 @@ func newQueue(blockCacheLimit int) *queue {
active: sync.NewCond(lock), active: sync.NewCond(lock),
lock: lock, lock: lock,
} }
q.Reset(blockCacheLimit) q.Reset(blockCacheLimit, thresholdInitialSize)
return q return q
} }
// Reset clears out the queue contents. // Reset clears out the queue contents.
func (q *queue) Reset(blockCacheLimit int) { func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
...@@ -175,6 +176,7 @@ func (q *queue) Reset(blockCacheLimit int) { ...@@ -175,6 +176,7 @@ func (q *queue) Reset(blockCacheLimit int) {
q.receiptPendPool = make(map[string]*fetchRequest) q.receiptPendPool = make(map[string]*fetchRequest)
q.resultCache = newResultStore(blockCacheLimit) q.resultCache = newResultStore(blockCacheLimit)
q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize))
} }
// Close marks the end of the sync, unblocking Results. // Close marks the end of the sync, unblocking Results.
......
...@@ -97,7 +97,7 @@ func dummyPeer(id string) *peerConnection { ...@@ -97,7 +97,7 @@ func dummyPeer(id string) *peerConnection {
} }
func TestBasics(t *testing.T) { func TestBasics(t *testing.T) {
q := newQueue(10) q := newQueue(10, 10)
if !q.Idle() { if !q.Idle() {
t.Errorf("new queue should be idle") t.Errorf("new queue should be idle")
} }
...@@ -174,7 +174,7 @@ func TestBasics(t *testing.T) { ...@@ -174,7 +174,7 @@ func TestBasics(t *testing.T) {
} }
func TestEmptyBlocks(t *testing.T) { func TestEmptyBlocks(t *testing.T) {
q := newQueue(10) q := newQueue(10, 10)
q.Prepare(1, FastSync) q.Prepare(1, FastSync)
// Schedule a batch of headers // Schedule a batch of headers
...@@ -244,7 +244,7 @@ func XTestDelivery(t *testing.T) { ...@@ -244,7 +244,7 @@ func XTestDelivery(t *testing.T) {
log.Root().SetHandler(log.StdoutHandler) log.Root().SetHandler(log.StdoutHandler)
} }
q := newQueue(10) q := newQueue(10, 10)
var wg sync.WaitGroup var wg sync.WaitGroup
q.Prepare(1, FastSync) q.Prepare(1, FastSync)
wg.Add(1) wg.Add(1)
......
...@@ -39,7 +39,7 @@ var ( ...@@ -39,7 +39,7 @@ var (
) )
// The common prefix of all test chains: // The common prefix of all test chains:
var testChainBase = newTestChain(blockCacheItems+200, testGenesis) var testChainBase = newTestChain(blockCacheMaxItems+200, testGenesis)
// Different forks on top of the base chain: // Different forks on top of the base chain:
var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment