Unverified Commit 25bd17d7 authored by rjl493456442's avatar rjl493456442 Committed by GitHub

core/state/snapshot: clean up the generation code (#24479)

parent 33022c2e
...@@ -379,7 +379,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix ...@@ -379,7 +379,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix
type onStateCallback func(key []byte, val []byte, write bool, delete bool) error type onStateCallback func(key []byte, val []byte, write bool, delete bool) error
// generateRange generates the state segment with particular prefix. Generation can // generateRange generates the state segment with particular prefix. Generation can
// either verify the correctness of existing state through rangeproof and skip // either verify the correctness of existing state through range-proof and skip
// generation, or iterate trie to regenerate state on demand. // generation, or iterate trie to regenerate state on demand.
func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, origin []byte, max int, stats *generatorStats, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) { func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, origin []byte, max int, stats *generatorStats, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) {
// Use range prover to check the validity of the flat state in the range // Use range prover to check the validity of the flat state in the range
...@@ -532,66 +532,94 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, ...@@ -532,66 +532,94 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string,
return !trieMore && !result.diskMore, last, nil return !trieMore && !result.diskMore, last, nil
} }
// generate is a background thread that iterates over the state and storage tries, // checkAndFlush checks if an interruption signal is received or the
// constructing the state snapshot. All the arguments are purely for statistics // batch size has exceeded the allowance.
// gathering and logging, since the method surfs the blocks as they arrive, often func (dl *diskLayer) checkAndFlush(current []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
// being restarted. var abort chan *generatorStats
func (dl *diskLayer) generate(stats *generatorStats) { select {
var ( case abort = <-dl.genAbort:
accMarker []byte default:
accountRange = accountCheckRange
)
if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
// Always reset the initial account range as 1
// whenever recover from the interruption.
accMarker, accountRange = dl.genMarker[:common.HashLength], 1
} }
var ( if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
batch = dl.diskdb.NewBatch() if bytes.Compare(current, dl.genMarker) < 0 {
logged = time.Now() log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", dl.genMarker))
accOrigin = common.CopyBytes(accMarker) }
abort chan *generatorStats // Flush out the batch anyway no matter it's empty or not.
) // It's possible that all the states are recovered and the
stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker) // generation indeed makes progress.
journalProgress(batch, current, stats)
checkAndFlush := func(currentLocation []byte) error { if err := batch.Write(); err != nil {
select { return err
case abort = <-dl.genAbort:
default:
} }
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { batch.Reset()
if bytes.Compare(currentLocation, dl.genMarker) < 0 {
log.Error("Snapshot generator went backwards",
"currentLocation", fmt.Sprintf("%x", currentLocation),
"genMarker", fmt.Sprintf("%x", dl.genMarker))
}
// Flush out the batch anyway no matter it's empty or not. dl.lock.Lock()
// It's possible that all the states are recovered and the dl.genMarker = current
// generation indeed makes progress. dl.lock.Unlock()
journalProgress(batch, currentLocation, stats)
if err := batch.Write(); err != nil { if abort != nil {
return err stats.Log("Aborting state snapshot generation", dl.root, current)
} return newAbortErr(abort) // bubble up an error for interruption
batch.Reset() }
}
if time.Since(*logged) > 8*time.Second {
stats.Log("Generating state snapshot", dl.root, current)
*logged = time.Now()
}
return nil
}
dl.lock.Lock() // generateStorages generates the missing storage slots of the specific contract.
dl.genMarker = currentLocation // It's supposed to restart the generation from the given origin position.
dl.lock.Unlock() func generateStorages(dl *diskLayer, account common.Hash, storageRoot common.Hash, storeMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
onStorage := func(key []byte, val []byte, write bool, delete bool) error {
defer func(start time.Time) {
snapStorageWriteCounter.Inc(time.Since(start).Nanoseconds())
}(time.Now())
if abort != nil { if delete {
stats.Log("Aborting state snapshot generation", dl.root, currentLocation) rawdb.DeleteStorageSnapshot(batch, account, common.BytesToHash(key))
return errors.New("aborted") snapWipedStorageMeter.Mark(1)
} return nil
}
if write {
rawdb.WriteStorageSnapshot(batch, account, common.BytesToHash(key), val)
snapGeneratedStorageMeter.Mark(1)
} else {
snapRecoveredStorageMeter.Mark(1)
} }
if time.Since(logged) > 8*time.Second { stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
stats.Log("Generating state snapshot", dl.root, currentLocation) stats.slots++
logged = time.Now()
// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := dl.checkAndFlush(append(account[:], key...), batch, stats, logged); err != nil {
return err
} }
return nil return nil
} }
// Loop for re-generating the missing storage slots.
var origin = common.CopyBytes(storeMarker)
for {
exhausted, last, err := dl.generateRange(storageRoot, append(rawdb.SnapshotStoragePrefix, account.Bytes()...), "storage", origin, storageCheckRange, stats, onStorage, nil)
if err != nil {
return err // The procedure it aborted, either by external signal or internal error.
}
// Abort the procedure if the entire contract storage is generated
if exhausted {
break
}
if origin = increaseKey(last); origin == nil {
break // special case, the last is 0xffffffff...fff
}
}
return nil
}
// generateAccounts generates the missing snapshot accounts as well as their
// storage slots in the main trie. It's supposed to restart the generation
// from the given origin position.
func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
onAccount := func(key []byte, val []byte, write bool, delete bool) error { onAccount := func(key []byte, val []byte, write bool, delete bool) error {
var ( var (
start = time.Now() start = time.Now()
...@@ -647,7 +675,7 @@ func (dl *diskLayer) generate(stats *generatorStats) { ...@@ -647,7 +675,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
marker = dl.genMarker[:] marker = dl.genMarker[:]
} }
// If we've exceeded our batch allowance or termination was requested, flush to disk // If we've exceeded our batch allowance or termination was requested, flush to disk
if err := checkAndFlush(marker); err != nil { if err := dl.checkAndFlush(marker, batch, stats, logged); err != nil {
return err return err
} }
// If the iterated account is the contract, create a further loop to // If the iterated account is the contract, create a further loop to
...@@ -671,70 +699,67 @@ func (dl *diskLayer) generate(stats *generatorStats) { ...@@ -671,70 +699,67 @@ func (dl *diskLayer) generate(stats *generatorStats) {
if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength { if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength {
storeMarker = dl.genMarker[common.HashLength:] storeMarker = dl.genMarker[common.HashLength:]
} }
onStorage := func(key []byte, val []byte, write bool, delete bool) error { if err := generateStorages(dl, accountHash, acc.Root, storeMarker, batch, stats, logged); err != nil {
defer func(start time.Time) { return err
snapStorageWriteCounter.Inc(time.Since(start).Nanoseconds())
}(time.Now())
if delete {
rawdb.DeleteStorageSnapshot(batch, accountHash, common.BytesToHash(key))
snapWipedStorageMeter.Mark(1)
return nil
}
if write {
rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(key), val)
snapGeneratedStorageMeter.Mark(1)
} else {
snapRecoveredStorageMeter.Mark(1)
}
stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
stats.slots++
// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := checkAndFlush(append(accountHash[:], key...)); err != nil {
return err
}
return nil
}
var storeOrigin = common.CopyBytes(storeMarker)
for {
exhausted, last, err := dl.generateRange(acc.Root, append(rawdb.SnapshotStoragePrefix, accountHash.Bytes()...), "storage", storeOrigin, storageCheckRange, stats, onStorage, nil)
if err != nil {
return err
}
if exhausted {
break
}
if storeOrigin = increaseKey(last); storeOrigin == nil {
break // special case, the last is 0xffffffff...fff
}
} }
} }
// Some account processed, unmark the marker // Some account processed, unmark the marker
accMarker = nil accMarker = nil
return nil return nil
} }
// Always reset the initial account range as 1 whenever recover from the interruption.
// Global loop for regerating the entire state trie + all layered storage tries. var accountRange = accountCheckRange
if len(accMarker) > 0 {
accountRange = 1
}
// Global loop for re-generating the account snapshots + all layered storage snapshots.
origin := common.CopyBytes(accMarker)
for { for {
exhausted, last, err := dl.generateRange(dl.root, rawdb.SnapshotAccountPrefix, "account", accOrigin, accountRange, stats, onAccount, FullAccountRLP) exhausted, last, err := dl.generateRange(dl.root, rawdb.SnapshotAccountPrefix, "account", origin, accountRange, stats, onAccount, FullAccountRLP)
// The procedure it aborted, either by external signal or internal error
if err != nil { if err != nil {
if abort == nil { // aborted by internal error, wait the signal return err // The procedure it aborted, either by external signal or internal error.
abort = <-dl.genAbort
}
abort <- stats
return
} }
// Abort the procedure if the entire snapshot is generated // Abort the procedure if the entire snapshot is generated
if exhausted { if exhausted {
break break
} }
if accOrigin = increaseKey(last); accOrigin == nil { if origin = increaseKey(last); origin == nil {
break // special case, the last is 0xffffffff...fff break // special case, the last is 0xffffffff...fff
} }
accountRange = accountCheckRange accountRange = accountCheckRange
} }
return nil
}
// generate is a background thread that iterates over the state and storage tries,
// constructing the state snapshot. All the arguments are purely for statistics
// gathering and logging, since the method surfs the blocks as they arrive, often
// being restarted.
func (dl *diskLayer) generate(stats *generatorStats) {
var accMarker []byte
if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
accMarker = dl.genMarker[:common.HashLength]
}
var (
batch = dl.diskdb.NewBatch()
logged = time.Now()
abort chan *generatorStats
)
stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker)
// Generate the snapshot accounts from the point where they left off.
if err := generateAccounts(dl, accMarker, batch, stats, &logged); err != nil {
// Extract the received interruption signal if exists
if aerr, ok := err.(*abortErr); ok {
abort = aerr.abort
}
// Aborted by internal error, wait the signal
if abort == nil {
abort = <-dl.genAbort
}
abort <- stats
return
}
// Snapshot fully generated, set the marker to nil. // Snapshot fully generated, set the marker to nil.
// Note even there is nothing to commit, persist the // Note even there is nothing to commit, persist the
// generator anyway to mark the snapshot is complete. // generator anyway to mark the snapshot is complete.
...@@ -762,7 +787,7 @@ func (dl *diskLayer) generate(stats *generatorStats) { ...@@ -762,7 +787,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
} }
// increaseKey increase the input key by one bit. Return nil if the entire // increaseKey increase the input key by one bit. Return nil if the entire
// addition operation overflows, // addition operation overflows.
func increaseKey(key []byte) []byte { func increaseKey(key []byte) []byte {
for i := len(key) - 1; i >= 0; i-- { for i := len(key) - 1; i >= 0; i-- {
key[i]++ key[i]++
...@@ -772,3 +797,17 @@ func increaseKey(key []byte) []byte { ...@@ -772,3 +797,17 @@ func increaseKey(key []byte) []byte {
} }
return nil return nil
} }
// abortErr wraps an interruption signal received to represent the
// generation is aborted by external processes.
type abortErr struct {
abort chan *generatorStats
}
func newAbortErr(abort chan *generatorStats) error {
return &abortErr{abort: abort}
}
func (err *abortErr) Error() string {
return "aborted"
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment