core/state, eth/downloader, trie: reset fast-failure on progress
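
Fast sync locks in a pivot header after the first critical-section failure and counts every subsequent failure; once the counter reaches fsCriticalTrials, the downloader permanently falls back to full sync. Previously the counter only ever grew, so a slow but otherwise healthy sync could get demoted. This change resets the counter (to 1, never 0, so the locked pivot stays locked) whenever a state delivery commits real data to the database. To report that, Process now returns a "committed" flag alongside the failing index and error, and the flag is threaded through queue.deliverNodeData's callback into the downloader. Since the counter is now touched from multiple goroutines, it becomes a uint32 accessed via sync/atomic.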

parent b8dec948
@@ -59,8 +59,10 @@ func (s *StateSync) Missing(max int) []common.Hash {
 	return (*trie.TrieSync)(s).Missing(max)
 }
 
-// Process injects a batch of retrieved trie nodes data.
-func (s *StateSync) Process(list []trie.SyncResult) (int, error) {
+// Process injects a batch of retrieved trie nodes data, returning if something
+// was committed to the database and also the index of an entry if processing of
+// it failed.
+func (s *StateSync) Process(list []trie.SyncResult) (bool, int, error) {
 	return (*trie.TrieSync)(s).Process(list)
 }
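
Callers of the old two-value Process need a mechanical update for the three-value form. A minimal sketch of the migration (the helper name, error wrapping, and import paths are illustrative assumptions, not part of this change):

    package example

    import (
    	"fmt"

    	"github.com/ethereum/go-ethereum/core/state"
    	"github.com/ethereum/go-ethereum/trie"
    )

    // processAll is a hypothetical wrapper showing the migrated call pattern:
    // the new first return value reports whether anything reached the database.
    func processAll(sched *state.StateSync, results []trie.SyncResult) (bool, error) {
    	committed, index, err := sched.Process(results)
    	if err != nil {
    		return committed, fmt.Errorf("failed to process result #%d: %v", index, err)
    	}
    	return committed, nil
    }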
...
@@ -138,7 +138,7 @@ func testIterativeStateSync(t *testing.T, batch int) {
 		}
 		results[i] = trie.SyncResult{Hash: hash, Data: data}
 	}
-	if index, err := sched.Process(results); err != nil {
+	if _, index, err := sched.Process(results); err != nil {
 		t.Fatalf("failed to process result #%d: %v", index, err)
 	}
 	queue = append(queue[:0], sched.Missing(batch)...)
@@ -168,7 +168,7 @@ func TestIterativeDelayedStateSync(t *testing.T) {
 		}
 		results[i] = trie.SyncResult{Hash: hash, Data: data}
 	}
-	if index, err := sched.Process(results); err != nil {
+	if _, index, err := sched.Process(results); err != nil {
 		t.Fatalf("failed to process result #%d: %v", index, err)
 	}
 	queue = append(queue[len(results):], sched.Missing(0)...)
@@ -206,7 +206,7 @@ func testIterativeRandomStateSync(t *testing.T, batch int) {
 		results = append(results, trie.SyncResult{Hash: hash, Data: data})
 	}
 	// Feed the retrieved results back and queue new tasks
-	if index, err := sched.Process(results); err != nil {
+	if _, index, err := sched.Process(results); err != nil {
 		t.Fatalf("failed to process result #%d: %v", index, err)
 	}
 	queue = make(map[common.Hash]struct{})
@@ -249,7 +249,7 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) {
 			}
 		}
 		// Feed the retrieved results back and queue new tasks
-		if index, err := sched.Process(results); err != nil {
+		if _, index, err := sched.Process(results); err != nil {
 			t.Fatalf("failed to process result #%d: %v", index, err)
 		}
 		for _, hash := range sched.Missing(0) {
@@ -283,7 +283,7 @@ func TestIncompleteStateSync(t *testing.T) {
 			results[i] = trie.SyncResult{Hash: hash, Data: data}
 		}
 		// Process each of the state nodes
-		if index, err := sched.Process(results); err != nil {
+		if _, index, err := sched.Process(results); err != nil {
 			t.Fatalf("failed to process result #%d: %v", index, err)
 		}
 		for _, result := range results {
...
@@ -64,12 +64,12 @@ var (
 	maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
 	maxResultsProcess = 2048 // Number of content download results to import at once into the chain
 
 	fsHeaderCheckFrequency = 100  // Verification frequency of the downloaded headers during fast sync
 	fsHeaderSafetyNet      = 2048 // Number of headers to discard in case a chain violation is detected
 	fsHeaderForceVerify    = 24   // Number of headers to verify before and after the pivot to accept it
 	fsPivotInterval        = 512  // Number of headers out of which to randomize the pivot point
 	fsMinFullBlocks        = 1024 // Number of blocks to retrieve fully even in fast sync
-	fsCriticalTrials       = 10   // Number of times to retry in the critical section before bailing
+	fsCriticalTrials       = uint32(32) // Number of times to retry in the critical section before bailing
 )
 
 var (
@@ -105,7 +105,7 @@ type Downloader struct {
 	peers *peerSet // Set of active peers from which download can proceed
 
 	fsPivotLock  *types.Header // Pivot header on critical section entry (cannot change between retries)
-	fsPivotFails int           // Number of fast sync failures in the critical section
+	fsPivotFails uint32        // Number of subsequent fast sync failures in the critical section
 
 	rttEstimate   uint64 // Round trip time to target for download requests
 	rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
@@ -361,7 +361,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 	// Set the requested sync mode, unless it's forbidden
 	d.mode = mode
-	if d.mode == FastSync && d.fsPivotFails >= fsCriticalTrials {
+	if d.mode == FastSync && atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials {
 		d.mode = FullSync
 	}
 	// Retrieve the origin peer and initiate the downloading process
@@ -926,7 +926,7 @@ func (d *Downloader) fetchNodeData() error {
 	var (
 		deliver = func(packet dataPack) (int, error) {
 			start := time.Now()
-			return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
+			return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
 				// If the peer returned old-requested data, forgive
 				if err == trie.ErrNotRequested {
 					glog.V(logger.Info).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
@@ -951,6 +951,11 @@ func (d *Downloader) fetchNodeData() error {
 				syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below
 				d.syncStatsLock.Unlock()
 
+				// If real database progress was made, reset any fast-sync pivot failure
+				if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
+					glog.V(logger.Debug).Infof("fast-sync progressed, resetting fail counter from %d", atomic.LoadUint32(&d.fsPivotFails))
+					atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
+				}
 				// Log a message to the user and return
 				if delivered > 0 {
 					glog.V(logger.Info).Infof("imported %3d state entries in %9v: processed %d, pending at least %d", delivered, common.PrettyDuration(time.Since(start)), syncStatsStateDone, pending)
@@ -1177,7 +1182,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 				// If we're already past the pivot point, this could be an attack, tread carefully
 				if rollback[len(rollback)-1].Number.Uint64() > pivot {
 					// If we didn't ever fail, lock in the pivot header (must! not! change!)
-					if d.fsPivotFails == 0 {
+					if atomic.LoadUint32(&d.fsPivotFails) == 0 {
 						for _, header := range rollback {
 							if header.Number.Uint64() == pivot {
 								glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])
@@ -1185,7 +1190,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
 							}
 						}
 					}
-					d.fsPivotFails++
+					atomic.AddUint32(&d.fsPivotFails, 1)
 				}
 			}
 		}()
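
Taken together, the downloader hunks above implement a small counter protocol around fsPivotFails. A condensed, self-contained sketch of those rules (not the actual geth code; the type and method names are invented for illustration):

    package sketch

    import "sync/atomic"

    const fsCriticalTrials = uint32(32) // retries allowed in the critical section

    type downloader struct {
    	fsPivotFails uint32 // subsequent fast-sync failures in the critical section
    }

    // onCriticalFailure is invoked when headers roll back past the pivot.
    func (d *downloader) onCriticalFailure() {
    	atomic.AddUint32(&d.fsPivotFails, 1)
    }

    // onStateProgress is invoked when a state delivery commits real data.
    // The counter drops back to 1, never 0: a zero value would allow the
    // next sync round to pick (and lock) a brand new pivot header.
    func (d *downloader) onStateProgress() {
    	if atomic.LoadUint32(&d.fsPivotFails) > 1 {
    		atomic.StoreUint32(&d.fsPivotFails, 1)
    	}
    }

    // demoted reports whether fast sync must fall back to full sync.
    func (d *downloader) demoted() bool {
    	return atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials
    }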
...
@@ -1039,9 +1039,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
 }
 
 // DeliverNodeData injects a node state data retrieval response into the queue.
-// The method returns the number of node state entries originally requested, and
-// the number of them actually accepted from the delivery.
-func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
+// The method returns the number of node state entries accepted from the delivery.
+func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, bool, error)) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -1099,31 +1098,34 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
 // deliverNodeData is the asynchronous node data processor that injects a batch
 // of sync results into the state scheduler.
-func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) {
+func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(int, bool, error)) {
 	// Wake up WaitResults after the state has been written because it
 	// might be waiting for the pivot block state to get completed.
 	defer q.active.Signal()
 
 	// Process results one by one to permit task fetches in between
+	progressed := false
 	for i, result := range results {
 		q.stateSchedLock.Lock()
 		if q.stateScheduler == nil {
 			// Syncing aborted since this async delivery started, bail out
 			q.stateSchedLock.Unlock()
-			callback(errNoFetchesPending, i)
+			callback(i, progressed, errNoFetchesPending)
 			return
 		}
-		if _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil {
+		if prog, _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil {
 			// Processing a state result failed, bail out
 			q.stateSchedLock.Unlock()
-			callback(err, i)
+			callback(i, progressed, err)
 			return
+		} else if prog {
+			progressed = true
 		}
 		// Item processing succeeded, release the lock (temporarily)
 		q.stateSchedLock.Unlock()
 	}
-	callback(nil, len(results))
+	callback(len(results), progressed, nil)
 }
 
 // Prepare configures the result cache to allow accepting and caching inbound
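
The callback contract thus changes from func(error, int) to func(int, bool, error): entries delivered so far, whether any entry progressed the database, then the first error hit. A hypothetical callback conforming to the new shape (illustrative only, using the standard log package):

    import "log"

    // logDelivery is an illustrative callback, not part of this change.
    func logDelivery(delivered int, progressed bool, err error) {
    	if err != nil {
    		log.Printf("state delivery aborted at entry %d: %v", delivered, err)
    		return
    	}
    	log.Printf("delivered %d state entries (progressed=%v)", delivered, progressed)
    }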
...
@@ -142,34 +142,40 @@ func (s *TrieSync) Missing(max int) []common.Hash {
 	return requests
 }
 
-// Process injects a batch of retrieved trie nodes data.
-func (s *TrieSync) Process(results []SyncResult) (int, error) {
+// Process injects a batch of retrieved trie nodes data, returning if something
+// was committed to the database and also the index of an entry if processing of
+// it failed.
+func (s *TrieSync) Process(results []SyncResult) (bool, int, error) {
+	committed := false
+
 	for i, item := range results {
 		// If the item was not requested, bail out
 		request := s.requests[item.Hash]
 		if request == nil {
-			return i, ErrNotRequested
+			return committed, i, ErrNotRequested
 		}
 		// If the item is a raw entry request, commit directly
 		if request.raw {
 			request.data = item.Data
 			s.commit(request, nil)
+			committed = true
 			continue
 		}
 		// Decode the node data content and update the request
 		node, err := decodeNode(item.Hash[:], item.Data, 0)
 		if err != nil {
-			return i, err
+			return committed, i, err
 		}
 		request.data = item.Data
 
 		// Create and schedule a request for all the children nodes
 		requests, err := s.children(request, node)
 		if err != nil {
-			return i, err
+			return committed, i, err
 		}
 		if len(requests) == 0 && request.deps == 0 {
 			s.commit(request, nil)
+			committed = true
 			continue
 		}
 		request.deps += len(requests)
@@ -177,7 +183,7 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) {
 			s.schedule(child)
 		}
 	}
-	return 0, nil
+	return committed, 0, nil
 }
 
 // Pending returns the number of state entries currently pending for download.
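
Note that committed can be true even when Process returns an error, since entries before the failing index may already have been written. A short sketch of a caller honoring that (hypothetical helper, assuming package trie context and the standard fmt package):

    // reportPartial surfaces that a failed batch may still have committed
    // earlier entries to the database.
    func reportPartial(sched *TrieSync, results []SyncResult) {
    	committed, index, err := sched.Process(results)
    	if err != nil {
    		fmt.Printf("entry #%d failed: %v (data committed: %v)\n", index, err, committed)
    	}
    }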
...
@@ -122,7 +122,7 @@ func testIterativeTrieSync(t *testing.T, batch int) {
 		}
 		results[i] = SyncResult{hash, data}
 	}
-	if index, err := sched.Process(results); err != nil {
+	if _, index, err := sched.Process(results); err != nil {
 		t.Fatalf("failed to process result #%d: %v", index, err)
 	}
 	queue = append(queue[:0], sched.Missing(batch)...)
@@ -152,7 +152,7 @@ func TestIterativeDelayedTrieSync(t *testing.T) {
 		}
 		results[i] = SyncResult{hash, data}
 	}
-	if index, err := sched.Process(results); err != nil {
+	if _, index, err := sched.Process(results); err != nil {
 		t.Fatalf("failed to process result #%d: %v", index, err)
 	}
 	queue = append(queue[len(results):], sched.Missing(10000)...)
@@ -190,7 +190,7 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) {
 		results = append(results, SyncResult{hash, data})
 	}
 	// Feed the retrieved results back and queue new tasks
-	if index, err := sched.Process(results); err != nil {
+	if _, index, err := sched.Process(results); err != nil {
 		t.Fatalf("failed to process result #%d: %v", index, err)
 	}
 	queue = make(map[common.Hash]struct{})
@@ -231,7 +231,7 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) {
 			}
 		}
 		// Feed the retrieved results back and queue new tasks
-		if index, err := sched.Process(results); err != nil {
+		if _, index, err := sched.Process(results); err != nil {
 			t.Fatalf("failed to process result #%d: %v", index, err)
 		}
 		for _, result := range results {
@@ -272,7 +272,7 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) {
 		results[i] = SyncResult{hash, data}
 	}
-	if index, err := sched.Process(results); err != nil {
+	if _, index, err := sched.Process(results); err != nil {
 		t.Fatalf("failed to process result #%d: %v", index, err)
 	}
 	queue = append(queue[:0], sched.Missing(0)...)
@@ -304,7 +304,7 @@ func TestIncompleteTrieSync(t *testing.T) {
 		results[i] = SyncResult{hash, data}
 	}
 	// Process each of the trie nodes
-	if index, err := sched.Process(results); err != nil {
+	if _, index, err := sched.Process(results); err != nil {
 		t.Fatalf("failed to process result #%d: %v", index, err)
 	}
 	for _, result := range results {
...