Unverified Commit 1657e439 authored by rjl493456442, committed by GitHub

core, les, eth: port snap sync changes (#24898)

core, eth, les, trie: rework snap sync
parent 1c9afc56
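The heart of the rework is visible in the hunks below: the trie.Sync scheduler now hands out node paths alongside hashes, and delivery is split into ProcessNode and ProcessCode. A minimal sketch of the new round trip, assuming a *trie.Sync built elsewhere; healOnce, fetchNode and fetchCode are hypothetical stand-ins for the real request/response plumbing:

    package example

    import (
        "github.com/ethereum/go-ethereum/common"
        "github.com/ethereum/go-ethereum/trie"
    )

    // healOnce drains one batch of missing items from the scheduler. Missing now
    // returns (node paths, node hashes, code hashes); node results are delivered
    // by path via ProcessNode, bytecode by hash via ProcessCode.
    func healOnce(sched *trie.Sync, fetchNode func(path string) []byte, fetchCode func(hash common.Hash) []byte) error {
        paths, hashes, codes := sched.Missing(128)
        for i, path := range paths {
            _ = hashes[i] // the hash is still available for validating the response
            if err := sched.ProcessNode(trie.NodeSyncResult{Path: path, Data: fetchNode(path)}); err != nil {
                return err
            }
        }
        for _, hash := range codes {
            if err := sched.ProcessCode(trie.CodeSyncResult{Hash: hash, Data: fetchCode(hash)}); err != nil {
                return err
            }
        }
        return nil
    }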
@@ -940,7 +940,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
 	// The onleaf func is called _serially_, so we can reuse the same account
 	// for unmarshalling every time.
 	var account types.StateAccount
-	root, accountCommitted, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error {
+	root, accountCommitted, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash, _ []byte) error {
 		if err := rlp.DecodeBytes(leaf, &account); err != nil {
 			return nil
 		}
......
@@ -27,20 +27,20 @@ import (
 )
 // NewStateSync create a new state trie download scheduler.
-func NewStateSync(root common.Hash, database ethdb.KeyValueReader, onLeaf func(paths [][]byte, leaf []byte) error) *trie.Sync {
+func NewStateSync(root common.Hash, database ethdb.KeyValueReader, onLeaf func(keys [][]byte, leaf []byte) error) *trie.Sync {
 	// Register the storage slot callback if the external callback is specified.
-	var onSlot func(paths [][]byte, hexpath []byte, leaf []byte, parent common.Hash) error
+	var onSlot func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error
 	if onLeaf != nil {
-		onSlot = func(paths [][]byte, hexpath []byte, leaf []byte, parent common.Hash) error {
-			return onLeaf(paths, leaf)
+		onSlot = func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error {
+			return onLeaf(keys, leaf)
 		}
 	}
 	// Register the account callback to connect the state trie and the storage
 	// trie belongs to the contract.
 	var syncer *trie.Sync
-	onAccount := func(paths [][]byte, hexpath []byte, leaf []byte, parent common.Hash) error {
+	onAccount := func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error {
 		if onLeaf != nil {
-			if err := onLeaf(paths, leaf); err != nil {
+			if err := onLeaf(keys, leaf); err != nil {
 				return err
 			}
 		}
@@ -48,8 +48,8 @@ func NewStateSync(root common.Hash, database ethdb.KeyValueReader, onLeaf func(paths [][]byte, leaf []byte) error) *trie.Sync {
 		if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil {
 			return err
 		}
-		syncer.AddSubTrie(obj.Root, hexpath, parent, onSlot)
-		syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), hexpath, parent)
+		syncer.AddSubTrie(obj.Root, path, parent, parentPath, onSlot)
+		syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), path, parent, parentPath)
 		return nil
 	}
 	syncer = trie.NewSync(root, database, onAccount)
......
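For callers, the external shape of NewStateSync is unchanged: the onLeaf callback still receives the raw leaf keys plus the leaf blob, and only the internal trie callbacks grew the parent-path argument. A rough usage sketch; newHealScheduler and the callback body are hypothetical:

    package example

    import (
        "github.com/ethereum/go-ethereum/common"
        "github.com/ethereum/go-ethereum/core/state"
        "github.com/ethereum/go-ethereum/ethdb"
        "github.com/ethereum/go-ethereum/trie"
    )

    func newHealScheduler(root common.Hash, db ethdb.KeyValueReader) *trie.Sync {
        return state.NewStateSync(root, db, func(keys [][]byte, leaf []byte) error {
            // keys[0] is the 32-byte account key; for storage leaves keys[1]
            // carries the slot key as well (see the LeafCallback docs later in
            // this commit). Snap sync uses this hook to refill the flat
            // snapshot while healing.
            return nil
        })
    }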
This diff is collapsed.
@@ -21,7 +21,6 @@ import (
 	"errors"
 	"fmt"
 	"math/big"
-	"os"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -515,7 +514,7 @@ func TestSkeletonSyncExtend(t *testing.T) {
 // Tests that the skeleton sync correctly retrieves headers from one or more
 // peers without duplicates or other strange side effects.
 func TestSkeletonSyncRetrievals(t *testing.T) {
-	log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
+	//log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
 	// Since skeleton headers don't need to be meaningful, beyond a parent hash
 	// progression, create a long fake chain to test with.
......
@@ -22,7 +22,6 @@ import (
 	"testing"
 	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/trie"
 )
 func hexToNibbles(s string) []byte {
@@ -38,22 +37,17 @@ func hexToNibbles(s string) []byte {
 }
 func TestRequestSorting(t *testing.T) {
 	// - Path 0x9  -> {0x19}
 	// - Path 0x99 -> {0x0099}
 	// - Path 0x01234567890123456789012345678901012345678901234567890123456789019  -> {0x0123456789012345678901234567890101234567890123456789012345678901, 0x19}
 	// - Path 0x012345678901234567890123456789010123456789012345678901234567890199 -> {0x0123456789012345678901234567890101234567890123456789012345678901, 0x0099}
-	var f = func(path string) (trie.SyncPath, TrieNodePathSet, common.Hash) {
+	var f = func(path string) string {
 		data := hexToNibbles(path)
-		sp := trie.NewSyncPath(data)
-		tnps := TrieNodePathSet([][]byte(sp))
-		hash := common.Hash{}
-		return sp, tnps, hash
+		return string(data)
 	}
 	var (
 		hashes []common.Hash
-		paths    []trie.SyncPath
-		pathsets []TrieNodePathSet
+		paths  []string
 	)
 	for _, x := range []string{
 		"0x9",
@@ -67,15 +61,14 @@ func TestRequestSorting(t *testing.T) {
 		"0x01234567890123456789012345678901012345678901234567890123456789010",
 		"0x01234567890123456789012345678901012345678901234567890123456789011",
 	} {
-		sp, _, hash := f(x)
-		hashes = append(hashes, hash)
-		paths = append(paths, sp)
+		paths = append(paths, f(x))
+		hashes = append(hashes, common.Hash{})
 	}
-	_, paths, pathsets = sortByAccountPath(hashes, paths)
+	_, _, syncPaths, pathsets := sortByAccountPath(paths, hashes)
 	{
 		var b = new(bytes.Buffer)
-		for i := 0; i < len(paths); i++ {
-			fmt.Fprintf(b, "\n%d. paths %x", i, paths[i])
+		for i := 0; i < len(syncPaths); i++ {
			fmt.Fprintf(b, "\n%d. paths %x", i, syncPaths[i])
 		}
 		want := `
 0. paths [0099]
......
@@ -230,8 +230,8 @@ type trienodeHealRequest struct {
 	timeout *time.Timer   // Timer to track delivery timeout
 	stale   chan struct{} // Channel to signal the request was dropped
-	hashes []common.Hash   // Trie node hashes to validate responses
-	paths  []trie.SyncPath // Trie node paths requested for rescheduling
+	paths  []string      // Trie node paths for identifying trie node
+	hashes []common.Hash // Trie node hashes to validate responses
 	task *healTask // Task which this request is filling (only access fields through the runloop!!)
 }
@@ -240,9 +240,9 @@ type trienodeHealRequest struct {
 type trienodeHealResponse struct {
 	task *healTask // Task which this request is filling
-	hashes []common.Hash   // Hashes of the trie nodes to avoid double hashing
-	paths  []trie.SyncPath // Trie node paths requested for rescheduling missing ones
+	paths  []string      // Paths of the trie nodes
+	hashes []common.Hash // Hashes of the trie nodes to avoid double hashing
 	nodes  [][]byte      // Actual trie nodes to store into the database (nil = missing)
 }
 // bytecodeHealRequest tracks a pending bytecode request to ensure responses are to
@@ -321,8 +321,8 @@ type storageTask struct {
 type healTask struct {
 	scheduler *trie.Sync // State trie sync scheduler defining the tasks
-	trieTasks map[common.Hash]trie.SyncPath // Set of trie node tasks currently queued for retrieval
-	codeTasks map[common.Hash]struct{}      // Set of byte code tasks currently queued for retrieval
+	trieTasks map[string]common.Hash    // Set of trie node tasks currently queued for retrieval, indexed by node path
+	codeTasks map[common.Hash]struct{}  // Set of byte code tasks currently queued for retrieval, indexed by code hash
 }
 // SyncProgress is a database entry to allow suspending and resuming a snapshot state
@@ -540,7 +540,7 @@ func (s *Syncer) Unregister(id string) error {
 	return nil
 }
-// Sync starts (or resumes a previous) sync cycle to iterate over an state trie
+// Sync starts (or resumes a previous) sync cycle to iterate over a state trie
 // with the given root and reconstruct the nodes based on the snapshot leaves.
 // Previously downloaded segments will not be redownloaded of fixed, rather any
 // errors will be healed after the leaves are fully accumulated.
@@ -551,7 +551,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
 	s.root = root
 	s.healer = &healTask{
 		scheduler: state.NewStateSync(root, s.db, s.onHealState),
-		trieTasks: make(map[common.Hash]trie.SyncPath),
+		trieTasks: make(map[string]common.Hash),
 		codeTasks: make(map[common.Hash]struct{}),
 	}
 	s.statelessPeers = make(map[string]struct{})
@@ -743,7 +743,7 @@ func (s *Syncer) loadSyncStatus() {
 			return
 		}
 	}
-	// Either we've failed to decode the previus state, or there was none.
+	// Either we've failed to decode the previous state, or there was none.
 	// Start a fresh sync by chunking up the account range and scheduling
 	// them for retrieval.
 	s.tasks = nil
@@ -1280,9 +1280,9 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
 		want = maxTrieRequestCount + maxCodeRequestCount
 	)
 	if have < want {
-		nodes, paths, codes := s.healer.scheduler.Missing(want - have)
-		for i, hash := range nodes {
-			s.healer.trieTasks[hash] = paths[i]
+		paths, hashes, codes := s.healer.scheduler.Missing(want - have)
+		for i, path := range paths {
+			s.healer.trieTasks[path] = hashes[i]
 		}
 		for _, hash := range codes {
 			s.healer.codeTasks[hash] = struct{}{}
@@ -1323,21 +1323,20 @@
 	}
 	var (
 		hashes   = make([]common.Hash, 0, cap)
-		paths    = make([]trie.SyncPath, 0, cap)
+		paths    = make([]string, 0, cap)
 		pathsets = make([]TrieNodePathSet, 0, cap)
 	)
-	for hash, pathset := range s.healer.trieTasks {
-		delete(s.healer.trieTasks, hash)
+	for path, hash := range s.healer.trieTasks {
+		delete(s.healer.trieTasks, path)
+		paths = append(paths, path)
 		hashes = append(hashes, hash)
-		paths = append(paths, pathset)
-		if len(hashes) >= cap {
+		if len(paths) >= cap {
 			break
 		}
 	}
 	// Group requests by account hash
-	hashes, paths, pathsets = sortByAccountPath(hashes, paths)
+	paths, hashes, _, pathsets = sortByAccountPath(paths, hashes)
 	req := &trienodeHealRequest{
 		peer: idle,
 		id:   reqid,
@@ -1346,8 +1345,8 @@
 		revert: fail,
 		cancel: cancel,
 		stale:  make(chan struct{}),
-		hashes: hashes,
 		paths:  paths,
+		hashes: hashes,
 		task:   s.healer,
 	}
 	req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
@@ -1405,9 +1404,9 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
 		want = maxTrieRequestCount + maxCodeRequestCount
 	)
 	if have < want {
-		nodes, paths, codes := s.healer.scheduler.Missing(want - have)
-		for i, hash := range nodes {
-			s.healer.trieTasks[hash] = paths[i]
+		paths, hashes, codes := s.healer.scheduler.Missing(want - have)
+		for i, path := range paths {
+			s.healer.trieTasks[path] = hashes[i]
 		}
 		for _, hash := range codes {
 			s.healer.codeTasks[hash] = struct{}{}
@@ -1703,10 +1702,10 @@ func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) {
 	s.lock.Unlock()
 	// If there's a timeout timer still running, abort it and mark the trie node
-	// retrievals as not-pending, ready for resheduling
+	// retrievals as not-pending, ready for rescheduling
 	req.timeout.Stop()
-	for i, hash := range req.hashes {
-		req.task.trieTasks[hash] = req.paths[i]
+	for i, path := range req.paths {
+		req.task.trieTasks[path] = req.hashes[i]
 	}
 }
@@ -2096,14 +2095,14 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
 		// If the trie node was not delivered, reschedule it
 		if node == nil {
-			res.task.trieTasks[hash] = res.paths[i]
+			res.task.trieTasks[res.paths[i]] = res.hashes[i]
 			continue
 		}
 		// Push the trie node into the state syncer
 		s.trienodeHealSynced++
 		s.trienodeHealBytes += common.StorageSize(len(node))
-		err := s.healer.scheduler.Process(trie.SyncResult{Hash: hash, Data: node})
+		err := s.healer.scheduler.ProcessNode(trie.NodeSyncResult{Path: res.paths[i], Data: node})
 		switch err {
 		case nil:
 		case trie.ErrAlreadyProcessed:
@@ -2139,7 +2138,7 @@ func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) {
 		s.bytecodeHealSynced++
 		s.bytecodeHealBytes += common.StorageSize(len(node))
-		err := s.healer.scheduler.Process(trie.SyncResult{Hash: hash, Data: node})
+		err := s.healer.scheduler.ProcessCode(trie.CodeSyncResult{Hash: hash, Data: node})
 		switch err {
 		case nil:
 		case trie.ErrAlreadyProcessed:
@@ -2666,9 +2665,9 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error {
 	}
 	// Response validated, send it to the scheduler for filling
 	response := &trienodeHealResponse{
+		paths:  req.paths,
 		task:   req.task,
 		hashes: req.hashes,
-		paths:  req.paths,
 		nodes:  nodes,
 	}
 	select {
@@ -2913,8 +2912,9 @@ func (s *capacitySort) Swap(i, j int) {
 // healRequestSort implements the Sort interface, allowing sorting trienode
 // heal requests, which is a prerequisite for merging storage-requests.
 type healRequestSort struct {
-	hashes []common.Hash
-	paths  []trie.SyncPath
+	paths     []string
+	hashes    []common.Hash
+	syncPaths []trie.SyncPath
 }
 func (t *healRequestSort) Len() int {
@@ -2922,8 +2922,8 @@ func (t *healRequestSort) Len() int {
 }
 func (t *healRequestSort) Less(i, j int) bool {
-	a := t.paths[i]
-	b := t.paths[j]
+	a := t.syncPaths[i]
+	b := t.syncPaths[j]
 	switch bytes.Compare(a[0], b[0]) {
 	case -1:
 		return true
@@ -2944,8 +2944,9 @@ func (t *healRequestSort) Less(i, j int) bool {
 }
 func (t *healRequestSort) Swap(i, j int) {
-	t.hashes[i], t.hashes[j] = t.hashes[j], t.hashes[i]
 	t.paths[i], t.paths[j] = t.paths[j], t.paths[i]
+	t.hashes[i], t.hashes[j] = t.hashes[j], t.hashes[i]
+	t.syncPaths[i], t.syncPaths[j] = t.syncPaths[j], t.syncPaths[i]
 }
 // Merge merges the pathsets, so that several storage requests concerning the
@@ -2953,7 +2954,7 @@ func (t *healRequestSort) Swap(i, j int) {
 // OBS: This operation is moot if t has not first been sorted.
 func (t *healRequestSort) Merge() []TrieNodePathSet {
 	var result []TrieNodePathSet
-	for _, path := range t.paths {
+	for _, path := range t.syncPaths {
 		pathset := TrieNodePathSet([][]byte(path))
 		if len(path) == 1 {
 			// It's an account reference.
@@ -2962,7 +2963,7 @@ func (t *healRequestSort) Merge() []TrieNodePathSet {
 			// It's a storage reference.
 			end := len(result) - 1
 			if len(result) == 0 || !bytes.Equal(pathset[0], result[end][0]) {
-				// The account doesn't doesn't match last, create a new entry.
+				// The account doesn't match last, create a new entry.
 				result = append(result, pathset)
 			} else {
 				// It's the same account as the previous one, add to the storage
@@ -2976,9 +2977,13 @@ func (t *healRequestSort) Merge() []TrieNodePathSet {
 // sortByAccountPath takes hashes and paths, and sorts them. After that, it generates
 // the TrieNodePaths and merges paths which belongs to the same account path.
-func sortByAccountPath(hashes []common.Hash, paths []trie.SyncPath) ([]common.Hash, []trie.SyncPath, []TrieNodePathSet) {
-	n := &healRequestSort{hashes, paths}
+func sortByAccountPath(paths []string, hashes []common.Hash) ([]string, []common.Hash, []trie.SyncPath, []TrieNodePathSet) {
+	var syncPaths []trie.SyncPath
+	for _, path := range paths {
+		syncPaths = append(syncPaths, trie.NewSyncPath([]byte(path)))
+	}
+	n := &healRequestSort{paths, hashes, syncPaths}
 	sort.Sort(n)
 	pathsets := n.Merge()
-	return n.hashes, n.paths, pathsets
+	return n.paths, n.hashes, n.syncPaths, pathsets
 }
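sortByAccountPath now takes flat path strings and derives the trie.SyncPath form itself; the merged pathsets it returns are what go onto the wire. A small same-package sketch of the conversion chain a single queued task goes through (toPathset is a hypothetical helper, not part of the patch):

    package snap

    import "github.com/ethereum/go-ethereum/trie"

    // toPathset shows the conversion performed for each queued heal task: the
    // string key from healTask.trieTasks is split by trie.NewSyncPath into one
    // element (account-trie node) or two (account prefix plus storage path), and
    // that [][]byte is exactly one TrieNodePathSet entry of a GetTrieNodes request.
    func toPathset(path string) TrieNodePathSet {
        sp := trie.NewSyncPath([]byte(path))
        return TrieNodePathSet([][]byte(sp))
    }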
@@ -34,7 +34,7 @@ import (
 // a single data retrieval network packet.
 type stateReq struct {
 	nItems    uint16                    // Number of items requested for download (max is 384, so uint16 is sufficient)
-	trieTasks map[common.Hash]*trieTask // Trie node download tasks to track previous attempts
+	trieTasks map[string]*trieTask      // Trie node download tasks to track previous attempts
 	codeTasks map[common.Hash]*codeTask // Byte code download tasks to track previous attempts
 	timeout   time.Duration             // Maximum round trip time for this to complete
 	timer     *time.Timer               // Timer to fire when the RTT timeout expires
@@ -263,8 +263,8 @@ type stateSync struct {
 	sched  *trie.Sync         // State trie sync scheduler defining the tasks
 	keccak crypto.KeccakState // Keccak256 hasher to verify deliveries with
-	trieTasks map[common.Hash]*trieTask // Set of trie node tasks currently queued for retrieval
-	codeTasks map[common.Hash]*codeTask // Set of byte code tasks currently queued for retrieval
+	trieTasks map[string]*trieTask      // Set of trie node tasks currently queued for retrieval, indexed by path
+	codeTasks map[common.Hash]*codeTask // Set of byte code tasks currently queued for retrieval, indexed by hash
 	numUncommitted   int
 	bytesUncommitted int
@@ -281,6 +281,7 @@ type stateSync struct {
 // trieTask represents a single trie node download task, containing a set of
 // peers already attempted retrieval from to detect stalled syncs and abort.
 type trieTask struct {
+	hash     common.Hash
 	path     [][]byte
 	attempts map[string]struct{}
 }
@@ -299,7 +300,7 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
 		root:      root,
 		sched:     state.NewStateSync(root, d.stateDB, nil),
 		keccak:    sha3.NewLegacyKeccak256().(crypto.KeccakState),
-		trieTasks: make(map[common.Hash]*trieTask),
+		trieTasks: make(map[string]*trieTask),
 		codeTasks: make(map[common.Hash]*codeTask),
 		deliver:   make(chan *stateReq),
 		cancel:    make(chan struct{}),
@@ -455,10 +456,11 @@ func (s *stateSync) assignTasks() {
 func (s *stateSync) fillTasks(n int, req *stateReq) (nodes []common.Hash, paths []trie.SyncPath, codes []common.Hash) {
 	// Refill available tasks from the scheduler.
 	if fill := n - (len(s.trieTasks) + len(s.codeTasks)); fill > 0 {
-		nodes, paths, codes := s.sched.Missing(fill)
-		for i, hash := range nodes {
-			s.trieTasks[hash] = &trieTask{
-				path:     paths[i],
+		paths, hashes, codes := s.sched.Missing(fill)
+		for i, path := range paths {
+			s.trieTasks[path] = &trieTask{
+				hash:     hashes[i],
+				path:     trie.NewSyncPath([]byte(path)),
 				attempts: make(map[string]struct{}),
 			}
 		}
@@ -474,7 +476,7 @@ func (s *stateSync) fillTasks(n int, req *stateReq) (nodes []common.Hash, paths []trie.SyncPath, codes []common.Hash) {
 	paths = make([]trie.SyncPath, 0, n)
 	codes = make([]common.Hash, 0, n)
-	req.trieTasks = make(map[common.Hash]*trieTask, n)
+	req.trieTasks = make(map[string]*trieTask, n)
 	req.codeTasks = make(map[common.Hash]*codeTask, n)
 	for hash, t := range s.codeTasks {
@@ -492,7 +494,7 @@ func (s *stateSync) fillTasks(n int, req *stateReq) (nodes []common.Hash, paths []trie.SyncPath, codes []common.Hash) {
 		req.codeTasks[hash] = t
 		delete(s.codeTasks, hash)
 	}
-	for hash, t := range s.trieTasks {
+	for path, t := range s.trieTasks {
 		// Stop when we've gathered enough requests
 		if len(nodes)+len(codes) == n {
 			break
@@ -504,11 +506,11 @@ func (s *stateSync) fillTasks(n int, req *stateReq) (nodes []common.Hash, paths []trie.SyncPath, codes []common.Hash) {
 		// Assign the request to this peer
 		t.attempts[req.peer.id] = struct{}{}
-		nodes = append(nodes, hash)
+		nodes = append(nodes, t.hash)
 		paths = append(paths, t.path)
-		req.trieTasks[hash] = t
-		delete(s.trieTasks, hash)
+		req.trieTasks[path] = t
+		delete(s.trieTasks, path)
 	}
 	req.nItems = uint16(len(nodes) + len(codes))
 	return nodes, paths, codes
@@ -530,7 +532,7 @@ func (s *stateSync) process(req *stateReq) (int, error) {
 	// Iterate over all the delivered data and inject one-by-one into the trie
 	for _, blob := range req.response {
-		hash, err := s.processNodeData(blob)
+		hash, err := s.processNodeData(req.trieTasks, req.codeTasks, blob)
 		switch err {
 		case nil:
 			s.numUncommitted++
@@ -543,13 +545,10 @@ func (s *stateSync) process(req *stateReq) (int, error) {
 		default:
 			return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
 		}
-		// Delete from both queues (one delivery is enough for the syncer)
-		delete(req.trieTasks, hash)
-		delete(req.codeTasks, hash)
 	}
 	// Put unfulfilled tasks back into the retry queue
 	npeers := s.d.peers.Len()
-	for hash, task := range req.trieTasks {
+	for path, task := range req.trieTasks {
 		// If the node did deliver something, missing items may be due to a protocol
 		// limit or a previous timeout + delayed delivery. Both cases should permit
 		// the node to retry the missing items (to avoid single-peer stalls).
@@ -559,10 +558,10 @@ func (s *stateSync) process(req *stateReq) (int, error) {
 		// If we've requested the node too many times already, it may be a malicious
 		// sync where nobody has the right data. Abort.
 		if len(task.attempts) >= npeers {
-			return successful, fmt.Errorf("trie node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
+			return successful, fmt.Errorf("trie node %s failed with all peers (%d tries, %d peers)", task.hash.TerminalString(), len(task.attempts), npeers)
 		}
 		// Missing item, place into the retry queue.
-		s.trieTasks[hash] = task
+		s.trieTasks[path] = task
 	}
 	for hash, task := range req.codeTasks {
 		// If the node did deliver something, missing items may be due to a protocol
@@ -585,13 +584,35 @@ func (s *stateSync) process(req *stateReq) (int, error) {
 // processNodeData tries to inject a trie node data blob delivered from a remote
 // peer into the state trie, returning whether anything useful was written or any
 // error occurred.
-func (s *stateSync) processNodeData(blob []byte) (common.Hash, error) {
-	res := trie.SyncResult{Data: blob}
+//
+// If multiple requests correspond to the same hash, this method will inject the
+// blob as a result for the first one only, leaving the remaining duplicates to
+// be fetched again.
+func (s *stateSync) processNodeData(nodeTasks map[string]*trieTask, codeTasks map[common.Hash]*codeTask, blob []byte) (common.Hash, error) {
+	var hash common.Hash
 	s.keccak.Reset()
 	s.keccak.Write(blob)
-	s.keccak.Read(res.Hash[:])
-	err := s.sched.Process(res)
-	return res.Hash, err
+	s.keccak.Read(hash[:])
+	if _, present := codeTasks[hash]; present {
+		err := s.sched.ProcessCode(trie.CodeSyncResult{
+			Hash: hash,
+			Data: blob,
+		})
+		delete(codeTasks, hash)
+		return hash, err
+	}
+	for path, task := range nodeTasks {
+		if task.hash == hash {
+			err := s.sched.ProcessNode(trie.NodeSyncResult{
+				Path: path,
+				Data: blob,
+			})
+			delete(nodeTasks, path)
+			return hash, err
+		}
+	}
+	return common.Hash{}, trie.ErrNotRequested
 }
 // updateStats bumps the various state sync progress counters and displays a log
......
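The reason tasks are now keyed by path rather than hash, here and in the snap syncer above, appears to be that distinct positions in the trie can hold byte-identical nodes whose hashes therefore collide; a hash-keyed map silently merges such tasks, which is what the duplicate-handling note on processNodeData addresses. A toy illustration, with made-up task values, of the two keying schemes:

    package example

    import "github.com/ethereum/go-ethereum/common"

    type task struct{ path string }

    func main() {
        nodeHash := common.HexToHash("0xabc") // identical content => identical hash at both positions
        byHash := map[common.Hash]task{}
        byHash[nodeHash] = task{path: "a"}
        byHash[nodeHash] = task{path: "b"} // overwrites: one of the two positions is lost

        byPath := map[string]common.Hash{}
        byPath["a"] = nodeHash
        byPath["b"] = nodeHash // both positions stay scheduled independently
        println(len(byHash), len(byPath)) // 1 2
    }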
@@ -33,6 +33,7 @@ type leaf struct {
 	size int         // size of the rlp data (estimate)
 	hash common.Hash // hash of rlp data
 	node node        // the node to commit
+	path []byte      // the path from the root node
 }
 // committer is a type used for the trie Commit operation. A committer has some
@@ -69,7 +70,7 @@ func (c *committer) Commit(n node, db *Database) (hashNode, int, error) {
 	if db == nil {
 		return nil, 0, errors.New("no db provided")
 	}
-	h, committed, err := c.commit(n, db)
+	h, committed, err := c.commit(nil, n, db)
 	if err != nil {
 		return nil, 0, err
 	}
@@ -77,7 +78,7 @@ func (c *committer) Commit(n node, db *Database) (hashNode, int, error) {
 }
 // commit collapses a node down into a hash node and inserts it into the database
-func (c *committer) commit(n node, db *Database) (node, int, error) {
+func (c *committer) commit(path []byte, n node, db *Database) (node, int, error) {
 	// if this path is clean, use available cached data
 	hash, dirty := n.cache()
 	if hash != nil && !dirty {
@@ -93,7 +94,7 @@ func (c *committer) commit(n node, db *Database) (node, int, error) {
 		// otherwise it can only be hashNode or valueNode.
 		var childCommitted int
 		if _, ok := cn.Val.(*fullNode); ok {
-			childV, committed, err := c.commit(cn.Val, db)
+			childV, committed, err := c.commit(append(path, cn.Key...), cn.Val, db)
 			if err != nil {
 				return nil, 0, err
 			}
@@ -101,20 +102,20 @@ func (c *committer) commit(n node, db *Database) (node, int, error) {
 		}
 		// The key needs to be copied, since we're delivering it to database
 		collapsed.Key = hexToCompact(cn.Key)
-		hashedNode := c.store(collapsed, db)
+		hashedNode := c.store(path, collapsed, db)
 		if hn, ok := hashedNode.(hashNode); ok {
 			return hn, childCommitted + 1, nil
 		}
 		return collapsed, childCommitted, nil
 	case *fullNode:
-		hashedKids, childCommitted, err := c.commitChildren(cn, db)
+		hashedKids, childCommitted, err := c.commitChildren(path, cn, db)
 		if err != nil {
 			return nil, 0, err
 		}
 		collapsed := cn.copy()
 		collapsed.Children = hashedKids
-		hashedNode := c.store(collapsed, db)
+		hashedNode := c.store(path, collapsed, db)
 		if hn, ok := hashedNode.(hashNode); ok {
 			return hn, childCommitted + 1, nil
 		}
@@ -128,7 +129,7 @@ func (c *committer) commit(n node, db *Database) (node, int, error) {
 }
 // commitChildren commits the children of the given fullnode
-func (c *committer) commitChildren(n *fullNode, db *Database) ([17]node, int, error) {
+func (c *committer) commitChildren(path []byte, n *fullNode, db *Database) ([17]node, int, error) {
 	var (
 		committed int
 		children  [17]node
@@ -148,7 +149,7 @@ func (c *committer) commitChildren(n *fullNode, db *Database) ([17]node, int, error) {
 		// Commit the child recursively and store the "hashed" value.
 		// Note the returned node can be some embedded nodes, so it's
 		// possible the type is not hashNode.
-		hashed, childCommitted, err := c.commit(child, db)
+		hashed, childCommitted, err := c.commit(append(path, byte(i)), child, db)
 		if err != nil {
 			return children, 0, err
 		}
@@ -165,7 +166,7 @@ func (c *committer) commitChildren(n *fullNode, db *Database) ([17]node, int, error) {
 // store hashes the node n and if we have a storage layer specified, it writes
 // the key/value pair to it and tracks any node->child references as well as any
 // node->external trie references.
-func (c *committer) store(n node, db *Database) node {
+func (c *committer) store(path []byte, n node, db *Database) node {
 	// Larger nodes are replaced by their hash and stored in the database.
 	var (
 		hash, _ = n.cache()
@@ -189,6 +190,7 @@ func (c *committer) store(n node, db *Database) node {
 			size: size,
 			hash: common.BytesToHash(hash),
 			node: n,
+			path: path,
 		}
 	} else if db != nil {
 		// No leaf-callback used, but there's still a database. Do serial
@@ -213,13 +215,13 @@ func (c *committer) commitLoop(db *Database) {
 		switch n := n.(type) {
 		case *shortNode:
 			if child, ok := n.Val.(valueNode); ok {
-				c.onleaf(nil, nil, child, hash)
+				c.onleaf(nil, nil, child, hash, nil)
 			}
 		case *fullNode:
 			// For children in range [0, 15], it's impossible
 			// to contain valueNode. Only check the 17th child.
 			if n.Children[16] != nil {
-				c.onleaf(nil, nil, n.Children[16].(valueNode), hash)
+				c.onleaf(nil, nil, n.Children[16].(valueNode), hash, nil)
 			}
 		}
 	}
......
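The committer changes above thread a hex-nibble path down the recursion so that each stored node, and eventually each leaf, knows where it sits in the trie. The accumulation rule, pulled out of the calls above into a standalone helper purely for clarity (childPath is illustrative, not part of the patch):

    package example

    // childPath mirrors the bookkeeping added to committer.commit and
    // commitChildren: short (extension/leaf) nodes extend the parent path by
    // their hex key, full (branch) nodes extend it by the single nibble of the
    // child index. Paths stay in hex-nibble form; only the stored key itself is
    // compacted via hexToCompact.
    func childPath(parent []byte, shortKey []byte, branchIndex int, short bool) []byte {
        out := append([]byte{}, parent...) // copy to avoid aliasing the caller's slice
        if short {
            return append(out, shortKey...)
        }
        return append(out, byte(branchIndex))
    }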
This diff is collapsed.
This diff is collapsed.
@@ -42,18 +42,18 @@ var (
 // LeafCallback is a callback type invoked when a trie operation reaches a leaf
 // node.
 //
-// The paths is a path tuple identifying a particular trie node either in a single
-// trie (account) or a layered trie (account -> storage). Each path in the tuple
+// The keys is a path tuple identifying a particular trie node either in a single
+// trie (account) or a layered trie (account -> storage). Each key in the tuple
 // is in the raw format(32 bytes).
 //
-// The hexpath is a composite hexary path identifying the trie node. All the key
+// The path is a composite hexary path identifying the trie node. All the key
 // bytes are converted to the hexary nibbles and composited with the parent path
 // if the trie node is in a layered trie.
 //
 // It's used by state sync and commit to allow handling external references
 // between account and storage tries. And also it's used in the state healing
 // for extracting the raw states(leaf nodes) with corresponding paths.
-type LeafCallback func(paths [][]byte, hexpath []byte, leaf []byte, parent common.Hash) error
+type LeafCallback func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error
 // Trie is a Merkle Patricia Trie.
 // The zero value is an empty trie with no database.
......
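The new fifth argument carries the hex path of the parent trie node, which is what lets core/state wire AddSubTrie and AddCodeEntry (earlier in this commit) back to the exact account node that referenced them. A sketch of a callback with the widened signature, assuming a caller that only cares about account leaves:

    package example

    import (
        "github.com/ethereum/go-ethereum/common"
        "github.com/ethereum/go-ethereum/core/types"
        "github.com/ethereum/go-ethereum/rlp"
        "github.com/ethereum/go-ethereum/trie"
    )

    // onLeaf satisfies the widened trie.LeafCallback. keys holds the raw 32-byte
    // leaf keys, path the composite hex path of the leaf, and parent/parentPath
    // the hash and hex path of the trie node that references it.
    var onLeaf trie.LeafCallback = func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error {
        var account types.StateAccount
        if err := rlp.DecodeBytes(leaf, &account); err != nil {
            return nil // not an account leaf, ignore (mirrors the statedb.go hunk above)
        }
        _ = account.Root // e.g. reference the storage trie from here
        return nil
    }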
@@ -625,7 +625,7 @@ func BenchmarkCommitAfterHash(b *testing.B) {
 		benchmarkCommitAfterHash(b, nil)
 	})
 	var a types.StateAccount
-	onleaf := func(paths [][]byte, hexpath []byte, leaf []byte, parent common.Hash) error {
+	onleaf := func(paths [][]byte, hexpath []byte, leaf []byte, parent common.Hash, parentPath []byte) error {
 		rlp.DecodeBytes(leaf, &a)
 		return nil
 	}
......