Unverified Commit 5e9f5ca5 authored by gary rong's avatar gary rong Committed by GitHub

core/state/snapshot: write snapshot generator in batch (#22163)

* core/state/snapshot: write snapshot generator in batch

* core: refactor the tests

* core: update tests

* core: update tests
parent 10555d46
......@@ -28,27 +28,19 @@ import (
"testing"
"time"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/params"
)
// snapshotTest is a test case for snapshot recovery. It can be used for
// simulating these scenarios:
// (i) Geth restarts normally with valid legacy snapshot
// (ii) Geth restarts normally with valid new-format snapshot
// (iii) Geth restarts after the crash, with broken legacy snapshot
// (iv) Geth restarts after the crash, with broken new-format snapshot
// (v) Geth restarts normally, but it's requested to be rewound to a lower point via SetHead
// (vi) Geth restarts normally with a stale snapshot
type snapshotTest struct {
legacy bool // Flag whether the loaded snapshot is in legacy format
crash bool // Flag whether the Geth restarts from the previous crash
restartCrash int // Number of blocks to insert after the normal stop, then the crash happens
gapped int // Number of blocks to insert without enabling snapshot
setHead uint64 // Block number to set head back to
// snapshotTestBasic wraps the common testing fields in the snapshot tests.
type snapshotTestBasic struct {
legacy bool // Wether write the snapshot journal in legacy format
chainBlocks int // Number of blocks to generate for the canonical chain
snapshotBlock uint64 // Block number of the relevant snapshot disk layer
commitBlock uint64 // Block number for which to commit the state to disk
......@@ -58,56 +50,418 @@ type snapshotTest struct {
expHeadFastBlock uint64 // Block number of the expected head fast sync block
expHeadBlock uint64 // Block number of the expected head full block
expSnapshotBottom uint64 // The block height corresponding to the snapshot disk layer
// share fields, set in runtime
datadir string
db ethdb.Database
gendb ethdb.Database
engine consensus.Engine
}
func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Block) {
// Create a temporary persistent database
datadir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("Failed to create temporary datadir: %v", err)
}
os.RemoveAll(datadir)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "")
if err != nil {
t.Fatalf("Failed to create persistent database: %v", err)
}
// Initialize a fresh chain
var (
genesis = new(Genesis).MustCommit(db)
engine = ethash.NewFullFaker()
gendb = rawdb.NewMemoryDatabase()
// Snapshot is enabled, the first snapshot is created from the Genesis.
// The snapshot memory allowance is 256MB, it means no snapshot flush
// will happen during the block insertion.
cacheConfig = defaultCacheConfig
)
chain, err := NewBlockChain(db, cacheConfig, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to create chain: %v", err)
}
blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, gendb, basic.chainBlocks, func(i int, b *BlockGen) {})
// Insert the blocks with configured settings.
var breakpoints []uint64
if basic.commitBlock > basic.snapshotBlock {
breakpoints = append(breakpoints, basic.snapshotBlock, basic.commitBlock)
} else {
breakpoints = append(breakpoints, basic.commitBlock, basic.snapshotBlock)
}
var startPoint uint64
for _, point := range breakpoints {
if _, err := chain.InsertChain(blocks[startPoint:point]); err != nil {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
startPoint = point
if basic.commitBlock > 0 && basic.commitBlock == point {
chain.stateCache.TrieDB().Commit(blocks[point-1].Root(), true, nil)
}
if basic.snapshotBlock > 0 && basic.snapshotBlock == point {
if basic.legacy {
// Here we commit the snapshot disk root to simulate
// committing the legacy snapshot.
rawdb.WriteSnapshotRoot(db, blocks[point-1].Root())
} else {
// Flushing the entire snap tree into the disk, the
// relavant (a) snapshot root and (b) snapshot generator
// will be persisted atomically.
chain.snaps.Cap(blocks[point-1].Root(), 0)
diskRoot, blockRoot := chain.snaps.DiskRoot(), blocks[point-1].Root()
if !bytes.Equal(diskRoot.Bytes(), blockRoot.Bytes()) {
t.Fatalf("Failed to flush disk layer change, want %x, got %x", blockRoot, diskRoot)
}
}
}
}
if _, err := chain.InsertChain(blocks[startPoint:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
}
// Set runtime fields
basic.datadir = datadir
basic.db = db
basic.gendb = gendb
basic.engine = engine
// Ugly hack, notify the chain to flush the journal in legacy format
// if it's requested.
if basic.legacy {
chain.writeLegacyJournal = true
}
return chain, blocks
}
func (basic *snapshotTestBasic) verify(t *testing.T, chain *BlockChain, blocks []*types.Block) {
// Iterate over all the remaining blocks and ensure there are no gaps
verifyNoGaps(t, chain, true, blocks)
verifyCutoff(t, chain, true, blocks, basic.expCanonicalBlocks)
if head := chain.CurrentHeader(); head.Number.Uint64() != basic.expHeadHeader {
t.Errorf("Head header mismatch: have %d, want %d", head.Number, basic.expHeadHeader)
}
if head := chain.CurrentFastBlock(); head.NumberU64() != basic.expHeadFastBlock {
t.Errorf("Head fast block mismatch: have %d, want %d", head.NumberU64(), basic.expHeadFastBlock)
}
if head := chain.CurrentBlock(); head.NumberU64() != basic.expHeadBlock {
t.Errorf("Head block mismatch: have %d, want %d", head.NumberU64(), basic.expHeadBlock)
}
// Check the disk layer, ensure they are matched
block := chain.GetBlockByNumber(basic.expSnapshotBottom)
if block == nil {
t.Errorf("The correspnding block[%d] of snapshot disk layer is missing", basic.expSnapshotBottom)
} else if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot())
}
// Check the snapshot, ensure it's integrated
if err := snapshot.VerifyState(chain.snaps, block.Root()); err != nil {
t.Errorf("The disk layer is not integrated %v", err)
}
}
func (tt *snapshotTest) dump() string {
func (basic *snapshotTestBasic) dump() string {
buffer := new(strings.Builder)
fmt.Fprint(buffer, "Chain:\n G")
for i := 0; i < tt.chainBlocks; i++ {
for i := 0; i < basic.chainBlocks; i++ {
fmt.Fprintf(buffer, "->C%d", i+1)
}
fmt.Fprint(buffer, " (HEAD)\n\n")
fmt.Fprintf(buffer, "Commit: G")
if tt.commitBlock > 0 {
fmt.Fprintf(buffer, ", C%d", tt.commitBlock)
if basic.commitBlock > 0 {
fmt.Fprintf(buffer, ", C%d", basic.commitBlock)
}
fmt.Fprint(buffer, "\n")
fmt.Fprintf(buffer, "Snapshot: G")
if tt.snapshotBlock > 0 {
fmt.Fprintf(buffer, ", C%d", tt.snapshotBlock)
if basic.snapshotBlock > 0 {
fmt.Fprintf(buffer, ", C%d", basic.snapshotBlock)
}
fmt.Fprint(buffer, "\n")
if tt.crash {
fmt.Fprintf(buffer, "\nCRASH\n\n")
} else {
fmt.Fprintf(buffer, "\nSetHead(%d)\n\n", tt.setHead)
}
//if crash {
// fmt.Fprintf(buffer, "\nCRASH\n\n")
//} else {
// fmt.Fprintf(buffer, "\nSetHead(%d)\n\n", basic.setHead)
//}
fmt.Fprintf(buffer, "------------------------------\n\n")
fmt.Fprint(buffer, "Expected in leveldb:\n G")
for i := 0; i < tt.expCanonicalBlocks; i++ {
for i := 0; i < basic.expCanonicalBlocks; i++ {
fmt.Fprintf(buffer, "->C%d", i+1)
}
fmt.Fprintf(buffer, "\n\n")
fmt.Fprintf(buffer, "Expected head header : C%d\n", tt.expHeadHeader)
fmt.Fprintf(buffer, "Expected head fast block: C%d\n", tt.expHeadFastBlock)
if tt.expHeadBlock == 0 {
fmt.Fprintf(buffer, "Expected head header : C%d\n", basic.expHeadHeader)
fmt.Fprintf(buffer, "Expected head fast block: C%d\n", basic.expHeadFastBlock)
if basic.expHeadBlock == 0 {
fmt.Fprintf(buffer, "Expected head block : G\n")
} else {
fmt.Fprintf(buffer, "Expected head block : C%d\n", tt.expHeadBlock)
fmt.Fprintf(buffer, "Expected head block : C%d\n", basic.expHeadBlock)
}
if tt.expSnapshotBottom == 0 {
if basic.expSnapshotBottom == 0 {
fmt.Fprintf(buffer, "Expected snapshot disk : G\n")
} else {
fmt.Fprintf(buffer, "Expected snapshot disk : C%d\n", tt.expSnapshotBottom)
fmt.Fprintf(buffer, "Expected snapshot disk : C%d\n", basic.expSnapshotBottom)
}
return buffer.String()
}
func (basic *snapshotTestBasic) teardown() {
basic.db.Close()
basic.gendb.Close()
os.RemoveAll(basic.datadir)
}
// snapshotTest is a test case type for normal snapshot recovery.
// It can be used for testing that restart Geth normally.
type snapshotTest struct {
snapshotTestBasic
}
func (snaptest *snapshotTest) test(t *testing.T) {
// It's hard to follow the test case, visualize the input
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
// fmt.Println(tt.dump())
chain, blocks := snaptest.prepare(t)
// Restart the chain normally
chain.Stop()
newchain, err := NewBlockChain(snaptest.db, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
defer newchain.Stop()
snaptest.verify(t, newchain, blocks)
}
// crashSnapshotTest is a test case type for innormal snapshot recovery.
// It can be used for testing that restart Geth after the crash.
type crashSnapshotTest struct {
snapshotTestBasic
}
func (snaptest *crashSnapshotTest) test(t *testing.T) {
// It's hard to follow the test case, visualize the input
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
// fmt.Println(tt.dump())
chain, blocks := snaptest.prepare(t)
// Pull the plug on the database, simulating a hard crash
db := chain.db
db.Close()
// Start a new blockchain back up and see where the repair leads us
newdb, err := rawdb.NewLevelDBDatabaseWithFreezer(snaptest.datadir, 0, 0, snaptest.datadir, "")
if err != nil {
t.Fatalf("Failed to reopen persistent database: %v", err)
}
defer newdb.Close()
// The interesting thing is: instead of starting the blockchain after
// the crash, we do restart twice here: one after the crash and one
// after the normal stop. It's used to ensure the broken snapshot
// can be detected all the time.
newchain, err := NewBlockChain(newdb, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
newchain.Stop()
newchain, err = NewBlockChain(newdb, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
defer newchain.Stop()
snaptest.verify(t, newchain, blocks)
}
// gappedSnapshotTest is a test type used to test this scenario:
// - have a complete snapshot
// - restart without enabling the snapshot
// - insert a few blocks
// - restart with enabling the snapshot again
type gappedSnapshotTest struct {
snapshotTestBasic
gapped int // Number of blocks to insert without enabling snapshot
}
func (snaptest *gappedSnapshotTest) test(t *testing.T) {
// It's hard to follow the test case, visualize the input
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
// fmt.Println(tt.dump())
chain, blocks := snaptest.prepare(t)
// Insert blocks without enabling snapshot if gapping is required.
chain.Stop()
gappedBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], snaptest.engine, snaptest.gendb, snaptest.gapped, func(i int, b *BlockGen) {})
// Insert a few more blocks without enabling snapshot
var cacheConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 0,
}
newchain, err := NewBlockChain(snaptest.db, cacheConfig, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
newchain.InsertChain(gappedBlocks)
newchain.Stop()
// Restart the chain with enabling the snapshot
newchain, err = NewBlockChain(snaptest.db, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
defer newchain.Stop()
snaptest.verify(t, newchain, blocks)
}
// setHeadSnapshotTest is the test type used to test this scenario:
// - have a complete snapshot
// - set the head to a lower point
// - restart
type setHeadSnapshotTest struct {
snapshotTestBasic
setHead uint64 // Block number to set head back to
}
func (snaptest *setHeadSnapshotTest) test(t *testing.T) {
// It's hard to follow the test case, visualize the input
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
// fmt.Println(tt.dump())
chain, blocks := snaptest.prepare(t)
// Rewind the chain if setHead operation is required.
chain.SetHead(snaptest.setHead)
chain.Stop()
newchain, err := NewBlockChain(snaptest.db, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
defer newchain.Stop()
snaptest.verify(t, newchain, blocks)
}
// restartCrashSnapshotTest is the test type used to test this scenario:
// - have a complete snapshot
// - restart chain
// - insert more blocks with enabling the snapshot
// - commit the snapshot
// - crash
// - restart again
type restartCrashSnapshotTest struct {
snapshotTestBasic
newBlocks int
}
func (snaptest *restartCrashSnapshotTest) test(t *testing.T) {
// It's hard to follow the test case, visualize the input
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
// fmt.Println(tt.dump())
chain, blocks := snaptest.prepare(t)
// Firstly, stop the chain properly, with all snapshot journal
// and state committed.
chain.Stop()
newchain, err := NewBlockChain(snaptest.db, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
newBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], snaptest.engine, snaptest.gendb, snaptest.newBlocks, func(i int, b *BlockGen) {})
newchain.InsertChain(newBlocks)
// Commit the entire snapshot into the disk if requested. Note only
// (a) snapshot root and (b) snapshot generator will be committed,
// the diff journal is not.
newchain.Snapshots().Cap(newBlocks[len(newBlocks)-1].Root(), 0)
// Simulate the blockchain crash
// Don't call chain.Stop here, so that no snapshot
// journal and latest state will be committed
// Restart the chain after the crash
newchain, err = NewBlockChain(snaptest.db, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
defer newchain.Stop()
snaptest.verify(t, newchain, blocks)
}
// wipeCrashSnapshotTest is the test type used to test this scenario:
// - have a complete snapshot
// - restart, insert more blocks without enabling the snapshot
// - restart again with enabling the snapshot
// - crash
type wipeCrashSnapshotTest struct {
snapshotTestBasic
newBlocks int
}
func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) {
// It's hard to follow the test case, visualize the input
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
// fmt.Println(tt.dump())
chain, blocks := snaptest.prepare(t)
// Firstly, stop the chain properly, with all snapshot journal
// and state committed.
chain.Stop()
config := &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 0,
}
newchain, err := NewBlockChain(snaptest.db, config, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
newBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], snaptest.engine, snaptest.gendb, snaptest.newBlocks, func(i int, b *BlockGen) {})
newchain.InsertChain(newBlocks)
newchain.Stop()
// Restart the chain, the wiper should starts working
config = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: false, // Don't wait rebuild
}
newchain, err = NewBlockChain(snaptest.db, config, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
// Simulate the blockchain crash.
newchain, err = NewBlockChain(snaptest.db, nil, params.AllEthashProtocolChanges, snaptest.engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
snaptest.verify(t, newchain, blocks)
}
// Tests a Geth restart with valid snapshot. Before the shutdown, all snapshot
// journal will be persisted correctly. In this case no snapshot recovery is
// required.
......@@ -129,11 +483,9 @@ func TestRestartWithNewSnapshot(t *testing.T) {
// Expected head fast block: C8
// Expected head block : C8
// Expected snapshot disk : G
testSnapshot(t, &snapshotTest{
test := &snapshotTest{
snapshotTestBasic{
legacy: false,
crash: false,
gapped: 0,
setHead: 0,
chainBlocks: 8,
snapshotBlock: 0,
commitBlock: 0,
......@@ -142,7 +494,10 @@ func TestRestartWithNewSnapshot(t *testing.T) {
expHeadFastBlock: 8,
expHeadBlock: 8,
expSnapshotBottom: 0, // Initial disk layer built from genesis
})
},
}
test.test(t)
test.teardown()
}
// Tests a Geth restart with valid but "legacy" snapshot. Before the shutdown,
......@@ -166,11 +521,10 @@ func TestRestartWithLegacySnapshot(t *testing.T) {
// Expected head fast block: C8
// Expected head block : C8
// Expected snapshot disk : G
testSnapshot(t, &snapshotTest{
t.Skip("Legacy format testing is not supported")
test := &snapshotTest{
snapshotTestBasic{
legacy: true,
crash: false,
gapped: 0,
setHead: 0,
chainBlocks: 8,
snapshotBlock: 0,
commitBlock: 0,
......@@ -179,7 +533,10 @@ func TestRestartWithLegacySnapshot(t *testing.T) {
expHeadFastBlock: 8,
expHeadBlock: 8,
expSnapshotBottom: 0, // Initial disk layer built from genesis
})
},
}
test.test(t)
test.teardown()
}
// Tests a Geth was crashed and restarts with a broken snapshot. In this case the
......@@ -205,11 +562,9 @@ func TestNoCommitCrashWithNewSnapshot(t *testing.T) {
// Expected head fast block: C8
// Expected head block : G
// Expected snapshot disk : C4
testSnapshot(t, &snapshotTest{
test := &crashSnapshotTest{
snapshotTestBasic{
legacy: false,
crash: true,
gapped: 0,
setHead: 0,
chainBlocks: 8,
snapshotBlock: 4,
commitBlock: 0,
......@@ -218,7 +573,10 @@ func TestNoCommitCrashWithNewSnapshot(t *testing.T) {
expHeadFastBlock: 8,
expHeadBlock: 0,
expSnapshotBottom: 4, // Last committed disk layer, wait recovery
})
},
}
test.test(t)
test.teardown()
}
// Tests a Geth was crashed and restarts with a broken snapshot. In this case the
......@@ -244,11 +602,9 @@ func TestLowCommitCrashWithNewSnapshot(t *testing.T) {
// Expected head fast block: C8
// Expected head block : C2
// Expected snapshot disk : C4
testSnapshot(t, &snapshotTest{
test := &crashSnapshotTest{
snapshotTestBasic{
legacy: false,
crash: true,
gapped: 0,
setHead: 0,
chainBlocks: 8,
snapshotBlock: 4,
commitBlock: 2,
......@@ -257,7 +613,10 @@ func TestLowCommitCrashWithNewSnapshot(t *testing.T) {
expHeadFastBlock: 8,
expHeadBlock: 2,
expSnapshotBottom: 4, // Last committed disk layer, wait recovery
})
},
}
test.test(t)
test.teardown()
}
// Tests a Geth was crashed and restarts with a broken snapshot. In this case
......@@ -283,11 +642,9 @@ func TestHighCommitCrashWithNewSnapshot(t *testing.T) {
// Expected head fast block: C8
// Expected head block : G
// Expected snapshot disk : C4
testSnapshot(t, &snapshotTest{
test := &crashSnapshotTest{
snapshotTestBasic{
legacy: false,
crash: true,
gapped: 0,
setHead: 0,
chainBlocks: 8,
snapshotBlock: 4,
commitBlock: 6,
......@@ -296,7 +653,10 @@ func TestHighCommitCrashWithNewSnapshot(t *testing.T) {
expHeadFastBlock: 8,
expHeadBlock: 0,
expSnapshotBottom: 4, // Last committed disk layer, wait recovery
})
},
}
test.test(t)
test.teardown()
}
// Tests a Geth was crashed and restarts with a broken and "legacy format"
......@@ -321,11 +681,10 @@ func TestNoCommitCrashWithLegacySnapshot(t *testing.T) {
// Expected head fast block: C8
// Expected head block : G
// Expected snapshot disk : G
testSnapshot(t, &snapshotTest{
t.Skip("Legacy format testing is not supported")
test := &crashSnapshotTest{
snapshotTestBasic{
legacy: true,
crash: true,
gapped: 0,
setHead: 0,
chainBlocks: 8,
snapshotBlock: 4,
commitBlock: 0,
......@@ -334,7 +693,10 @@ func TestNoCommitCrashWithLegacySnapshot(t *testing.T) {
expHeadFastBlock: 8,
expHeadBlock: 0,
expSnapshotBottom: 0, // Rebuilt snapshot from the latest HEAD(genesis)
})
},
}
test.test(t)
test.teardown()
}
// Tests a Geth was crashed and restarts with a broken and "legacy format"
......@@ -359,11 +721,10 @@ func TestLowCommitCrashWithLegacySnapshot(t *testing.T) {
// Expected head fast block: C8
// Expected head block : C2
// Expected snapshot disk : C2
testSnapshot(t, &snapshotTest{
t.Skip("Legacy format testing is not supported")
test := &crashSnapshotTest{
snapshotTestBasic{
legacy: true,
crash: true,
gapped: 0,
setHead: 0,
chainBlocks: 8,
snapshotBlock: 4,
commitBlock: 2,
......@@ -372,7 +733,10 @@ func TestLowCommitCrashWithLegacySnapshot(t *testing.T) {
expHeadFastBlock: 8,
expHeadBlock: 2,
expSnapshotBottom: 2, // Rebuilt snapshot from the latest HEAD
})
},
}
test.test(t)
test.teardown()
}
// Tests a Geth was crashed and restarts with a broken and "legacy format"
......@@ -402,11 +766,10 @@ func TestHighCommitCrashWithLegacySnapshot(t *testing.T) {
// Expected head fast block: C8
// Expected head block : G
// Expected snapshot disk : G
testSnapshot(t, &snapshotTest{
t.Skip("Legacy format testing is not supported")
test := &crashSnapshotTest{
snapshotTestBasic{
legacy: true,
crash: true,
gapped: 0,
setHead: 0,
chainBlocks: 8,
snapshotBlock: 4,
commitBlock: 6,
......@@ -415,7 +778,10 @@ func TestHighCommitCrashWithLegacySnapshot(t *testing.T) {
expHeadFastBlock: 8,
expHeadBlock: 0,
expSnapshotBottom: 0, // Rebuilt snapshot from the latest HEAD(genesis)
})
},
}
test.test(t)
test.teardown()
}
// Tests a Geth was running with snapshot enabled. Then restarts without
......@@ -439,11 +805,9 @@ func TestGappedNewSnapshot(t *testing.T) {
// Expected head fast block: C10
// Expected head block : C10
// Expected snapshot disk : C10
testSnapshot(t, &snapshotTest{
test := &gappedSnapshotTest{
snapshotTestBasic: snapshotTestBasic{
legacy: false,
crash: false,
gapped: 2,
setHead: 0,
chainBlocks: 8,
snapshotBlock: 0,
commitBlock: 0,
......@@ -452,7 +816,11 @@ func TestGappedNewSnapshot(t *testing.T) {
expHeadFastBlock: 10,
expHeadBlock: 10,
expSnapshotBottom: 10, // Rebuilt snapshot from the latest HEAD
})
},
gapped: 2,
}
test.test(t)
test.teardown()
}
// Tests a Geth was running with leagcy snapshot enabled. Then restarts
......@@ -476,11 +844,10 @@ func TestGappedLegacySnapshot(t *testing.T) {
// Expected head fast block: C10
// Expected head block : C10
// Expected snapshot disk : C10
testSnapshot(t, &snapshotTest{
t.Skip("Legacy format testing is not supported")
test := &gappedSnapshotTest{
snapshotTestBasic: snapshotTestBasic{
legacy: true,
crash: false,
gapped: 2,
setHead: 0,
chainBlocks: 8,
snapshotBlock: 0,
commitBlock: 0,
......@@ -489,7 +856,11 @@ func TestGappedLegacySnapshot(t *testing.T) {
expHeadFastBlock: 10,
expHeadBlock: 10,
expSnapshotBottom: 10, // Rebuilt snapshot from the latest HEAD
})
},
gapped: 2,
}
test.test(t)
test.teardown()
}
// Tests the Geth was running with snapshot enabled and resetHead is applied.
......@@ -513,11 +884,9 @@ func TestSetHeadWithNewSnapshot(t *testing.T) {
// Expected head fast block: C4
// Expected head block : C4
// Expected snapshot disk : G
testSnapshot(t, &snapshotTest{
test := &setHeadSnapshotTest{
snapshotTestBasic: snapshotTestBasic{
legacy: false,
crash: false,
gapped: 0,
setHead: 4,
chainBlocks: 8,
snapshotBlock: 0,
commitBlock: 0,
......@@ -526,7 +895,11 @@ func TestSetHeadWithNewSnapshot(t *testing.T) {
expHeadFastBlock: 4,
expHeadBlock: 4,
expSnapshotBottom: 0, // The initial disk layer is built from the genesis
})
},
setHead: 4,
}
test.test(t)
test.teardown()
}
// Tests the Geth was running with snapshot(legacy-format) enabled and resetHead
......@@ -550,11 +923,10 @@ func TestSetHeadWithLegacySnapshot(t *testing.T) {
// Expected head fast block: C4
// Expected head block : C4
// Expected snapshot disk : G
testSnapshot(t, &snapshotTest{
t.Skip("Legacy format testing is not supported")
test := &setHeadSnapshotTest{
snapshotTestBasic: snapshotTestBasic{
legacy: true,
crash: false,
gapped: 0,
setHead: 4,
chainBlocks: 8,
snapshotBlock: 0,
commitBlock: 0,
......@@ -563,7 +935,11 @@ func TestSetHeadWithLegacySnapshot(t *testing.T) {
expHeadFastBlock: 4,
expHeadBlock: 4,
expSnapshotBottom: 0, // The initial disk layer is built from the genesis
})
},
setHead: 4,
}
test.test(t)
test.teardown()
}
// Tests the Geth was running with snapshot(legacy-format) enabled and upgrades
......@@ -589,12 +965,10 @@ func TestRecoverSnapshotFromCrashWithLegacyDiffJournal(t *testing.T) {
// Expected head fast block: C10
// Expected head block : C8
// Expected snapshot disk : C10
testSnapshot(t, &snapshotTest{
t.Skip("Legacy format testing is not supported")
test := &restartCrashSnapshotTest{
snapshotTestBasic: snapshotTestBasic{
legacy: true,
crash: false,
restartCrash: 2,
gapped: 0,
setHead: 0,
chainBlocks: 8,
snapshotBlock: 0,
commitBlock: 0,
......@@ -603,195 +977,48 @@ func TestRecoverSnapshotFromCrashWithLegacyDiffJournal(t *testing.T) {
expHeadFastBlock: 10,
expHeadBlock: 8, // The persisted state in the first running
expSnapshotBottom: 10, // The persisted disk layer in the second running
})
}
func testSnapshot(t *testing.T, tt *snapshotTest) {
// It's hard to follow the test case, visualize the input
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
// fmt.Println(tt.dump())
// Create a temporary persistent database
datadir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("Failed to create temporary datadir: %v", err)
}
os.RemoveAll(datadir)
db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "")
if err != nil {
t.Fatalf("Failed to create persistent database: %v", err)
}
defer db.Close() // Might double close, should be fine
// Initialize a fresh chain
var (
genesis = new(Genesis).MustCommit(db)
engine = ethash.NewFullFaker()
gendb = rawdb.NewMemoryDatabase()
// Snapshot is enabled, the first snapshot is created from the Genesis.
// The snapshot memory allowance is 256MB, it means no snapshot flush
// will happen during the block insertion.
cacheConfig = defaultCacheConfig
)
chain, err := NewBlockChain(db, cacheConfig, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to create chain: %v", err)
}
blocks, _ := GenerateChain(params.TestChainConfig, genesis, engine, gendb, tt.chainBlocks, func(i int, b *BlockGen) {})
// Insert the blocks with configured settings.
var breakpoints []uint64
if tt.commitBlock > tt.snapshotBlock {
breakpoints = append(breakpoints, tt.snapshotBlock, tt.commitBlock)
} else {
breakpoints = append(breakpoints, tt.commitBlock, tt.snapshotBlock)
}
var startPoint uint64
for _, point := range breakpoints {
if _, err := chain.InsertChain(blocks[startPoint:point]); err != nil {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
startPoint = point
if tt.commitBlock > 0 && tt.commitBlock == point {
chain.stateCache.TrieDB().Commit(blocks[point-1].Root(), true, nil)
}
if tt.snapshotBlock > 0 && tt.snapshotBlock == point {
if tt.legacy {
// Here we commit the snapshot disk root to simulate
// committing the legacy snapshot.
rawdb.WriteSnapshotRoot(db, blocks[point-1].Root())
} else {
chain.snaps.Cap(blocks[point-1].Root(), 0)
diskRoot, blockRoot := chain.snaps.DiskRoot(), blocks[point-1].Root()
if !bytes.Equal(diskRoot.Bytes(), blockRoot.Bytes()) {
t.Fatalf("Failed to flush disk layer change, want %x, got %x", blockRoot, diskRoot)
}
}
}
}
if _, err := chain.InsertChain(blocks[startPoint:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
}
// Set the flag for writing legacy journal if necessary
if tt.legacy {
chain.writeLegacyJournal = true
},
newBlocks: 2,
}
// Pull the plug on the database, simulating a hard crash
if tt.crash {
db.Close()
// Start a new blockchain back up and see where the repair leads us
db, err = rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "")
if err != nil {
t.Fatalf("Failed to reopen persistent database: %v", err)
}
defer db.Close()
// The interesting thing is: instead of start the blockchain after
// the crash, we do restart twice here: one after the crash and one
// after the normal stop. It's used to ensure the broken snapshot
// can be detected all the time.
chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
chain.Stop()
chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
defer chain.Stop()
} else if tt.gapped > 0 {
// Insert blocks without enabling snapshot if gapping is required.
chain.Stop()
gappedBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, gendb, tt.gapped, func(i int, b *BlockGen) {})
// Insert a few more blocks without enabling snapshot
var cacheConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 0,
}
chain, err = NewBlockChain(db, cacheConfig, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
chain.InsertChain(gappedBlocks)
chain.Stop()
chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
defer chain.Stop()
} else if tt.setHead != 0 {
// Rewind the chain if setHead operation is required.
chain.SetHead(tt.setHead)
chain.Stop()
chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
defer chain.Stop()
} else if tt.restartCrash != 0 {
// Firstly, stop the chain properly, with all snapshot journal
// and state committed.
chain.Stop()
// Restart chain, forcibly flush the disk layer journal with new format
newBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, gendb, tt.restartCrash, func(i int, b *BlockGen) {})
chain, err = NewBlockChain(db, cacheConfig, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
chain.InsertChain(newBlocks)
chain.Snapshots().Cap(newBlocks[len(newBlocks)-1].Root(), 0)
// Simulate the blockchain crash
// Don't call chain.Stop here, so that no snapshot
// journal and latest state will be committed
// Restart the chain after the crash
chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
defer chain.Stop()
} else {
chain.Stop()
// Restart the chain normally
chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
defer chain.Stop()
}
// Iterate over all the remaining blocks and ensure there are no gaps
verifyNoGaps(t, chain, true, blocks)
verifyCutoff(t, chain, true, blocks, tt.expCanonicalBlocks)
test.test(t)
test.teardown()
}
if head := chain.CurrentHeader(); head.Number.Uint64() != tt.expHeadHeader {
t.Errorf("Head header mismatch: have %d, want %d", head.Number, tt.expHeadHeader)
}
if head := chain.CurrentFastBlock(); head.NumberU64() != tt.expHeadFastBlock {
t.Errorf("Head fast block mismatch: have %d, want %d", head.NumberU64(), tt.expHeadFastBlock)
}
if head := chain.CurrentBlock(); head.NumberU64() != tt.expHeadBlock {
t.Errorf("Head block mismatch: have %d, want %d", head.NumberU64(), tt.expHeadBlock)
}
// Check the disk layer, ensure they are matched
block := chain.GetBlockByNumber(tt.expSnapshotBottom)
if block == nil {
t.Errorf("The correspnding block[%d] of snapshot disk layer is missing", tt.expSnapshotBottom)
} else if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot())
// Tests the Geth was running with a complete snapshot and then imports a few
// more new blocks on top without enabling the snapshot. After the restart,
// crash happens. Check everything is ok after the restart.
func TestRecoverSnapshotFromWipingCrash(t *testing.T) {
// Chain:
// G->C1->C2->C3->C4->C5->C6->C7->C8 (HEAD)
//
// Commit: G
// Snapshot: G
//
// SetHead(0)
//
// ------------------------------
//
// Expected in leveldb:
// G->C1->C2->C3->C4->C5->C6->C7->C8->C9->C10
//
// Expected head header : C10
// Expected head fast block: C10
// Expected head block : C8
// Expected snapshot disk : C10
test := &wipeCrashSnapshotTest{
snapshotTestBasic: snapshotTestBasic{
legacy: false,
chainBlocks: 8,
snapshotBlock: 4,
commitBlock: 0,
expCanonicalBlocks: 10,
expHeadHeader: 10,
expHeadFastBlock: 10,
expHeadBlock: 10,
expSnapshotBottom: 10,
},
newBlocks: 2,
}
test.test(t)
test.teardown()
}
......@@ -101,18 +101,26 @@ func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache i
wiper = wipeSnapshot(diskdb, true)
}
// Create a new disk layer with an initialized state marker at zero
rawdb.WriteSnapshotRoot(diskdb, root)
var (
stats = &generatorStats{wiping: wiper, start: time.Now()}
batch = diskdb.NewBatch()
genMarker = []byte{} // Initialized but empty!
)
rawdb.WriteSnapshotRoot(batch, root)
journalProgress(batch, genMarker, stats)
if err := batch.Write(); err != nil {
log.Crit("Failed to write initialized state marker", "error", err)
}
base := &diskLayer{
diskdb: diskdb,
triedb: triedb,
root: root,
cache: fastcache.New(cache * 1024 * 1024),
genMarker: []byte{}, // Initialized but empty!
genMarker: genMarker,
genPending: make(chan struct{}),
genAbort: make(chan chan *generatorStats),
}
go base.generate(&generatorStats{wiping: wiper, start: time.Now()})
go base.generate(stats)
log.Debug("Start snapshot generation", "root", root)
return base
}
......@@ -137,10 +145,12 @@ func journalProgress(db ethdb.KeyValueWriter, marker []byte, stats *generatorSta
panic(err) // Cannot happen, here to catch dev errors
}
var logstr string
switch len(marker) {
case 0:
switch {
case marker == nil:
logstr = "done"
case common.HashLength:
case bytes.Equal(marker, []byte{}):
logstr = "empty"
case len(marker) == common.HashLength:
logstr = fmt.Sprintf("%#x", marker)
default:
logstr = fmt.Sprintf("%#x:%#x", marker[:common.HashLength], marker[common.HashLength:])
......@@ -307,13 +317,12 @@ func (dl *diskLayer) generate(stats *generatorStats) {
abort <- stats
return
}
// Snapshot fully generated, set the marker to nil
if batch.ValueSize() > 0 {
// Ensure the generator entry is in sync with the data
// Snapshot fully generated, set the marker to nil.
// Note even there is nothing to commit, persist the
// generator anyway to mark the snapshot is complete.
journalProgress(batch, nil, stats)
batch.Write()
}
log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots,
"storage", stats.storage, "elapsed", common.PrettyDuration(time.Since(stats.start)))
......
......@@ -441,6 +441,6 @@ func (dl *diffLayer) LegacyJournal(buffer *bytes.Buffer) (common.Hash, error) {
if err := rlp.Encode(buffer, storage); err != nil {
return common.Hash{}, err
}
log.Debug("Legacy journalled disk layer", "root", dl.root, "parent", dl.parent.Root())
log.Debug("Legacy journalled diff layer", "root", dl.root, "parent", dl.parent.Root())
return base, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment