Commit 770316dc authored by gary rong's avatar gary rong Committed by Péter Szilágyi

core, light: write chain data in atomic way (#20287)

* core: write chain data in atomic way

* core, light: address comments

* core, light: fix linter

* core, light: address comments
parent 0af96d25
This diff is collapsed.
...@@ -45,6 +45,14 @@ const ( ...@@ -45,6 +45,14 @@ const (
// HeaderChain implements the basic block header chain logic that is shared by // HeaderChain implements the basic block header chain logic that is shared by
// core.BlockChain and light.LightChain. It is not usable in itself, only as // core.BlockChain and light.LightChain. It is not usable in itself, only as
// a part of either structure. // a part of either structure.
//
// HeaderChain is responsible for maintaining the header chain including the
// header query and updating.
//
// The components maintained by headerchain includes: (1) total difficult
// (2) header (3) block hash -> number mapping (4) canonical number -> hash mapping
// and (5) head header flag.
//
// It is not thread safe either, the encapsulating chain structures should do // It is not thread safe either, the encapsulating chain structures should do
// the necessary mutex locking/unlocking. // the necessary mutex locking/unlocking.
type HeaderChain struct { type HeaderChain struct {
...@@ -66,10 +74,8 @@ type HeaderChain struct { ...@@ -66,10 +74,8 @@ type HeaderChain struct {
engine consensus.Engine engine consensus.Engine
} }
// NewHeaderChain creates a new HeaderChain structure. // NewHeaderChain creates a new HeaderChain structure. ProcInterrupt points
// getValidator should return the parent's validator // to the parent's interrupt semaphore.
// procInterrupt points to the parent's interrupt semaphore
// wg points to the parent's shutdown wait group
func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, procInterrupt func() bool) (*HeaderChain, error) { func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, procInterrupt func() bool) (*HeaderChain, error) {
headerCache, _ := lru.New(headerCacheLimit) headerCache, _ := lru.New(headerCacheLimit)
tdCache, _ := lru.New(tdCacheLimit) tdCache, _ := lru.New(tdCacheLimit)
...@@ -147,25 +153,33 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er ...@@ -147,25 +153,33 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
externTd := new(big.Int).Add(header.Difficulty, ptd) externTd := new(big.Int).Add(header.Difficulty, ptd)
// Irrelevant of the canonical status, write the td and header to the database // Irrelevant of the canonical status, write the td and header to the database
if err := hc.WriteTd(hash, number, externTd); err != nil { //
log.Crit("Failed to write header total difficulty", "err", err) // Note all the components of header(td, hash->number index and header) should
// be written atomically.
headerBatch := hc.chainDb.NewBatch()
rawdb.WriteTd(headerBatch, hash, number, externTd)
rawdb.WriteHeader(headerBatch, header)
if err := headerBatch.Write(); err != nil {
log.Crit("Failed to write header into disk", "err", err)
} }
rawdb.WriteHeader(hc.chainDb, header)
// If the total difficulty is higher than our known, add it to the canonical chain // If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining. // Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) { if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) {
// If the header can be added into canonical chain, adjust the
// header chain markers(canonical indexes and head header flag).
//
// Note all markers should be written atomically.
// Delete any canonical number assignments above the new head // Delete any canonical number assignments above the new head
batch := hc.chainDb.NewBatch() markerBatch := hc.chainDb.NewBatch()
for i := number + 1; ; i++ { for i := number + 1; ; i++ {
hash := rawdb.ReadCanonicalHash(hc.chainDb, i) hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
if hash == (common.Hash{}) { if hash == (common.Hash{}) {
break break
} }
rawdb.DeleteCanonicalHash(batch, i) rawdb.DeleteCanonicalHash(markerBatch, i)
} }
batch.Write()
// Overwrite any stale canonical number assignments // Overwrite any stale canonical number assignments
var ( var (
...@@ -174,16 +188,19 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er ...@@ -174,16 +188,19 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
headHeader = hc.GetHeader(headHash, headNumber) headHeader = hc.GetHeader(headHash, headNumber)
) )
for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash { for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
rawdb.WriteCanonicalHash(hc.chainDb, headHash, headNumber) rawdb.WriteCanonicalHash(markerBatch, headHash, headNumber)
headHash = headHeader.ParentHash headHash = headHeader.ParentHash
headNumber = headHeader.Number.Uint64() - 1 headNumber = headHeader.Number.Uint64() - 1
headHeader = hc.GetHeader(headHash, headNumber) headHeader = hc.GetHeader(headHash, headNumber)
} }
// Extend the canonical chain with the new header // Extend the canonical chain with the new header
rawdb.WriteCanonicalHash(hc.chainDb, hash, number) rawdb.WriteCanonicalHash(markerBatch, hash, number)
rawdb.WriteHeadHeaderHash(hc.chainDb, hash) rawdb.WriteHeadHeaderHash(markerBatch, hash)
if err := markerBatch.Write(); err != nil {
log.Crit("Failed to write header markers into disk", "err", err)
}
// Last step update all in-memory head header markers
hc.currentHeaderHash = hash hc.currentHeaderHash = hash
hc.currentHeader.Store(types.CopyHeader(header)) hc.currentHeader.Store(types.CopyHeader(header))
headHeaderGauge.Update(header.Number.Int64()) headHeaderGauge.Update(header.Number.Int64())
...@@ -192,9 +209,9 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er ...@@ -192,9 +209,9 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
} else { } else {
status = SideStatTy status = SideStatTy
} }
hc.tdCache.Add(hash, externTd)
hc.headerCache.Add(hash, header) hc.headerCache.Add(hash, header)
hc.numberCache.Add(hash, number) hc.numberCache.Add(hash, number)
return return
} }
...@@ -396,14 +413,6 @@ func (hc *HeaderChain) GetTdByHash(hash common.Hash) *big.Int { ...@@ -396,14 +413,6 @@ func (hc *HeaderChain) GetTdByHash(hash common.Hash) *big.Int {
return hc.GetTd(hash, *number) return hc.GetTd(hash, *number)
} }
// WriteTd stores a block's total difficulty into the database, also caching it
// along the way.
func (hc *HeaderChain) WriteTd(hash common.Hash, number uint64, td *big.Int) error {
rawdb.WriteTd(hc.chainDb, hash, number, td)
hc.tdCache.Add(hash, new(big.Int).Set(td))
return nil
}
// GetHeader retrieves a block header from the database by hash and number, // GetHeader retrieves a block header from the database by hash and number,
// caching it if found. // caching it if found.
func (hc *HeaderChain) GetHeader(hash common.Hash, number uint64) *types.Header { func (hc *HeaderChain) GetHeader(hash common.Hash, number uint64) *types.Header {
...@@ -431,6 +440,8 @@ func (hc *HeaderChain) GetHeaderByHash(hash common.Hash) *types.Header { ...@@ -431,6 +440,8 @@ func (hc *HeaderChain) GetHeaderByHash(hash common.Hash) *types.Header {
} }
// HasHeader checks if a block header is present in the database or not. // HasHeader checks if a block header is present in the database or not.
// In theory, if header is present in the database, all relative components
// like td and hash->number should be present too.
func (hc *HeaderChain) HasHeader(hash common.Hash, number uint64) bool { func (hc *HeaderChain) HasHeader(hash common.Hash, number uint64) bool {
if hc.numberCache.Contains(hash) || hc.headerCache.Contains(hash) { if hc.numberCache.Contains(hash) || hc.headerCache.Contains(hash) {
return true return true
...@@ -458,10 +469,9 @@ func (hc *HeaderChain) CurrentHeader() *types.Header { ...@@ -458,10 +469,9 @@ func (hc *HeaderChain) CurrentHeader() *types.Header {
return hc.currentHeader.Load().(*types.Header) return hc.currentHeader.Load().(*types.Header)
} }
// SetCurrentHeader sets the current head header of the canonical chain. // SetCurrentHeader sets the in-memory head header marker of the canonical chan
// as the given header.
func (hc *HeaderChain) SetCurrentHeader(head *types.Header) { func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
rawdb.WriteHeadHeaderHash(hc.chainDb, head.Hash())
hc.currentHeader.Store(head) hc.currentHeader.Store(head)
hc.currentHeaderHash = head.Hash() hc.currentHeaderHash = head.Hash()
headHeaderGauge.Update(head.Number.Int64()) headHeaderGauge.Update(head.Number.Int64())
...@@ -500,11 +510,18 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d ...@@ -500,11 +510,18 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d
// first then remove the relative data from the database. // first then remove the relative data from the database.
// //
// Update head first(head fast block, head full block) before deleting the data. // Update head first(head fast block, head full block) before deleting the data.
markerBatch := hc.chainDb.NewBatch()
if updateFn != nil { if updateFn != nil {
updateFn(hc.chainDb, parent) updateFn(markerBatch, parent)
} }
// Update head header then. // Update head header then.
rawdb.WriteHeadHeaderHash(hc.chainDb, parentHash) rawdb.WriteHeadHeaderHash(markerBatch, parentHash)
if err := markerBatch.Write(); err != nil {
log.Crit("Failed to update chain markers", "error", err)
}
hc.currentHeader.Store(parent)
hc.currentHeaderHash = parentHash
headHeaderGauge.Update(parent.Number.Int64())
// Remove the relative data from the database. // Remove the relative data from the database.
if delFn != nil { if delFn != nil {
...@@ -514,13 +531,11 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d ...@@ -514,13 +531,11 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d
rawdb.DeleteHeader(batch, hash, num) rawdb.DeleteHeader(batch, hash, num)
rawdb.DeleteTd(batch, hash, num) rawdb.DeleteTd(batch, hash, num)
rawdb.DeleteCanonicalHash(batch, num) rawdb.DeleteCanonicalHash(batch, num)
hc.currentHeader.Store(parent)
hc.currentHeaderHash = parentHash
headHeaderGauge.Update(parent.Number.Int64())
} }
batch.Write() // Flush all accumulated deletions.
if err := batch.Write(); err != nil {
log.Crit("Failed to rewind block", "error", err)
}
// Clear out any stale content from the caches // Clear out any stale content from the caches
hc.headerCache.Purge() hc.headerCache.Purge()
hc.tdCache.Purge() hc.tdCache.Purge()
......
...@@ -159,7 +159,6 @@ func (lc *LightChain) loadLastState() error { ...@@ -159,7 +159,6 @@ func (lc *LightChain) loadLastState() error {
lc.hc.SetCurrentHeader(header) lc.hc.SetCurrentHeader(header)
} }
} }
// Issue a status log and return // Issue a status log and return
header := lc.hc.CurrentHeader() header := lc.hc.CurrentHeader()
headerTd := lc.GetTd(header.Hash(), header.Number.Uint64()) headerTd := lc.GetTd(header.Hash(), header.Number.Uint64())
...@@ -198,9 +197,13 @@ func (lc *LightChain) ResetWithGenesisBlock(genesis *types.Block) { ...@@ -198,9 +197,13 @@ func (lc *LightChain) ResetWithGenesisBlock(genesis *types.Block) {
defer lc.chainmu.Unlock() defer lc.chainmu.Unlock()
// Prepare the genesis block and reinitialise the chain // Prepare the genesis block and reinitialise the chain
rawdb.WriteTd(lc.chainDb, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()) batch := lc.chainDb.NewBatch()
rawdb.WriteBlock(lc.chainDb, genesis) rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty())
rawdb.WriteBlock(batch, genesis)
rawdb.WriteHeadHeaderHash(batch, genesis.Hash())
if err := batch.Write(); err != nil {
log.Crit("Failed to reset genesis block", "err", err)
}
lc.genesisBlock = genesis lc.genesisBlock = genesis
lc.hc.SetGenesis(lc.genesisBlock.Header()) lc.hc.SetGenesis(lc.genesisBlock.Header())
lc.hc.SetCurrentHeader(lc.genesisBlock.Header()) lc.hc.SetCurrentHeader(lc.genesisBlock.Header())
...@@ -323,13 +326,22 @@ func (lc *LightChain) Rollback(chain []common.Hash) { ...@@ -323,13 +326,22 @@ func (lc *LightChain) Rollback(chain []common.Hash) {
lc.chainmu.Lock() lc.chainmu.Lock()
defer lc.chainmu.Unlock() defer lc.chainmu.Unlock()
batch := lc.chainDb.NewBatch()
for i := len(chain) - 1; i >= 0; i-- { for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i] hash := chain[i]
// Degrade the chain markers if they are explicitly reverted.
// In theory we should update all in-memory markers in the
// last step, however the direction of rollback is from high
// to low, so it's safe the update in-memory markers directly.
if head := lc.hc.CurrentHeader(); head.Hash() == hash { if head := lc.hc.CurrentHeader(); head.Hash() == hash {
rawdb.WriteHeadHeaderHash(batch, head.ParentHash)
lc.hc.SetCurrentHeader(lc.GetHeader(head.ParentHash, head.Number.Uint64()-1)) lc.hc.SetCurrentHeader(lc.GetHeader(head.ParentHash, head.Number.Uint64()-1))
} }
} }
if err := batch.Write(); err != nil {
log.Crit("Failed to rollback light chain", "error", err)
}
} }
// postChainEvents iterates over the events generated by a chain insertion and // postChainEvents iterates over the events generated by a chain insertion and
...@@ -493,6 +505,7 @@ func (lc *LightChain) SyncCheckpoint(ctx context.Context, checkpoint *params.Tru ...@@ -493,6 +505,7 @@ func (lc *LightChain) SyncCheckpoint(ctx context.Context, checkpoint *params.Tru
// Ensure the chain didn't move past the latest block while retrieving it // Ensure the chain didn't move past the latest block while retrieving it
if lc.hc.CurrentHeader().Number.Uint64() < header.Number.Uint64() { if lc.hc.CurrentHeader().Number.Uint64() < header.Number.Uint64() {
log.Info("Updated latest header based on CHT", "number", header.Number, "hash", header.Hash(), "age", common.PrettyAge(time.Unix(int64(header.Time), 0))) log.Info("Updated latest header based on CHT", "number", header.Number, "hash", header.Hash(), "age", common.PrettyAge(time.Unix(int64(header.Time), 0)))
rawdb.WriteHeadHeaderHash(lc.chainDb, header.Hash())
lc.hc.SetCurrentHeader(header) lc.hc.SetCurrentHeader(header)
} }
return true return true
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment