Unverified Commit b63e3c37 authored by gary rong's avatar gary rong Committed by GitHub

core: improve snapshot journal recovery (#21594)

* core/state/snapshot: introduce snapshot journal version

* core: update the disk layer in an atomic way

* core: persist the disk layer generator periodically

* core/state/snapshot: improve logging

* core/state/snapshot: forcibly ensure the legacy snapshot is matched

* core/state/snapshot: add debug logs

* core, tests: fix tests and special recovery case

* core: polish

* core: add more blockchain tests for snapshot recovery

* core/state: fix comment

* core: add recovery flag for snapshot

* core: add restart after start-after-crash tests

* core/rawdb: fix imports

* core: fix tests

* core: remove log

* core/state/snapshot: fix snapshot

* core: avoid callbacks in SetHead

* core: fix setHead cornercase where the threshold root has state

* core: small docs for the test cases
Co-authored-by: Péter Szilágyi <peterke@gmail.com>
parent 43c278cd
......@@ -207,9 +207,10 @@ type BlockChain struct {
processor Processor // Block transaction processor interface
vmConfig vm.Config
badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
writeLegacyJournal bool // Testing flag used to flush the snapshot journal in legacy format.
}
// NewBlockChain returns a fully initialised block chain using information
......@@ -281,9 +282,29 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
// Make sure the state associated with the block is available
head := bc.CurrentBlock()
if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil {
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
if err := bc.SetHead(head.NumberU64()); err != nil {
return nil, err
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if diskRoot != (common.Hash{}) {
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash(), "snaproot", diskRoot)
snapDisk, err := bc.SetHeadBeyondRoot(head.NumberU64(), diskRoot)
if err != nil {
return nil, err
}
// Chain rewound, persist old snapshot number to indicate recovery procedure
if snapDisk != 0 {
rawdb.WriteSnapshotRecoveryNumber(bc.db, snapDisk)
}
} else {
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
if err := bc.SetHead(head.NumberU64()); err != nil {
return nil, err
}
}
}
// Ensure that a previous crash in SetHead doesn't leave extra ancients
......@@ -339,7 +360,18 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
// Load any existing snapshot, regenerating it if loading failed
if bc.cacheConfig.SnapshotLimit > 0 {
bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root(), !bc.cacheConfig.SnapshotWait)
// If the chain was rewound past the snapshot persistent layer (causing
// a recovery block number to be persisted to disk), check if we're still
// in recovery mode and in that case, don't invalidate the snapshot on a
// head mismatch.
var recover bool
head := bc.CurrentBlock()
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer > head.NumberU64() {
log.Warn("Enabling snapshot recovery", "chainhead", head.NumberU64(), "diskbase", *layer)
recover = true
}
bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, head.Root(), !bc.cacheConfig.SnapshotWait, recover)
}
// Take ownership of this particular state
go bc.update()
......@@ -444,9 +476,25 @@ func (bc *BlockChain) loadLastState() error {
// was fast synced or full synced and in which state, the method will try to
// delete minimal data from disk whilst retaining chain consistency.
func (bc *BlockChain) SetHead(head uint64) error {
	// Delegate to the root-aware variant with an empty threshold root; the
	// block number it reports is irrelevant here, only the error is surfaced.
	if _, err := bc.SetHeadBeyondRoot(head, common.Hash{}); err != nil {
		return err
	}
	return nil
}
// SetHeadBeyondRoot rewinds the local chain to a new head with the extra condition
// that the rewind must pass the specified state root. This method is meant to be
// used when rewinding with snapshots enabled to ensure that we go back further than
// persistent disk layer. Depending on whether the node was fast synced or full, and
// in which state, the method will try to delete minimal data from disk whilst
// retaining chain consistency.
//
// The method returns the block number where the requested root cap was found.
func (bc *BlockChain) SetHeadBeyondRoot(head uint64, root common.Hash) (uint64, error) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
// Track the block number of the requested root hash
var rootNumber uint64 // (no root == always 0)
// Retrieve the last pivot block to short circuit rollbacks beyond it and the
// current freezer limit to start nuking if underflown
pivot := rawdb.ReadLastPivotNumber(bc.db)
......@@ -462,8 +510,16 @@ func (bc *BlockChain) SetHead(head uint64) error {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock
} else {
// Block exists, keep rewinding until we find one with state
// Block exists, keep rewinding until we find one with state,
// keeping rewinding until we exceed the optional threshold
// root hash
beyondRoot := (root == common.Hash{}) // Flag whether we're beyond the requested root (no root, always true)
for {
// If a root threshold was requested but not yet crossed, check
if root != (common.Hash{}) && !beyondRoot && newHeadBlock.Root() == root {
beyondRoot, rootNumber = true, newHeadBlock.NumberU64()
}
if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil {
log.Trace("Block state missing, rewinding further", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
if pivot == nil || newHeadBlock.NumberU64() > *pivot {
......@@ -474,8 +530,12 @@ func (bc *BlockChain) SetHead(head uint64) error {
newHeadBlock = bc.genesisBlock
}
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
if beyondRoot || newHeadBlock.NumberU64() == 0 {
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
}
log.Debug("Skipping block with threshold state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "root", newHeadBlock.Root())
newHeadBlock = bc.GetBlock(newHeadBlock.ParentHash(), newHeadBlock.NumberU64()-1) // Keep rewinding
}
}
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
......@@ -555,7 +615,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.txLookupCache.Purge()
bc.futureBlocks.Purge()
return bc.loadLastState()
return rootNumber, bc.loadLastState()
}
// FastSyncCommitHead sets the current head block to the one defined by the hash
......@@ -940,8 +1000,14 @@ func (bc *BlockChain) Stop() {
var snapBase common.Hash
if bc.snaps != nil {
var err error
if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil {
log.Error("Failed to journal state snapshot", "err", err)
if bc.writeLegacyJournal {
if snapBase, err = bc.snaps.LegacyJournal(bc.CurrentBlock().Root()); err != nil {
log.Error("Failed to journal state snapshot", "err", err)
}
} else {
if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil {
log.Error("Failed to journal state snapshot", "err", err)
}
}
}
// Ensure the state of a recent block is also stored to disk before exiting.
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -17,6 +17,8 @@
package rawdb
import (
"encoding/binary"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
......@@ -118,3 +120,58 @@ func DeleteSnapshotJournal(db ethdb.KeyValueWriter) {
log.Crit("Failed to remove snapshot journal", "err", err)
}
}
// ReadSnapshotGenerator loads the serialized snapshot generator blob stored
// under snapshotGeneratorKey. The lookup error is dropped on purpose, so the
// caller only ever sees whatever bytes the database handed back.
func ReadSnapshotGenerator(db ethdb.KeyValueReader) []byte {
	blob, _ := db.Get(snapshotGeneratorKey)
	return blob
}
// WriteSnapshotGenerator persists the serialized snapshot generator blob under
// snapshotGeneratorKey. A failed write is reported through log.Crit.
func WriteSnapshotGenerator(db ethdb.KeyValueWriter, generator []byte) {
	err := db.Put(snapshotGeneratorKey, generator)
	if err == nil {
		return
	}
	log.Crit("Failed to store snapshot generator", "err", err)
}
// DeleteSnapshotGenerator removes the serialized snapshot generator blob stored
// under snapshotGeneratorKey. A failed removal is reported through log.Crit.
func DeleteSnapshotGenerator(db ethdb.KeyValueWriter) {
	err := db.Delete(snapshotGeneratorKey)
	if err == nil {
		return
	}
	log.Crit("Failed to remove snapshot generator", "err", err)
}
// ReadSnapshotRecoveryNumber loads the block number stored under
// snapshotRecoveryKey, i.e. the number of the last persisted snapshot layer.
//
// It returns nil when no value is stored or when the stored value is not
// exactly 8 bytes long; otherwise the bytes are decoded as a big-endian uint64.
func ReadSnapshotRecoveryNumber(db ethdb.KeyValueReader) *uint64 {
	enc, _ := db.Get(snapshotRecoveryKey)
	// An empty entry and a wrongly sized entry are both treated as "absent".
	if len(enc) != 8 {
		return nil
	}
	recovered := binary.BigEndian.Uint64(enc)
	return &recovered
}
// WriteSnapshotRecoveryNumber persists the block number of the last persisted
// snapshot layer under snapshotRecoveryKey, encoded as an 8-byte big-endian
// integer. A failed write is reported through log.Crit.
func WriteSnapshotRecoveryNumber(db ethdb.KeyValueWriter, number uint64) {
	enc := make([]byte, 8)
	binary.BigEndian.PutUint64(enc, number)

	err := db.Put(snapshotRecoveryKey, enc)
	if err == nil {
		return
	}
	log.Crit("Failed to store snapshot recovery number", "err", err)
}
// DeleteSnapshotRecoveryNumber removes the entry stored under
// snapshotRecoveryKey (the block number of the last persisted snapshot layer).
// A failed removal is reported through log.Crit.
func DeleteSnapshotRecoveryNumber(db ethdb.KeyValueWriter) {
	err := db.Delete(snapshotRecoveryKey)
	if err == nil {
		return
	}
	log.Crit("Failed to remove snapshot recovery number", "err", err)
}
......@@ -51,6 +51,12 @@ var (
// snapshotJournalKey tracks the in-memory diff layers across restarts.
snapshotJournalKey = []byte("SnapshotJournal")
// snapshotGeneratorKey tracks the snapshot generation marker across restarts.
snapshotGeneratorKey = []byte("SnapshotGenerator")
// snapshotRecoveryKey tracks the snapshot recovery marker across restarts.
snapshotRecoveryKey = []byte("SnapshotRecovery")
// txIndexTailKey tracks the oldest block whose transactions have been indexed.
txIndexTailKey = []byte("TransactionIndexTail")
......
......@@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/leveldb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/rlp"
)
// reverse reverses the contents of a byte slice. It's used to update random accs
......@@ -429,6 +430,81 @@ func TestDiskPartialMerge(t *testing.T) {
}
}
// TestDiskGeneratorPersistence checks that flattening the bottom-most diff
// layer into the disk layer also persists the matching generator entry, both
// while the disk layer is still being generated (non-nil genMarker) and after
// generation has finished (nil genMarker).
func TestDiskGeneratorPersistence(t *testing.T) {
var (
accOne = randomHash()
accTwo = randomHash()
accOneSlotOne = randomHash()
accOneSlotTwo = randomHash()
accThree = randomHash()
accThreeSlot = randomHash()
baseRoot = randomHash()
diffRoot = randomHash()
diffTwoRoot = randomHash()
genMarker = append(randomHash().Bytes(), randomHash().Bytes()...)
)
// Scenario 1: the disk layer is still under construction.
db := rawdb.NewMemoryDatabase()
rawdb.WriteAccountSnapshot(db, accOne, accOne[:])
rawdb.WriteStorageSnapshot(db, accOne, accOneSlotOne, accOneSlotOne[:])
rawdb.WriteStorageSnapshot(db, accOne, accOneSlotTwo, accOneSlotTwo[:])
rawdb.WriteSnapshotRoot(db, baseRoot)
// Create a disk layer on top of the data written above, carrying a non-nil
// generation marker so that it counts as still being generated
snaps := &Tree{
layers: map[common.Hash]snapshot{
baseRoot: &diskLayer{
diskdb: db,
cache: fastcache.New(500 * 1024),
root: baseRoot,
genMarker: genMarker,
},
},
}
// Stack a diff layer (containing accTwo) on the base, then flatten everything
// onto disk
if err := snaps.Update(diffRoot, baseRoot, nil, map[common.Hash][]byte{
accTwo: accTwo[:],
}, nil); err != nil {
t.Fatalf("failed to update snapshot tree: %v", err)
}
if err := snaps.Cap(diffRoot, 0); err != nil {
t.Fatalf("failed to flatten snapshot tree: %v", err)
}
// The persisted generator must carry the in-progress generation marker
blob := rawdb.ReadSnapshotGenerator(db)
var generator journalGenerator
if err := rlp.DecodeBytes(blob, &generator); err != nil {
t.Fatalf("Failed to decode snapshot generator %v", err)
}
if !bytes.Equal(generator.Marker, genMarker) {
t.Fatalf("Generator marker is not matched")
}
// Scenario 2: the disk layer is fully generated.
// Stack another diff layer (containing accThree and one of its storage slots)
// on top of the previous root
if err := snaps.Update(diffTwoRoot, diffRoot, nil, map[common.Hash][]byte{
accThree: accThree.Bytes(),
}, map[common.Hash]map[common.Hash][]byte{
accThree: {accThreeSlot: accThreeSlot.Bytes()},
}); err != nil {
t.Fatalf("failed to update snapshot tree: %v", err)
}
// Clear the marker on the current disk layer before flattening again
diskLayer := snaps.layers[snaps.diskRoot()].(*diskLayer)
diskLayer.genMarker = nil // Construction finished
if err := snaps.Cap(diffTwoRoot, 0); err != nil {
t.Fatalf("failed to flatten snapshot tree: %v", err)
}
// The persisted generator must now report an empty marker
blob = rawdb.ReadSnapshotGenerator(db)
if err := rlp.DecodeBytes(blob, &generator); err != nil {
t.Fatalf("Failed to decode snapshot generator %v", err)
}
if len(generator.Marker) != 0 {
t.Fatalf("Failed to update snapshot generator")
}
}
// Tests that merging something into a disk layer persists it into the database
// and invalidates any previously written and cached values, discarding anything
// after the in-progress generation marker.
......
......@@ -112,6 +112,7 @@ func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache i
genAbort: make(chan chan *generatorStats),
}
go base.generate(&generatorStats{wiping: wiper, start: time.Now()})
log.Debug("Start snapshot generation", "root", root)
return base
}
......
This diff is collapsed.
......@@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
......@@ -136,6 +137,10 @@ type snapshot interface {
// flattening everything down (bad for reorgs).
Journal(buffer *bytes.Buffer) (common.Hash, error)
// LegacyJournal is basically identical to Journal. It's the legacy version for
// flushing the legacy journal. Now the only purpose of this function is for testing.
LegacyJournal(buffer *bytes.Buffer) (common.Hash, error)
// Stale return whether this layer has become stale (was flattened across) or
// if it's still live.
Stale() bool
......@@ -168,10 +173,12 @@ type Tree struct {
// store (with a number of memory layers from a journal), ensuring that the head
// of the snapshot matches the expected one.
//
// If the snapshot is missing or inconsistent, the entirety is deleted and will
// be reconstructed from scratch based on the tries in the key-value store, on a
// background thread.
func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, async bool) *Tree {
// If the snapshot is missing or the disk layer is broken, the entire snapshot is
// deleted and will be reconstructed from scratch based on the tries in the
// key-value store, on a background thread. If the memory layers from the journal
// are not continuous with the disk layer or the journal is missing, all diffs will
// be discarded iff it's in "recovery" mode, otherwise rebuild is mandatory.
func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, async bool, recovery bool) *Tree {
// Create a new, empty snapshot tree
snap := &Tree{
diskdb: diskdb,
......@@ -183,7 +190,7 @@ func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root comm
defer snap.waitBuild()
}
// Attempt to load a previously persisted snapshot and rebuild one if failed
head, err := loadSnapshot(diskdb, triedb, cache, root)
head, err := loadSnapshot(diskdb, triedb, cache, root, recovery)
if err != nil {
log.Warn("Failed to load snapshot, regenerating", "err", err)
snap.Rebuild(root)
......@@ -198,7 +205,7 @@ func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root comm
}
// waitBuild blocks until the snapshot finishes rebuilding. This method is meant
// to be used by tests to ensure we're testing what we believe we are.
// to be used by tests to ensure we're testing what we believe we are.
func (t *Tree) waitBuild() {
// Find the rebuild termination channel
var done chan struct{}
......@@ -415,6 +422,9 @@ func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer {
// diffToDisk merges a bottom-most diff into the persistent disk layer underneath
// it. The method will panic if called onto a non-bottom-most diff layer.
//
// The disk layer persistence should be operated in an atomic way. All updates should
// be discarded if the whole transition is not finished.
func diffToDisk(bottom *diffLayer) *diskLayer {
var (
base = bottom.parent.(*diskLayer)
......@@ -427,8 +437,7 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
base.genAbort <- abort
stats = <-abort
}
// Start by temporarily deleting the current snapshot block marker. This
// ensures that in the case of a crash, the entire snapshot is invalidated.
// Put the deletion in the batch writer, flush all updates in the final step.
rawdb.DeleteSnapshotRoot(batch)
// Mark the original base as stale as we're going to create a new wrapper
......@@ -471,12 +480,6 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
base.cache.Set(hash[:], data)
snapshotCleanAccountWriteMeter.Mark(int64(len(data)))
if batch.ValueSize() > ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
log.Crit("Failed to write account snapshot", "err", err)
}
batch.Reset()
}
snapshotFlushAccountItemMeter.Mark(1)
snapshotFlushAccountSizeMeter.Mark(int64(len(data)))
}
......@@ -505,18 +508,33 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
snapshotFlushStorageItemMeter.Mark(1)
snapshotFlushStorageSizeMeter.Mark(int64(len(data)))
}
if batch.ValueSize() > ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
log.Crit("Failed to write storage snapshot", "err", err)
}
batch.Reset()
}
}
// Update the snapshot block marker and write any remainder data
rawdb.WriteSnapshotRoot(batch, bottom.root)
// Write out the generator marker
entry := journalGenerator{
Done: base.genMarker == nil,
Marker: base.genMarker,
}
if stats != nil {
entry.Wiping = (stats.wiping != nil)
entry.Accounts = stats.accounts
entry.Slots = stats.slots
entry.Storage = uint64(stats.storage)
}
blob, err := rlp.EncodeToBytes(entry)
if err != nil {
panic(fmt.Sprintf("Failed to RLP encode generator %v", err))
}
rawdb.WriteSnapshotGenerator(batch, blob)
// Flush all the updates in the single db operation. Ensure the
// disk layer transition is atomic.
if err := batch.Write(); err != nil {
log.Crit("Failed to write leftover snapshot", "err", err)
}
log.Debug("Journalled disk layer", "root", bottom.root, "complete", base.genMarker == nil)
res := &diskLayer{
root: bottom.root,
cache: base.cache,
......@@ -554,7 +572,21 @@ func (t *Tree) Journal(root common.Hash) (common.Hash, error) {
t.lock.Lock()
defer t.lock.Unlock()
// Firstly write out the metadata of journal
journal := new(bytes.Buffer)
if err := rlp.Encode(journal, journalVersion); err != nil {
return common.Hash{}, err
}
diskroot := t.diskRoot()
if diskroot == (common.Hash{}) {
return common.Hash{}, errors.New("invalid disk root")
}
// Secondly write out the disk layer root, ensure the
// diff journal is continuous with disk.
if err := rlp.Encode(journal, diskroot); err != nil {
return common.Hash{}, err
}
// Finally write out the journal of each layer in reverse order.
base, err := snap.(snapshot).Journal(journal)
if err != nil {
return common.Hash{}, err
......@@ -564,6 +596,29 @@ func (t *Tree) Journal(root common.Hash) (common.Hash, error) {
return base, nil
}
// LegacyJournal mirrors Journal but serializes the layers through their
// LegacyJournal methods, i.e. in the legacy journal format. It exists purely
// for testing.
//
// It returns an error if no snapshot is known for root or if journalling the
// layers fails; otherwise the journal is stored in the database and the hash
// reported by the layer journalling is returned.
func (t *Tree) LegacyJournal(root common.Hash) (common.Hash, error) {
	// Look up the head snapshot that the journal should start from
	head := t.Snapshot(root)
	if head == nil {
		return common.Hash{}, fmt.Errorf("snapshot [%#x] missing", root)
	}
	// Hold the tree lock for the whole journalling run
	t.lock.Lock()
	defer t.lock.Unlock()

	var buffer bytes.Buffer
	hash, err := head.(snapshot).LegacyJournal(&buffer)
	if err != nil {
		return common.Hash{}, err
	}
	// Persist the serialized journal before handing the hash back
	rawdb.WriteSnapshotJournal(t.diskdb, buffer.Bytes())
	return hash, nil
}
// Rebuild wipes all available snapshot data from the persistent database and
// discard all caches and diff layers. Afterwards, it starts a new snapshot
// generator with the given root hash.
......@@ -571,6 +626,10 @@ func (t *Tree) Rebuild(root common.Hash) {
t.lock.Lock()
defer t.lock.Unlock()
// Firstly delete any recovery flag in the database. Because now we are
// building a brand new snapshot.
rawdb.DeleteSnapshotRecoveryNumber(t.diskdb)
// Track whether there's a wipe currently running and keep it alive if so
var wiper chan struct{}
......@@ -657,6 +716,16 @@ func (t *Tree) disklayer() *diskLayer {
}
}
// diskRoot is an internal helper that returns the root of the disk layer, or
// the empty hash when no disk layer is available. The caller is expected to
// hold the tree lock already.
func (t *Tree) diskRoot() common.Hash {
	if layer := t.disklayer(); layer != nil {
		return layer.Root()
	}
	return common.Hash{}
}
// generating is an internal helper function which reports whether the snapshot
// is still under the construction.
func (t *Tree) generating() (bool, error) {
......@@ -671,3 +740,11 @@ func (t *Tree) generating() (bool, error) {
defer layer.lock.RUnlock()
return layer.genMarker != nil, nil
}
// DiskRoot is an exported helper that returns the disk layer root. Unlike
// diskRoot, it acquires the tree lock itself for the duration of the lookup.
func (t *Tree) DiskRoot() common.Hash {
t.lock.Lock()
defer t.lock.Unlock()
return t.diskRoot()
}
......@@ -235,7 +235,7 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc, snapshotter boo
var snaps *snapshot.Tree
if snapshotter {
snaps = snapshot.New(db, sdb.TrieDB(), 1, root, false)
snaps = snapshot.New(db, sdb.TrieDB(), 1, root, false, false)
}
statedb, _ = state.New(root, sdb, snaps)
return snaps, statedb
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment