Commit ab27bee2 authored by Péter Szilágyi's avatar Péter Szilágyi

core, eth, trie: direct state trie synchronization

parent 832b37c8
...@@ -37,6 +37,7 @@ import ( ...@@ -37,6 +37,7 @@ import (
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/hashicorp/golang-lru" "github.com/hashicorp/golang-lru"
) )
...@@ -246,6 +247,26 @@ func (bc *BlockChain) SetHead(head uint64) { ...@@ -246,6 +247,26 @@ func (bc *BlockChain) SetHead(head uint64) {
bc.loadLastState() bc.loadLastState()
} }
// FastSyncCommitHead sets the current head block to the one defined by the hash
// irrelevant what the chain contents were prior.
func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error {
// Make sure that both the block as well at it's state trie exists
block := self.GetBlock(hash)
if block == nil {
return fmt.Errorf("non existent block [%x…]", hash[:4])
}
if _, err := trie.NewSecure(block.Root(), self.chainDb); err != nil {
return err
}
// If all checks out, manually set the head block
self.mu.Lock()
self.currentBlock = block
self.mu.Unlock()
glog.V(logger.Info).Infof("committed block #%d [%x…] as new head", block.Number(), hash[:4])
return nil
}
func (self *BlockChain) GasLimit() *big.Int { func (self *BlockChain) GasLimit() *big.Int {
self.mu.RLock() self.mu.RLock()
defer self.mu.RUnlock() defer self.mu.RUnlock()
...@@ -721,10 +742,6 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain ...@@ -721,10 +742,6 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain
self.wg.Add(1) self.wg.Add(1)
defer self.wg.Done() defer self.wg.Done()
// Make sure only one thread manipulates the chain at once
self.chainmu.Lock()
defer self.chainmu.Unlock()
// Collect some import statistics to report on // Collect some import statistics to report on
stats := struct{ processed, ignored int }{} stats := struct{ processed, ignored int }{}
start := time.Now() start := time.Now()
......
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package state
import (
"bytes"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
type StateSync struct {
db ethdb.Database
sync *trie.TrieSync
codeReqs map[common.Hash]struct{} // requested but not yet written to database
codeReqList []common.Hash // requested since last Missing
}
var sha3_nil = common.BytesToHash(sha3.NewKeccak256().Sum(nil))
func NewStateSync(root common.Hash, db ethdb.Database) *StateSync {
ss := &StateSync{
db: db,
codeReqs: make(map[common.Hash]struct{}),
}
ss.codeReqs[sha3_nil] = struct{}{} // never request the nil hash
ss.sync = trie.NewTrieSync(root, db, ss.leafFound)
return ss
}
func (self *StateSync) leafFound(leaf []byte, parent common.Hash) error {
var obj struct {
Nonce uint64
Balance *big.Int
Root common.Hash
CodeHash []byte
}
if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil {
return err
}
self.sync.AddSubTrie(obj.Root, 64, parent, nil)
codehash := common.BytesToHash(obj.CodeHash)
if _, ok := self.codeReqs[codehash]; !ok {
code, _ := self.db.Get(obj.CodeHash)
if code == nil {
self.codeReqs[codehash] = struct{}{}
self.codeReqList = append(self.codeReqList, codehash)
}
}
return nil
}
func (self *StateSync) Missing(max int) []common.Hash {
cr := len(self.codeReqList)
gh := 0
if max != 0 {
if cr > max {
cr = max
}
gh = max - cr
}
list := append(self.sync.Missing(gh), self.codeReqList[:cr]...)
self.codeReqList = self.codeReqList[cr:]
return list
}
func (self *StateSync) Process(list []trie.SyncResult) error {
for i := 0; i < len(list); i++ {
if _, ok := self.codeReqs[list[i].Hash]; ok { // code data, not a node
self.db.Put(list[i].Hash[:], list[i].Data)
delete(self.codeReqs, list[i].Hash)
list[i] = list[len(list)-1]
list = list[:len(list)-1]
i--
}
}
_, err := self.sync.Process(list)
return err
}
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package state
import (
"bytes"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/trie"
)
// testAccount is the data associated with an account used by the state tests.
type testAccount struct {
address common.Address
balance *big.Int
nonce uint64
code []byte
}
// makeTestState create a sample test state to test node-wise reconstruction.
func makeTestState() (ethdb.Database, common.Hash, []*testAccount) {
// Create an empty state
db, _ := ethdb.NewMemDatabase()
state := New(common.Hash{}, db)
// Fill it with some arbitrary data
accounts := []*testAccount{}
for i := byte(0); i < 255; i++ {
obj := state.GetOrNewStateObject(common.BytesToAddress([]byte{i}))
acc := &testAccount{address: common.BytesToAddress([]byte{i})}
obj.AddBalance(big.NewInt(int64(11 * i)))
acc.balance = big.NewInt(int64(11 * i))
obj.SetNonce(uint64(42 * i))
acc.nonce = uint64(42 * i)
if i%3 == 0 {
obj.SetCode([]byte{i, i, i, i, i})
acc.code = []byte{i, i, i, i, i}
}
state.UpdateStateObject(obj)
accounts = append(accounts, acc)
}
root, _ := state.Commit()
// Return the generated state
return db, root, accounts
}
// checkStateAccounts cross references a reconstructed state with an expected
// account array.
func checkStateAccounts(t *testing.T, db ethdb.Database, root common.Hash, accounts []*testAccount) {
state := New(root, db)
for i, acc := range accounts {
if balance := state.GetBalance(acc.address); balance.Cmp(acc.balance) != 0 {
t.Errorf("account %d: balance mismatch: have %v, want %v", i, balance, acc.balance)
}
if nonce := state.GetNonce(acc.address); nonce != acc.nonce {
t.Errorf("account %d: nonce mismatch: have %v, want %v", i, nonce, acc.nonce)
}
if code := state.GetCode(acc.address); bytes.Compare(code, acc.code) != 0 {
t.Errorf("account %d: code mismatch: have %x, want %x", i, code, acc.code)
}
}
}
// Tests that an empty state is not scheduled for syncing.
func TestEmptyStateSync(t *testing.T) {
empty := common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
db, _ := ethdb.NewMemDatabase()
if req := NewStateSync(empty, db).Missing(1); len(req) != 0 {
t.Errorf("content requested for empty state: %v", req)
}
}
// Tests that given a root hash, a state can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go.
func TestIterativeStateSyncIndividual(t *testing.T) { testIterativeStateSync(t, 1) }
func TestIterativeStateSyncBatched(t *testing.T) { testIterativeStateSync(t, 100) }
func testIterativeStateSync(t *testing.T, batch int) {
// Create a random state to copy
srcDb, srcRoot, srcAccounts := makeTestState()
// Create a destination state and sync with the scheduler
dstDb, _ := ethdb.NewMemDatabase()
sched := NewStateSync(srcRoot, dstDb)
queue := append([]common.Hash{}, sched.Missing(batch)...)
for len(queue) > 0 {
results := make([]trie.SyncResult, len(queue))
for i, hash := range queue {
data, err := srcDb.Get(hash.Bytes())
if err != nil {
t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
}
results[i] = trie.SyncResult{hash, data}
}
if err := sched.Process(results); err != nil {
t.Fatalf("failed to process results: %v", err)
}
queue = append(queue[:0], sched.Missing(batch)...)
}
// Cross check that the two states are in sync
checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
}
// Tests that the trie scheduler can correctly reconstruct the state even if only
// partial results are returned, and the others sent only later.
func TestIterativeDelayedStateSync(t *testing.T) {
// Create a random state to copy
srcDb, srcRoot, srcAccounts := makeTestState()
// Create a destination state and sync with the scheduler
dstDb, _ := ethdb.NewMemDatabase()
sched := NewStateSync(srcRoot, dstDb)
queue := append([]common.Hash{}, sched.Missing(0)...)
for len(queue) > 0 {
// Sync only half of the scheduled nodes
results := make([]trie.SyncResult, len(queue)/2+1)
for i, hash := range queue[:len(results)] {
data, err := srcDb.Get(hash.Bytes())
if err != nil {
t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
}
results[i] = trie.SyncResult{hash, data}
}
if err := sched.Process(results); err != nil {
t.Fatalf("failed to process results: %v", err)
}
queue = append(queue[len(results):], sched.Missing(0)...)
}
// Cross check that the two states are in sync
checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
}
// Tests that given a root hash, a trie can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go, however in a
// random order.
func TestIterativeRandomStateSyncIndividual(t *testing.T) { testIterativeRandomStateSync(t, 1) }
func TestIterativeRandomStateSyncBatched(t *testing.T) { testIterativeRandomStateSync(t, 100) }
func testIterativeRandomStateSync(t *testing.T, batch int) {
// Create a random state to copy
srcDb, srcRoot, srcAccounts := makeTestState()
// Create a destination state and sync with the scheduler
dstDb, _ := ethdb.NewMemDatabase()
sched := NewStateSync(srcRoot, dstDb)
queue := make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
queue[hash] = struct{}{}
}
for len(queue) > 0 {
// Fetch all the queued nodes in a random order
results := make([]trie.SyncResult, 0, len(queue))
for hash, _ := range queue {
data, err := srcDb.Get(hash.Bytes())
if err != nil {
t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
}
results = append(results, trie.SyncResult{hash, data})
}
// Feed the retrieved results back and queue new tasks
if err := sched.Process(results); err != nil {
t.Fatalf("failed to process results: %v", err)
}
queue = make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
queue[hash] = struct{}{}
}
}
// Cross check that the two states are in sync
checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
}
// Tests that the trie scheduler can correctly reconstruct the state even if only
// partial results are returned (Even those randomly), others sent only later.
func TestIterativeRandomDelayedStateSync(t *testing.T) {
// Create a random state to copy
srcDb, srcRoot, srcAccounts := makeTestState()
// Create a destination state and sync with the scheduler
dstDb, _ := ethdb.NewMemDatabase()
sched := NewStateSync(srcRoot, dstDb)
queue := make(map[common.Hash]struct{})
for _, hash := range sched.Missing(0) {
queue[hash] = struct{}{}
}
for len(queue) > 0 {
// Sync only half of the scheduled nodes, even those in random order
results := make([]trie.SyncResult, 0, len(queue)/2+1)
for hash, _ := range queue {
delete(queue, hash)
data, err := srcDb.Get(hash.Bytes())
if err != nil {
t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
}
results = append(results, trie.SyncResult{hash, data})
if len(results) >= cap(results) {
break
}
}
// Feed the retrieved results back and queue new tasks
if err := sched.Process(results); err != nil {
t.Fatalf("failed to process results: %v", err)
}
for _, hash := range sched.Missing(0) {
queue[hash] = struct{}{}
}
}
// Cross check that the two states are in sync
checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
}
This diff is collapsed.
This diff is collapsed.
...@@ -47,4 +47,9 @@ var ( ...@@ -47,4 +47,9 @@ var (
receiptReqTimer = metrics.NewTimer("eth/downloader/receipts/req") receiptReqTimer = metrics.NewTimer("eth/downloader/receipts/req")
receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop") receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop")
receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout") receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout")
stateInMeter = metrics.NewMeter("eth/downloader/states/in")
stateReqTimer = metrics.NewTimer("eth/downloader/states/req")
stateDropMeter = metrics.NewMeter("eth/downloader/states/drop")
stateTimeoutMeter = metrics.NewMeter("eth/downloader/states/timeout")
) )
...@@ -41,6 +41,7 @@ type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error ...@@ -41,6 +41,7 @@ type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
type absoluteHeaderFetcherFn func(uint64, int, int, bool) error type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
type blockBodyFetcherFn func([]common.Hash) error type blockBodyFetcherFn func([]common.Hash) error
type receiptFetcherFn func([]common.Hash) error type receiptFetcherFn func([]common.Hash) error
type stateFetcherFn func([]common.Hash) error
var ( var (
errAlreadyFetching = errors.New("already fetching blocks from peer") errAlreadyFetching = errors.New("already fetching blocks from peer")
...@@ -55,12 +56,16 @@ type peer struct { ...@@ -55,12 +56,16 @@ type peer struct {
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
rep int32 // Simple peer reputation rep int32 // Simple peer reputation
blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request
receiptCapacity int32 // Number of receipts allowed to fetch per request receiptCapacity int32 // Number of receipts allowed to fetch per request
blockStarted time.Time // Time instance when the last block (body)fetch was started stateCapacity int32 // Number of node data pieces allowed to fetch per request
receiptStarted time.Time // Time instance when the last receipt fetch was started
blockStarted time.Time // Time instance when the last block (body)fetch was started
receiptStarted time.Time // Time instance when the last receipt fetch was started
stateStarted time.Time // Time instance when the last node data fetch was started
ignored *set.Set // Set of hashes not to request (didn't have previously) ignored *set.Set // Set of hashes not to request (didn't have previously)
...@@ -73,6 +78,7 @@ type peer struct { ...@@ -73,6 +78,7 @@ type peer struct {
getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts
getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data
version int // Eth protocol version number to switch strategies version int // Eth protocol version number to switch strategies
} }
...@@ -82,12 +88,13 @@ type peer struct { ...@@ -82,12 +88,13 @@ type peer struct {
func newPeer(id string, version int, head common.Hash, func newPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
getReceipts receiptFetcherFn) *peer { getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
return &peer{ return &peer{
id: id, id: id,
head: head, head: head,
blockCapacity: 1, blockCapacity: 1,
receiptCapacity: 1, receiptCapacity: 1,
stateCapacity: 1,
ignored: set.New(), ignored: set.New(),
getRelHashes: getRelHashes, getRelHashes: getRelHashes,
...@@ -99,6 +106,7 @@ func newPeer(id string, version int, head common.Hash, ...@@ -99,6 +106,7 @@ func newPeer(id string, version int, head common.Hash,
getBlockBodies: getBlockBodies, getBlockBodies: getBlockBodies,
getReceipts: getReceipts, getReceipts: getReceipts,
getNodeData: getNodeData,
version: version, version: version,
} }
...@@ -110,6 +118,7 @@ func (p *peer) Reset() { ...@@ -110,6 +118,7 @@ func (p *peer) Reset() {
atomic.StoreInt32(&p.receiptIdle, 0) atomic.StoreInt32(&p.receiptIdle, 0)
atomic.StoreInt32(&p.blockCapacity, 1) atomic.StoreInt32(&p.blockCapacity, 1)
atomic.StoreInt32(&p.receiptCapacity, 1) atomic.StoreInt32(&p.receiptCapacity, 1)
atomic.StoreInt32(&p.stateCapacity, 1)
p.ignored.Clear() p.ignored.Clear()
} }
...@@ -167,6 +176,24 @@ func (p *peer) FetchReceipts(request *fetchRequest) error { ...@@ -167,6 +176,24 @@ func (p *peer) FetchReceipts(request *fetchRequest) error {
return nil return nil
} }
// FetchNodeData sends a node state data retrieval request to the remote peer.
func (p *peer) FetchNodeData(request *fetchRequest) error {
// Short circuit if the peer is already fetching
if !atomic.CompareAndSwapInt32(&p.stateIdle, 0, 1) {
return errAlreadyFetching
}
p.stateStarted = time.Now()
// Convert the hash set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Hashes))
for hash, _ := range request.Hashes {
hashes = append(hashes, hash)
}
go p.getNodeData(hashes)
return nil
}
// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests. // SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its block retrieval allowance will also be updated either up- or downwards, // Its block retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not. // depending on whether the previous fetch completed in time or not.
...@@ -188,6 +215,13 @@ func (p *peer) SetReceiptsIdle() { ...@@ -188,6 +215,13 @@ func (p *peer) SetReceiptsIdle() {
p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle) p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle)
} }
// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval
// requests. Its node data retrieval allowance will also be updated either up- or
// downwards, depending on whether the previous fetch completed in time or not.
func (p *peer) SetNodeDataIdle() {
p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle)
}
// setIdle sets the peer to idle, allowing it to execute new retrieval requests. // setIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its data retrieval allowance will also be updated either up- or downwards, // Its data retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not. // depending on whether the previous fetch completed in time or not.
...@@ -230,6 +264,12 @@ func (p *peer) ReceiptCapacity() int { ...@@ -230,6 +264,12 @@ func (p *peer) ReceiptCapacity() int {
return int(atomic.LoadInt32(&p.receiptCapacity)) return int(atomic.LoadInt32(&p.receiptCapacity))
} }
// NodeDataCapacity retrieves the peers block download allowance based on its
// previously discovered bandwidth capacity.
func (p *peer) NodeDataCapacity() int {
return int(atomic.LoadInt32(&p.stateCapacity))
}
// Promote increases the peer's reputation. // Promote increases the peer's reputation.
func (p *peer) Promote() { func (p *peer) Promote() {
atomic.AddInt32(&p.rep, 1) atomic.AddInt32(&p.rep, 1)
...@@ -340,39 +380,50 @@ func (ps *peerSet) AllPeers() []*peer { ...@@ -340,39 +380,50 @@ func (ps *peerSet) AllPeers() []*peer {
// BlockIdlePeers retrieves a flat list of all the currently idle peers within the // BlockIdlePeers retrieves a flat list of all the currently idle peers within the
// active peer set, ordered by their reputation. // active peer set, ordered by their reputation.
func (ps *peerSet) BlockIdlePeers(version int) ([]*peer, int) { func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
ps.lock.RLock() idle := func(p *peer) bool {
defer ps.lock.RUnlock() return atomic.LoadInt32(&p.blockIdle) == 0
idle, total := make([]*peer, 0, len(ps.peers)), 0
for _, p := range ps.peers {
if (version == 61 && p.version == 61) || (version >= 62 && p.version >= 62) {
if atomic.LoadInt32(&p.blockIdle) == 0 {
idle = append(idle, p)
}
total++
}
} }
for i := 0; i < len(idle); i++ { return ps.idlePeers(61, 61, idle)
for j := i + 1; j < len(idle); j++ { }
if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
idle[i], idle[j] = idle[j], idle[i] // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
} // the active peer set, ordered by their reputation.
} func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.blockIdle) == 0
} }
return idle, total return ps.idlePeers(62, 64, idle)
} }
// ReceiptIdlePeers retrieves a flat list of all the currently idle peers within the // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
// active peer set, ordered by their reputation. // within the active peer set, ordered by their reputation.
func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) { func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.receiptIdle) == 0
}
return ps.idlePeers(63, 64, idle)
}
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
// peers within the active peer set, ordered by their reputation.
func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.stateIdle) == 0
}
return ps.idlePeers(63, 64, idle)
}
// idlePeers retrieves a flat list of all currently idle peers satisfying the
// protocol version constraints, using the provided function to check idleness.
func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) {
ps.lock.RLock() ps.lock.RLock()
defer ps.lock.RUnlock() defer ps.lock.RUnlock()
idle, total := make([]*peer, 0, len(ps.peers)), 0 idle, total := make([]*peer, 0, len(ps.peers)), 0
for _, p := range ps.peers { for _, p := range ps.peers {
if p.version >= 63 { if p.version >= minProtocol && p.version <= maxProtocol {
if atomic.LoadInt32(&p.receiptIdle) == 0 { if idleCheck(p) {
idle = append(idle, p) idle = append(idle, p)
} }
total++ total++
......
This diff is collapsed.
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
// headerCheckFn is a callback type for verifying a header's presence in the local chain.
type headerCheckFn func(common.Hash) bool
// blockCheckFn is a callback type for verifying a block's presence in the local chain.
type blockCheckFn func(common.Hash) bool
// headerRetrievalFn is a callback type for retrieving a header from the local chain.
type headerRetrievalFn func(common.Hash) *types.Header
// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block
// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain.
type headHeaderRetrievalFn func() *types.Header
// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain.
type headBlockRetrievalFn func() *types.Block
// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain.
type headFastBlockRetrievalFn func() *types.Block
// headBlockCommitterFn is a callback for directly committing the head block to a certain entity.
type headBlockCommitterFn func(common.Hash) error
// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block.
type tdRetrievalFn func(common.Hash) *big.Int
// headerChainInsertFn is a callback type to insert a batch of headers into the local chain.
type headerChainInsertFn func([]*types.Header, bool) (int, error)
// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain.
type blockChainInsertFn func(types.Blocks) (int, error)
// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain.
type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error)
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
// dataPack is a data message returned by a peer for some query.
type dataPack interface {
PeerId() string
Items() int
Stats() string
}
// hashPack is a batch of block hashes returned by a peer (eth/61).
type hashPack struct {
peerId string
hashes []common.Hash
}
func (p *hashPack) PeerId() string { return p.peerId }
func (p *hashPack) Items() int { return len(p.hashes) }
func (p *hashPack) Stats() string { return fmt.Sprintf("%d", len(p.hashes)) }
// blockPack is a batch of blocks returned by a peer (eth/61).
type blockPack struct {
peerId string
blocks []*types.Block
}
func (p *blockPack) PeerId() string { return p.peerId }
func (p *blockPack) Items() int { return len(p.blocks) }
func (p *blockPack) Stats() string { return fmt.Sprintf("%d", len(p.blocks)) }
// headerPack is a batch of block headers returned by a peer.
type headerPack struct {
peerId string
headers []*types.Header
}
func (p *headerPack) PeerId() string { return p.peerId }
func (p *headerPack) Items() int { return len(p.headers) }
func (p *headerPack) Stats() string { return fmt.Sprintf("%d", len(p.headers)) }
// bodyPack is a batch of block bodies returned by a peer.
type bodyPack struct {
peerId string
transactions [][]*types.Transaction
uncles [][]*types.Header
}
func (p *bodyPack) PeerId() string { return p.peerId }
func (p *bodyPack) Items() int {
if len(p.transactions) <= len(p.uncles) {
return len(p.transactions)
}
return len(p.uncles)
}
func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) }
// receiptPack is a batch of receipts returned by a peer.
type receiptPack struct {
peerId string
receipts [][]*types.Receipt
}
func (p *receiptPack) PeerId() string { return p.peerId }
func (p *receiptPack) Items() int { return len(p.receipts) }
func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) }
// statePack is a batch of states returned by a peer.
type statePack struct {
peerId string
states [][]byte
}
func (p *statePack) PeerId() string { return p.peerId }
func (p *statePack) Items() int { return len(p.states) }
func (p *statePack) Stats() string { return fmt.Sprintf("%d", len(p.states)) }
...@@ -129,9 +129,9 @@ func NewProtocolManager(mode Mode, networkId int, mux *event.TypeMux, txpool txP ...@@ -129,9 +129,9 @@ func NewProtocolManager(mode Mode, networkId int, mux *event.TypeMux, txpool txP
case LightMode: case LightMode:
syncMode = downloader.LightSync syncMode = downloader.LightSync
} }
manager.downloader = downloader.New(syncMode, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader, manager.downloader = downloader.New(syncMode, chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader,
blockchain.GetBlock, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.GetTd, blockchain.GetBlock, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead,
blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, manager.removePeer) blockchain.GetTd, blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, manager.removePeer)
validator := func(block *types.Block, parent *types.Block) error { validator := func(block *types.Block, parent *types.Block) error {
return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false) return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
...@@ -220,8 +220,8 @@ func (pm *ProtocolManager) handle(p *peer) error { ...@@ -220,8 +220,8 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Register the peer in the downloader. If the downloader considers it banned, we disconnect // Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, p.RequestHeadersByHash,
p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts); err != nil { p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil {
return err return err
} }
// Propagate existing transactions. new transactions appearing // Propagate existing transactions. new transactions appearing
...@@ -307,7 +307,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ...@@ -307,7 +307,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
break break
} }
// Deliver them all to the downloader for queuing // Deliver them all to the downloader for queuing
err := pm.downloader.DeliverHashes61(p.id, hashes) err := pm.downloader.DeliverHashes(p.id, hashes)
if err != nil { if err != nil {
glog.V(logger.Debug).Infoln(err) glog.V(logger.Debug).Infoln(err)
} }
...@@ -353,7 +353,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ...@@ -353,7 +353,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
} }
// Filter out any explicitly requested blocks, deliver the rest to the downloader // Filter out any explicitly requested blocks, deliver the rest to the downloader
if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 { if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 {
pm.downloader.DeliverBlocks61(p.id, blocks) pm.downloader.DeliverBlocks(p.id, blocks)
} }
// Block header query, collect the requested headers and reply // Block header query, collect the requested headers and reply
...@@ -515,6 +515,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ...@@ -515,6 +515,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
} }
return p.SendNodeData(data) return p.SendNodeData(data)
case p.version >= eth63 && msg.Code == NodeDataMsg:
// A batch of node state data arrived to one of our previous requests
var data [][]byte
if err := msg.Decode(&data); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Deliver all to the downloader
if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
glog.V(logger.Debug).Infof("failed to deliver node state data: %v", err)
}
case p.version >= eth63 && msg.Code == GetReceiptsMsg: case p.version >= eth63 && msg.Code == GetReceiptsMsg:
// Decode the retrieval message // Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
......
...@@ -191,7 +191,7 @@ func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error { ...@@ -191,7 +191,7 @@ func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error {
return p2p.Send(p.rw, BlockBodiesMsg, bodies) return p2p.Send(p.rw, BlockBodiesMsg, bodies)
} }
// SendNodeData sends a batch of arbitrary internal data, corresponding to the // SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the
// hashes requested. // hashes requested.
func (p *peer) SendNodeData(data [][]byte) error { func (p *peer) SendNodeData(data [][]byte) error {
return p2p.Send(p.rw, NodeDataMsg, data) return p2p.Send(p.rw, NodeDataMsg, data)
......
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package trie
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
// request represents a scheduled or already in-flight state retrieval request.
type request struct {
hash common.Hash // Hash of the node data content to retrieve
data []byte // Data content of the node, cached until all subtrees complete
object *node // Target node to populate with retrieved data (hashnode originally)
parents []*request // Parent state nodes referencing this entry (notify all upon completion)
depth int // Depth level within the trie the node is located to prioritize DFS
deps int // Number of dependencies before allowed to commit this node
callback TrieSyncLeafCallback // Callback to invoke if a leaf node it reached on this branch
}
// SyncResult is a simple list to return missing nodes along with their request
// hashes.
type SyncResult struct {
Hash common.Hash // Hash of the originally unknown trie node
Data []byte // Data content of the retrieved node
}
// TrieSyncLeafCallback is a callback type invoked when a trie sync reaches a
// leaf node. It's used by state syncing to check if the leaf node requires some
// further data syncing.
type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error
// TrieSync is the main state trie synchronisation scheduler, which provides yet
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie steb by step until all is done.
type TrieSync struct {
database Database // State database for storing all the assembled node data
requests map[common.Hash]*request // Pending requests pertaining to a key hash
queue *prque.Prque // Priority queue with the pending requests
}
// NewTrieSync creates a new trie data download scheduler.
func NewTrieSync(root common.Hash, database Database, callback TrieSyncLeafCallback) *TrieSync {
ts := &TrieSync{
database: database,
requests: make(map[common.Hash]*request),
queue: prque.New(),
}
ts.AddSubTrie(root, 0, common.Hash{}, callback)
return ts
}
// AddSubTrie registers a new trie to the sync code, rooted at the designated parent.
func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback TrieSyncLeafCallback) {
// Short circuit if the trie is empty
if root == emptyRoot {
return
}
// Assemble the new sub-trie sync request
node := node(hashNode(root.Bytes()))
req := &request{
object: &node,
hash: root,
depth: depth,
callback: callback,
}
// If this sub-trie has a designated parent, link them together
if parent != (common.Hash{}) {
ancestor := s.requests[parent]
if ancestor == nil {
panic(fmt.Sprintf("sub-trie ancestor not found: %x", parent))
}
ancestor.deps++
req.parents = append(req.parents, ancestor)
}
s.schedule(req)
}
// Missing retrieves the known missing nodes from the trie for retrieval.
func (s *TrieSync) Missing(max int) []common.Hash {
requests := []common.Hash{}
for !s.queue.Empty() && (max == 0 || len(requests) < max) {
requests = append(requests, s.queue.PopItem().(common.Hash))
}
return requests
}
// Process injects a batch of retrieved trie nodes data.
func (s *TrieSync) Process(results []SyncResult) (int, error) {
for i, item := range results {
// If the item was not requested, bail out
request := s.requests[item.Hash]
if request == nil {
return i, fmt.Errorf("not requested: %x", item.Hash)
}
// Decode the node data content and update the request
node, err := decodeNode(item.Data)
if err != nil {
return i, err
}
*request.object = node
request.data = item.Data
// Create and schedule a request for all the children nodes
requests, err := s.children(request)
if err != nil {
return i, err
}
if len(requests) == 0 && request.deps == 0 {
s.commit(request)
continue
}
request.deps += len(requests)
for _, child := range requests {
s.schedule(child)
}
}
return 0, nil
}
// schedule inserts a new state retrieval request into the fetch queue. If there
// is already a pending request for this node, the new request will be discarded
// and only a parent reference added to the old one.
func (s *TrieSync) schedule(req *request) {
// If we're already requesting this node, add a new reference and stop
if old, ok := s.requests[req.hash]; ok {
old.parents = append(old.parents, req.parents...)
return
}
// Schedule the request for future retrieval
s.queue.Push(req.hash, float32(req.depth))
s.requests[req.hash] = req
}
// children retrieves all the missing children of a state trie entry for future
// retrieval scheduling.
func (s *TrieSync) children(req *request) ([]*request, error) {
// Gather all the children of the node, irrelevant whether known or not
type child struct {
node *node
depth int
}
children := []child{}
switch node := (*req.object).(type) {
case shortNode:
children = []child{{
node: &node.Val,
depth: req.depth + len(node.Key),
}}
case fullNode:
for i := 0; i < 17; i++ {
if node[i] != nil {
children = append(children, child{
node: &node[i],
depth: req.depth + 1,
})
}
}
default:
panic(fmt.Sprintf("unknown node: %+v", node))
}
// Iterate over the children, and request all unknown ones
requests := make([]*request, 0, len(children))
for _, child := range children {
// Notify any external watcher of a new key/value node
if req.callback != nil {
if node, ok := (*child.node).(valueNode); ok {
if err := req.callback(node, req.hash); err != nil {
return nil, err
}
}
}
// If the child references another node, resolve or schedule
if node, ok := (*child.node).(hashNode); ok {
// Try to resolve the node from the local database
blob, _ := s.database.Get(node)
if local, err := decodeNode(blob); local != nil && err == nil {
*child.node = local
continue
}
// Locally unknown node, schedule for retrieval
requests = append(requests, &request{
object: child.node,
hash: common.BytesToHash(node),
parents: []*request{req},
depth: child.depth,
callback: req.callback,
})
}
}
return requests, nil
}
// commit finalizes a retrieval request and stores it into the database. If any
// of the referencing parent requests complete due to this commit, they are also
// committed themselves.
func (s *TrieSync) commit(req *request) error {
// Write the node content to disk
if err := s.database.Put(req.hash[:], req.data); err != nil {
return err
}
delete(s.requests, req.hash)
// Check all parents for completion
for _, parent := range req.parents {
parent.deps--
if parent.deps == 0 {
if err := s.commit(parent); err != nil {
return err
}
}
}
return nil
}
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package trie
import (
"bytes"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
)
// makeTestTrie create a sample test trie to test node-wise reconstruction.
func makeTestTrie() (ethdb.Database, *Trie, map[string][]byte) {
// Create an empty trie
db, _ := ethdb.NewMemDatabase()
trie, _ := New(common.Hash{}, db)
// Fill it with some arbitrary data
content := make(map[string][]byte)
for i := byte(0); i < 255; i++ {
key, val := common.LeftPadBytes([]byte{1, i}, 32), []byte{i}
content[string(key)] = val
trie.Update(key, val)
key, val = common.LeftPadBytes([]byte{2, i}, 32), []byte{i}
content[string(key)] = val
trie.Update(key, val)
}
trie.Commit()
// Return the generated trie
return db, trie, content
}
// checkTrieContents cross references a reconstructed trie with an expected data
// content map.
func checkTrieContents(t *testing.T, db Database, root []byte, content map[string][]byte) {
trie, err := New(common.BytesToHash(root), db)
if err != nil {
t.Fatalf("failed to create trie at %x: %v", root, err)
}
for key, val := range content {
if have := trie.Get([]byte(key)); bytes.Compare(have, val) != 0 {
t.Errorf("entry %x: content mismatch: have %x, want %x", key, have, val)
}
}
}
// Tests that an empty trie is not scheduled for syncing.
func TestEmptyTrieSync(t *testing.T) {
emptyA, _ := New(common.Hash{}, nil)
emptyB, _ := New(emptyRoot, nil)
for i, trie := range []*Trie{emptyA, emptyB} {
db, _ := ethdb.NewMemDatabase()
if req := NewTrieSync(common.BytesToHash(trie.Root()), db, nil).Missing(1); len(req) != 0 {
t.Errorf("test %d: content requested for empty trie: %v", i, req)
}
}
}
// Tests that given a root hash, a trie can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go.
func TestIterativeTrieSyncIndividual(t *testing.T) { testIterativeTrieSync(t, 1) }
func TestIterativeTrieSyncBatched(t *testing.T) { testIterativeTrieSync(t, 100) }
func testIterativeTrieSync(t *testing.T, batch int) {
// Create a random trie to copy
srcDb, srcTrie, srcData := makeTestTrie()
// Create a destination trie and sync with the scheduler
dstDb, _ := ethdb.NewMemDatabase()
sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)
queue := append([]common.Hash{}, sched.Missing(batch)...)
for len(queue) > 0 {
results := make([]SyncResult, len(queue))
for i, hash := range queue {
data, err := srcDb.Get(hash.Bytes())
if err != nil {
t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
}
results[i] = SyncResult{hash, data}
}
if index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = append(queue[:0], sched.Missing(batch)...)
}
// Cross check that the two tries re in sync
checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
}
// Tests that the trie scheduler can correctly reconstruct the state even if only
// partial results are returned, and the others sent only later.
func TestIterativeDelayedTrieSync(t *testing.T) {
// Create a random trie to copy
srcDb, srcTrie, srcData := makeTestTrie()
// Create a destination trie and sync with the scheduler
dstDb, _ := ethdb.NewMemDatabase()
sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)
queue := append([]common.Hash{}, sched.Missing(10000)...)
for len(queue) > 0 {
// Sync only half of the scheduled nodes
results := make([]SyncResult, len(queue)/2+1)
for i, hash := range queue[:len(results)] {
data, err := srcDb.Get(hash.Bytes())
if err != nil {
t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
}
results[i] = SyncResult{hash, data}
}
if index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = append(queue[len(results):], sched.Missing(10000)...)
}
// Cross check that the two tries re in sync
checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
}
// Tests that given a root hash, a trie can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go, however in a
// random order.
func TestIterativeRandomTrieSyncIndividual(t *testing.T) { testIterativeRandomTrieSync(t, 1) }
func TestIterativeRandomTrieSyncBatched(t *testing.T) { testIterativeRandomTrieSync(t, 100) }
func testIterativeRandomTrieSync(t *testing.T, batch int) {
// Create a random trie to copy
srcDb, srcTrie, srcData := makeTestTrie()
// Create a destination trie and sync with the scheduler
dstDb, _ := ethdb.NewMemDatabase()
sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)
queue := make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
queue[hash] = struct{}{}
}
for len(queue) > 0 {
// Fetch all the queued nodes in a random order
results := make([]SyncResult, 0, len(queue))
for hash, _ := range queue {
data, err := srcDb.Get(hash.Bytes())
if err != nil {
t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
}
results = append(results, SyncResult{hash, data})
}
// Feed the retrieved results back and queue new tasks
if index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
queue[hash] = struct{}{}
}
}
// Cross check that the two tries re in sync
checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
}
// Tests that the trie scheduler can correctly reconstruct the state even if only
// partial results are returned (Even those randomly), others sent only later.
func TestIterativeRandomDelayedTrieSync(t *testing.T) {
// Create a random trie to copy
srcDb, srcTrie, srcData := makeTestTrie()
// Create a destination trie and sync with the scheduler
dstDb, _ := ethdb.NewMemDatabase()
sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)
queue := make(map[common.Hash]struct{})
for _, hash := range sched.Missing(10000) {
queue[hash] = struct{}{}
}
for len(queue) > 0 {
// Sync only half of the scheduled nodes, even those in random order
results := make([]SyncResult, 0, len(queue)/2+1)
for hash, _ := range queue {
data, err := srcDb.Get(hash.Bytes())
if err != nil {
t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
}
results = append(results, SyncResult{hash, data})
if len(results) >= cap(results) {
break
}
}
// Feed the retrieved results back and queue new tasks
if index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
for _, result := range results {
delete(queue, result.Hash)
}
for _, hash := range sched.Missing(10000) {
queue[hash] = struct{}{}
}
}
// Cross check that the two tries re in sync
checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
}
// Tests that a trie sync will not request nodes multiple times, even if they
// have such references.
func TestDuplicateAvoidanceTrieSync(t *testing.T) {
// Create a random trie to copy
srcDb, srcTrie, srcData := makeTestTrie()
// Create a destination trie and sync with the scheduler
dstDb, _ := ethdb.NewMemDatabase()
sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)
queue := append([]common.Hash{}, sched.Missing(0)...)
requested := make(map[common.Hash]struct{})
for len(queue) > 0 {
results := make([]SyncResult, len(queue))
for i, hash := range queue {
data, err := srcDb.Get(hash.Bytes())
if err != nil {
t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
}
if _, ok := requested[hash]; ok {
t.Errorf("hash %x already requested once", hash)
}
requested[hash] = struct{}{}
results[i] = SyncResult{hash, data}
}
if index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = append(queue[:0], sched.Missing(0)...)
}
// Cross check that the two tries re in sync
checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment