Unverified Commit e146fbe4 authored by Martin Holst Swende's avatar Martin Holst Swende Committed by Péter Szilágyi

core/state: lazy sorting, snapshot invalidation

parent 542df889
This diff is collapsed.
This diff is collapsed.
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
package snapshot package snapshot
import ( import (
"sync"
"github.com/allegro/bigcache" "github.com/allegro/bigcache"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
...@@ -32,6 +34,9 @@ type diskLayer struct { ...@@ -32,6 +34,9 @@ type diskLayer struct {
number uint64 // Block number of the base snapshot number uint64 // Block number of the base snapshot
root common.Hash // Root hash of the base snapshot root common.Hash // Root hash of the base snapshot
stale bool // Signals that the layer became stale (state progressed)
lock sync.RWMutex
} }
// Info returns the block number and root hash for which this snapshot was made. // Info returns the block number and root hash for which this snapshot was made.
...@@ -41,28 +46,39 @@ func (dl *diskLayer) Info() (uint64, common.Hash) { ...@@ -41,28 +46,39 @@ func (dl *diskLayer) Info() (uint64, common.Hash) {
// Account directly retrieves the account associated with a particular hash in // Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format. // the snapshot slim data format.
func (dl *diskLayer) Account(hash common.Hash) *Account { func (dl *diskLayer) Account(hash common.Hash) (*Account, error) {
data := dl.AccountRLP(hash) data, err := dl.AccountRLP(hash)
if err != nil {
return nil, err
}
if len(data) == 0 { // can be both nil and []byte{} if len(data) == 0 { // can be both nil and []byte{}
return nil return nil, nil
} }
account := new(Account) account := new(Account)
if err := rlp.DecodeBytes(data, account); err != nil { if err := rlp.DecodeBytes(data, account); err != nil {
panic(err) panic(err)
} }
return account return account, nil
} }
// AccountRLP directly retrieves the account RLP associated with a particular // AccountRLP directly retrieves the account RLP associated with a particular
// hash in the snapshot slim data format. // hash in the snapshot slim data format.
func (dl *diskLayer) AccountRLP(hash common.Hash) []byte { func (dl *diskLayer) AccountRLP(hash common.Hash) ([]byte, error) {
dl.lock.RLock()
defer dl.lock.RUnlock()
// If the layer was flattened into, consider it invalid (any live reference to
// the original should be marked as unusable).
if dl.stale {
return nil, ErrSnapshotStale
}
key := string(hash[:]) key := string(hash[:])
// Try to retrieve the account from the memory cache // Try to retrieve the account from the memory cache
if blob, err := dl.cache.Get(key); err == nil { if blob, err := dl.cache.Get(key); err == nil {
snapshotCleanHitMeter.Mark(1) snapshotCleanHitMeter.Mark(1)
snapshotCleanReadMeter.Mark(int64(len(blob))) snapshotCleanReadMeter.Mark(int64(len(blob)))
return blob return blob, nil
} }
// Cache doesn't contain account, pull from disk and cache for later // Cache doesn't contain account, pull from disk and cache for later
blob := rawdb.ReadAccountSnapshot(dl.db, hash) blob := rawdb.ReadAccountSnapshot(dl.db, hash)
...@@ -71,19 +87,27 @@ func (dl *diskLayer) AccountRLP(hash common.Hash) []byte { ...@@ -71,19 +87,27 @@ func (dl *diskLayer) AccountRLP(hash common.Hash) []byte {
snapshotCleanMissMeter.Mark(1) snapshotCleanMissMeter.Mark(1)
snapshotCleanWriteMeter.Mark(int64(len(blob))) snapshotCleanWriteMeter.Mark(int64(len(blob)))
return blob return blob, nil
} }
// Storage directly retrieves the storage data associated with a particular hash, // Storage directly retrieves the storage data associated with a particular hash,
// within a particular account. // within a particular account.
func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) []byte { func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) ([]byte, error) {
dl.lock.RLock()
defer dl.lock.RUnlock()
// If the layer was flattened into, consider it invalid (any live reference to
// the original should be marked as unusable).
if dl.stale {
return nil, ErrSnapshotStale
}
key := string(append(accountHash[:], storageHash[:]...)) key := string(append(accountHash[:], storageHash[:]...))
// Try to retrieve the storage slot from the memory cache // Try to retrieve the storage slot from the memory cache
if blob, err := dl.cache.Get(key); err == nil { if blob, err := dl.cache.Get(key); err == nil {
snapshotCleanHitMeter.Mark(1) snapshotCleanHitMeter.Mark(1)
snapshotCleanReadMeter.Mark(int64(len(blob))) snapshotCleanReadMeter.Mark(int64(len(blob)))
return blob return blob, nil
} }
// Cache doesn't contain storage slot, pull from disk and cache for later // Cache doesn't contain storage slot, pull from disk and cache for later
blob := rawdb.ReadStorageSnapshot(dl.db, accountHash, storageHash) blob := rawdb.ReadStorageSnapshot(dl.db, accountHash, storageHash)
...@@ -92,7 +116,7 @@ func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) []byte { ...@@ -92,7 +116,7 @@ func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) []byte {
snapshotCleanMissMeter.Mark(1) snapshotCleanMissMeter.Mark(1)
snapshotCleanWriteMeter.Mark(int64(len(blob))) snapshotCleanWriteMeter.Mark(int64(len(blob)))
return blob return blob, nil
} }
// Update creates a new layer on top of the existing snapshot diff tree with // Update creates a new layer on top of the existing snapshot diff tree with
......
...@@ -135,6 +135,7 @@ func generateSnapshot(db ethdb.KeyValueStore, journal string, headNumber uint64, ...@@ -135,6 +135,7 @@ func generateSnapshot(db ethdb.KeyValueStore, journal string, headNumber uint64,
curStorageNodes int curStorageNodes int
curAccountSize common.StorageSize curAccountSize common.StorageSize
curStorageSize common.StorageSize curStorageSize common.StorageSize
accountHash = common.BytesToHash(accIt.Key)
) )
var acc struct { var acc struct {
Nonce uint64 Nonce uint64
...@@ -148,7 +149,7 @@ func generateSnapshot(db ethdb.KeyValueStore, journal string, headNumber uint64, ...@@ -148,7 +149,7 @@ func generateSnapshot(db ethdb.KeyValueStore, journal string, headNumber uint64,
data := AccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash) data := AccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash)
curAccountSize += common.StorageSize(1 + common.HashLength + len(data)) curAccountSize += common.StorageSize(1 + common.HashLength + len(data))
rawdb.WriteAccountSnapshot(batch, common.BytesToHash(accIt.Key), data) rawdb.WriteAccountSnapshot(batch, accountHash, data)
if batch.ValueSize() > ethdb.IdealBatchSize { if batch.ValueSize() > ethdb.IdealBatchSize {
batch.Write() batch.Write()
batch.Reset() batch.Reset()
...@@ -163,7 +164,7 @@ func generateSnapshot(db ethdb.KeyValueStore, journal string, headNumber uint64, ...@@ -163,7 +164,7 @@ func generateSnapshot(db ethdb.KeyValueStore, journal string, headNumber uint64,
curStorageSize += common.StorageSize(1 + 2*common.HashLength + len(storeIt.Value)) curStorageSize += common.StorageSize(1 + 2*common.HashLength + len(storeIt.Value))
curStorageCount++ curStorageCount++
rawdb.WriteStorageSnapshot(batch, common.BytesToHash(accIt.Key), common.BytesToHash(storeIt.Key), storeIt.Value) rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(storeIt.Key), storeIt.Value)
if batch.ValueSize() > ethdb.IdealBatchSize { if batch.ValueSize() > ethdb.IdealBatchSize {
batch.Write() batch.Write()
batch.Reset() batch.Reset()
......
...@@ -38,6 +38,11 @@ var ( ...@@ -38,6 +38,11 @@ var (
snapshotCleanMissMeter = metrics.NewRegisteredMeter("state/snapshot/clean/miss", nil) snapshotCleanMissMeter = metrics.NewRegisteredMeter("state/snapshot/clean/miss", nil)
snapshotCleanReadMeter = metrics.NewRegisteredMeter("state/snapshot/clean/read", nil) snapshotCleanReadMeter = metrics.NewRegisteredMeter("state/snapshot/clean/read", nil)
snapshotCleanWriteMeter = metrics.NewRegisteredMeter("state/snapshot/clean/write", nil) snapshotCleanWriteMeter = metrics.NewRegisteredMeter("state/snapshot/clean/write", nil)
// ErrSnapshotStale is returned from data accessors if the underlying snapshot
// layer had been invalidated due to the chain progressing forward far enough
// to not maintain the layer's original state.
ErrSnapshotStale = errors.New("snapshot stale")
) )
// Snapshot represents the functionality supported by a snapshot storage layer. // Snapshot represents the functionality supported by a snapshot storage layer.
...@@ -47,15 +52,15 @@ type Snapshot interface { ...@@ -47,15 +52,15 @@ type Snapshot interface {
// Account directly retrieves the account associated with a particular hash in // Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format. // the snapshot slim data format.
Account(hash common.Hash) *Account Account(hash common.Hash) (*Account, error)
// AccountRLP directly retrieves the account RLP associated with a particular // AccountRLP directly retrieves the account RLP associated with a particular
// hash in the snapshot slim data format. // hash in the snapshot slim data format.
AccountRLP(hash common.Hash) []byte AccountRLP(hash common.Hash) ([]byte, error)
// Storage directly retrieves the storage data associated with a particular hash, // Storage directly retrieves the storage data associated with a particular hash,
// within a particular account. // within a particular account.
Storage(accountHash, storageHash common.Hash) []byte Storage(accountHash, storageHash common.Hash) ([]byte, error)
} }
// snapshot is the internal version of the snapshot data layer that supports some // snapshot is the internal version of the snapshot data layer that supports some
...@@ -80,7 +85,7 @@ type snapshot interface { ...@@ -80,7 +85,7 @@ type snapshot interface {
} }
// SnapshotTree is an Ethereum state snapshot tree. It consists of one persistent // SnapshotTree is an Ethereum state snapshot tree. It consists of one persistent
// base layer backed by a key-value store, on top of which arbitrarilly many in- // base layer backed by a key-value store, on top of which arbitrarily many in-
// memory diff layers are topped. The memory diffs can form a tree with branching, // memory diff layers are topped. The memory diffs can form a tree with branching,
// but the disk layer is singleton and common to all. If a reorg goes deeper than // but the disk layer is singleton and common to all. If a reorg goes deeper than
// the disk layer, everything needs to be deleted. // the disk layer, everything needs to be deleted.
...@@ -220,7 +225,7 @@ func loadSnapshot(db ethdb.KeyValueStore, journal string, headNumber uint64, hea ...@@ -220,7 +225,7 @@ func loadSnapshot(db ethdb.KeyValueStore, journal string, headNumber uint64, hea
if _, err := os.Stat(journal); os.IsNotExist(err) { if _, err := os.Stat(journal); os.IsNotExist(err) {
// Journal doesn't exist, don't worry if it's not supposed to // Journal doesn't exist, don't worry if it's not supposed to
if number != headNumber || root != headRoot { if number != headNumber || root != headRoot {
return nil, fmt.Errorf("snapshot journal missing, head does't match snapshot: #%d [%#x] vs. #%d [%#x]", return nil, fmt.Errorf("snapshot journal missing, head doesn't match snapshot: #%d [%#x] vs. #%d [%#x]",
headNumber, headRoot, number, root) headNumber, headRoot, number, root)
} }
return base, nil return base, nil
...@@ -237,7 +242,7 @@ func loadSnapshot(db ethdb.KeyValueStore, journal string, headNumber uint64, hea ...@@ -237,7 +242,7 @@ func loadSnapshot(db ethdb.KeyValueStore, journal string, headNumber uint64, hea
// Journal doesn't exist, don't worry if it's not supposed to // Journal doesn't exist, don't worry if it's not supposed to
number, root = snapshot.Info() number, root = snapshot.Info()
if number != headNumber || root != headRoot { if number != headNumber || root != headRoot {
return nil, fmt.Errorf("head does't match snapshot: #%d [%#x] vs. #%d [%#x]", return nil, fmt.Errorf("head doesn't match snapshot: #%d [%#x] vs. #%d [%#x]",
headNumber, headRoot, number, root) headNumber, headRoot, number, root)
} }
return snapshot, nil return snapshot, nil
......
...@@ -60,3 +60,33 @@ func merge(a, b []common.Hash) []common.Hash { ...@@ -60,3 +60,33 @@ func merge(a, b []common.Hash) []common.Hash {
} }
return result return result
} }
// dedupMerge merges two sorted lists of hashes into a single sorted list,
// collapsing any hash present in both inputs down to a single occurrence.
func dedupMerge(a, b []common.Hash) []common.Hash {
	merged := make([]common.Hash, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch cmp := bytes.Compare(a[i][:], b[j][:]); {
		case cmp < 0:
			merged = append(merged, a[i])
			i++
		case cmp > 0:
			merged = append(merged, b[j])
			j++
		default:
			// Equal entries: keep one copy and advance past both.
			merged = append(merged, a[i])
			i++
			j++
		}
	}
	// At most one of the tails is non-empty; append whatever remains.
	merged = append(merged, a[i:]...)
	merged = append(merged, b[j:]...)
	return merged
}
...@@ -204,13 +204,13 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has ...@@ -204,13 +204,13 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has
if metrics.EnabledExpensive { if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.SnapshotStorageReads += time.Since(start) }(time.Now()) defer func(start time.Time) { s.db.SnapshotStorageReads += time.Since(start) }(time.Now())
} }
enc = s.db.snap.Storage(s.addrHash, crypto.Keccak256Hash(key[:])) enc, err = s.db.snap.Storage(s.addrHash, crypto.Keccak256Hash(key[:]))
} else { }
// Track the amount of time wasted on reading the storage trie // If snapshot unavailable or reading from it failed, load from the database
if s.db.snap == nil || err != nil {
if metrics.EnabledExpensive { if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageReads += time.Since(start) }(time.Now()) defer func(start time.Time) { s.db.StorageReads += time.Since(start) }(time.Now())
} }
// Otherwise load the value from the database
if enc, err = s.getTrie(db).TryGet(key[:]); err != nil { if enc, err = s.getTrie(db).TryGet(key[:]); err != nil {
s.setError(err) s.setError(err)
return common.Hash{} return common.Hash{}
......
...@@ -511,25 +511,31 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *stateObject { ...@@ -511,25 +511,31 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *stateObject {
return obj return obj
} }
// If no live objects are available, attempt to use snapshots // If no live objects are available, attempt to use snapshots
var data Account var (
data Account
err error
)
if s.snap != nil { if s.snap != nil {
if metrics.EnabledExpensive { if metrics.EnabledExpensive {
defer func(start time.Time) { s.SnapshotAccountReads += time.Since(start) }(time.Now()) defer func(start time.Time) { s.SnapshotAccountReads += time.Since(start) }(time.Now())
} }
acc := s.snap.Account(crypto.Keccak256Hash(addr[:])) var acc *snapshot.Account
if acc == nil { if acc, err = s.snap.Account(crypto.Keccak256Hash(addr[:])); err == nil {
return nil if acc == nil {
} return nil
data.Nonce, data.Balance, data.CodeHash = acc.Nonce, acc.Balance, acc.CodeHash }
if len(data.CodeHash) == 0 { data.Nonce, data.Balance, data.CodeHash = acc.Nonce, acc.Balance, acc.CodeHash
data.CodeHash = emptyCodeHash if len(data.CodeHash) == 0 {
} data.CodeHash = emptyCodeHash
data.Root = common.BytesToHash(acc.Root) }
if data.Root == (common.Hash{}) { data.Root = common.BytesToHash(acc.Root)
data.Root = emptyRoot if data.Root == (common.Hash{}) {
data.Root = emptyRoot
}
} }
} else { }
// Snapshot unavailable, fall back to the trie // If snapshot unavailable or reading from it failed, load from the database
if s.snap == nil || err != nil {
if metrics.EnabledExpensive { if metrics.EnabledExpensive {
defer func(start time.Time) { s.AccountReads += time.Since(start) }(time.Now()) defer func(start time.Time) { s.AccountReads += time.Since(start) }(time.Now())
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment