Commit 1cc2f080 authored by Jeffrey Wilcke's avatar Jeffrey Wilcke

Merge pull request #1784 from karalabe/standard-sync-stats

eth, rpc: standardize the chain sync progress counters
parents e9a80518 d4d3fc6a
...@@ -130,10 +130,9 @@ type Downloader struct { ...@@ -130,10 +130,9 @@ type Downloader struct {
interrupt int32 // Atomic boolean to signal termination interrupt int32 // Atomic boolean to signal termination
// Statistics // Statistics
importStart time.Time // Instance when the last blocks were taken from the cache syncStatsOrigin uint64 // Origin block number where syncing started at
importQueue []*Block // Previously taken blocks to check import progress syncStatsHeight uint64 // Highest block number known when syncing started
importDone int // Number of taken blocks already imported from the last batch syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
importLock sync.Mutex
// Callbacks // Callbacks
hasBlock hashCheckFn // Checks if a block is present in the chain hasBlock hashCheckFn // Checks if a block is present in the chain
...@@ -161,6 +160,7 @@ type Downloader struct { ...@@ -161,6 +160,7 @@ type Downloader struct {
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
// Testing hooks // Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
chainInsertHook func([]*Block) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) chainInsertHook func([]*Block) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
} }
...@@ -192,27 +192,14 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, he ...@@ -192,27 +192,14 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, he
} }
} }
// Stats retrieves the current status of the downloader. // Boundaries retrieves the synchronisation boundaries, specifically the origin
func (d *Downloader) Stats() (pending int, cached int, importing int, estimate time.Duration) { // block where synchronisation started at (may have failed/suspended) and the
// Fetch the download status // latest known block which the synchonisation targets.
pending, cached = d.queue.Size() func (d *Downloader) Boundaries() (uint64, uint64) {
d.syncStatsLock.RLock()
defer d.syncStatsLock.RUnlock()
// Figure out the import progress return d.syncStatsOrigin, d.syncStatsHeight
d.importLock.Lock()
defer d.importLock.Unlock()
for len(d.importQueue) > 0 && d.hasBlock(d.importQueue[0].RawBlock.Hash()) {
d.importQueue = d.importQueue[1:]
d.importDone++
}
importing = len(d.importQueue)
// Make an estimate on the total sync
estimate = 0
if d.importDone > 0 {
estimate = time.Since(d.importStart) / time.Duration(d.importDone) * time.Duration(pending+cached+importing)
}
return
} }
// Synchronising returns whether the downloader is currently retrieving blocks. // Synchronising returns whether the downloader is currently retrieving blocks.
...@@ -333,14 +320,29 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e ...@@ -333,14 +320,29 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
switch { switch {
case p.version == eth61: case p.version == eth61:
// Old eth/61, use forward, concurrent hash and block retrieval algorithm // Look up the sync boundaries: the common ancestor and the target block
number, err := d.findAncestor61(p) latest, err := d.fetchHeight61(p)
if err != nil {
return err
}
origin, err := d.findAncestor61(p)
if err != nil { if err != nil {
return err return err
} }
d.syncStatsLock.Lock()
if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin {
d.syncStatsOrigin = origin
}
d.syncStatsHeight = latest
d.syncStatsLock.Unlock()
// Initiate the sync using a concurrent hash and block retrieval algorithm
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
errc := make(chan error, 2) errc := make(chan error, 2)
go func() { errc <- d.fetchHashes61(p, td, number+1) }() go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
go func() { errc <- d.fetchBlocks61(number + 1) }() go func() { errc <- d.fetchBlocks61(origin + 1) }()
// If any fetcher fails, cancel the other // If any fetcher fails, cancel the other
if err := <-errc; err != nil { if err := <-errc; err != nil {
...@@ -351,14 +353,29 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e ...@@ -351,14 +353,29 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
return <-errc return <-errc
case p.version >= eth62: case p.version >= eth62:
// New eth/62, use forward, concurrent header and block body retrieval algorithm // Look up the sync boundaries: the common ancestor and the target block
number, err := d.findAncestor(p) latest, err := d.fetchHeight(p)
if err != nil { if err != nil {
return err return err
} }
origin, err := d.findAncestor(p)
if err != nil {
return err
}
d.syncStatsLock.Lock()
if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin {
d.syncStatsOrigin = origin
}
d.syncStatsHeight = latest
d.syncStatsLock.Unlock()
// Initiate the sync using a concurrent hash and block retrieval algorithm
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
errc := make(chan error, 2) errc := make(chan error, 2)
go func() { errc <- d.fetchHeaders(p, td, number+1) }() go func() { errc <- d.fetchHeaders(p, td, origin+1) }()
go func() { errc <- d.fetchBodies(number + 1) }() go func() { errc <- d.fetchBodies(origin + 1) }()
// If any fetcher fails, cancel the other // If any fetcher fails, cancel the other
if err := <-errc; err != nil { if err := <-errc; err != nil {
...@@ -401,6 +418,50 @@ func (d *Downloader) Terminate() { ...@@ -401,6 +418,50 @@ func (d *Downloader) Terminate() {
d.cancel() d.cancel()
} }
// fetchHeight61 retrieves the head block of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
// Request the advertised remote head block and wait for the response
go p.getBlocks([]common.Hash{p.head})
timeout := time.After(blockSoftTTL)
for {
select {
case <-d.cancelCh:
return 0, errCancelBlockFetch
case <-d.headerCh:
// Out of bounds eth/62 block headers received, ignore them
case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them
case <-d.hashCh:
// Out of bounds hashes received, ignore them
case blockPack := <-d.blockCh:
// Discard anything not from the origin peer
if blockPack.peerId != p.id {
glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", blockPack.peerId)
break
}
// Make sure the peer actually gave something valid
blocks := blockPack.blocks
if len(blocks) != 1 {
glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks))
return 0, errBadPeer
}
return blocks[0].NumberU64(), nil
case <-timeout:
glog.V(logger.Debug).Infof("%v: head block timeout", p)
return 0, errTimeout
}
}
}
// findAncestor61 tries to locate the common ancestor block of the local chain and // findAncestor61 tries to locate the common ancestor block of the local chain and
// a remote peers blockchain. In the general case when our node was in sync and // a remote peers blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N blocks should already get us a match. // on the correct chain, checking the top N blocks should already get us a match.
...@@ -776,6 +837,50 @@ func (d *Downloader) fetchBlocks61(from uint64) error { ...@@ -776,6 +837,50 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
} }
} }
// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
// Request the advertised remote head block and wait for the response
go p.getRelHeaders(p.head, 1, 0, false)
timeout := time.After(headerTTL)
for {
select {
case <-d.cancelCh:
return 0, errCancelBlockFetch
case headerPack := <-d.headerCh:
// Discard anything not from the origin peer
if headerPack.peerId != p.id {
glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
break
}
// Make sure the peer actually gave something valid
headers := headerPack.headers
if len(headers) != 1 {
glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
return 0, errBadPeer
}
return headers[0].Number.Uint64(), nil
case <-d.bodyCh:
// Out of bounds block bodies received, ignore them
case <-d.hashCh:
// Out of bounds eth/61 hashes received, ignore them
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
case <-timeout:
glog.V(logger.Debug).Infof("%v: head header timeout", p)
return 0, errTimeout
}
}
}
// findAncestor tries to locate the common ancestor block of the local chain and // findAncestor tries to locate the common ancestor block of the local chain and
// a remote peers blockchain. In the general case when our node was in sync and // a remote peers blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N blocks should already get us a match. // on the correct chain, checking the top N blocks should already get us a match.
...@@ -973,7 +1078,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { ...@@ -973,7 +1078,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
// Otherwise insert all the new headers, aborting in case of junk // Otherwise insert all the new headers, aborting in case of junk
glog.V(logger.Detail).Infof("%v: inserting %d headers from #%d", p, len(headerPack.headers), from) glog.V(logger.Detail).Infof("%v: inserting %d headers from #%d", p, len(headerPack.headers), from)
inserts := d.queue.Insert(headerPack.headers) inserts := d.queue.Insert(headerPack.headers, from)
if len(inserts) != len(headerPack.headers) { if len(inserts) != len(headerPack.headers) {
glog.V(logger.Debug).Infof("%v: stale headers", p) glog.V(logger.Debug).Infof("%v: stale headers", p)
return errBadPeer return errBadPeer
...@@ -1203,16 +1308,10 @@ func (d *Downloader) process() { ...@@ -1203,16 +1308,10 @@ func (d *Downloader) process() {
d.process() d.process()
} }
}() }()
// Release the lock upon exit (note, before checking for reentry!), and set // Release the lock upon exit (note, before checking for reentry!)
// the import statistics to zero. // the import statistics to zero.
defer func() { defer atomic.StoreInt32(&d.processing, 0)
d.importLock.Lock()
d.importQueue = nil
d.importDone = 0
d.importLock.Unlock()
atomic.StoreInt32(&d.processing, 0)
}()
// Repeat the processing as long as there are blocks to import // Repeat the processing as long as there are blocks to import
for { for {
// Fetch the next batch of blocks // Fetch the next batch of blocks
...@@ -1223,13 +1322,6 @@ func (d *Downloader) process() { ...@@ -1223,13 +1322,6 @@ func (d *Downloader) process() {
if d.chainInsertHook != nil { if d.chainInsertHook != nil {
d.chainInsertHook(blocks) d.chainInsertHook(blocks)
} }
// Reset the import statistics
d.importLock.Lock()
d.importStart = time.Now()
d.importQueue = blocks
d.importDone = 0
d.importLock.Unlock()
// Actually import the blocks // Actually import the blocks
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number()) glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
for len(blocks) != 0 { for len(blocks) != 0 {
......
This diff is collapsed.
...@@ -57,6 +57,7 @@ type queue struct { ...@@ -57,6 +57,7 @@ type queue struct {
headerPool map[common.Hash]*types.Header // [eth/62] Pending headers, mapping from their hashes headerPool map[common.Hash]*types.Header // [eth/62] Pending headers, mapping from their hashes
headerQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the bodies for headerQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the bodies for
headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
pendPool map[string]*fetchRequest // Currently pending block retrieval operations pendPool map[string]*fetchRequest // Currently pending block retrieval operations
...@@ -91,6 +92,7 @@ func (q *queue) Reset() { ...@@ -91,6 +92,7 @@ func (q *queue) Reset() {
q.headerPool = make(map[common.Hash]*types.Header) q.headerPool = make(map[common.Hash]*types.Header)
q.headerQueue.Reset() q.headerQueue.Reset()
q.headerHead = common.Hash{}
q.pendPool = make(map[string]*fetchRequest) q.pendPool = make(map[string]*fetchRequest)
...@@ -186,7 +188,7 @@ func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash { ...@@ -186,7 +188,7 @@ func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash {
// Insert adds a set of headers for the download queue for scheduling, returning // Insert adds a set of headers for the download queue for scheduling, returning
// the new headers encountered. // the new headers encountered.
func (q *queue) Insert(headers []*types.Header) []*types.Header { func (q *queue) Insert(headers []*types.Header, from uint64) []*types.Header {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
...@@ -196,13 +198,24 @@ func (q *queue) Insert(headers []*types.Header) []*types.Header { ...@@ -196,13 +198,24 @@ func (q *queue) Insert(headers []*types.Header) []*types.Header {
// Make sure no duplicate requests are executed // Make sure no duplicate requests are executed
hash := header.Hash() hash := header.Hash()
if _, ok := q.headerPool[hash]; ok { if _, ok := q.headerPool[hash]; ok {
glog.V(logger.Warn).Infof("Header %x already scheduled", hash) glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled", header.Number.Uint64(), hash[:4])
continue continue
} }
// Make sure chain order is honored and preserved throughout
if header.Number == nil || header.Number.Uint64() != from {
glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from)
break
}
if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ancestry", header.Number, hash[:4])
break
}
// Queue the header for body retrieval // Queue the header for body retrieval
inserts = append(inserts, header) inserts = append(inserts, header)
q.headerPool[hash] = header q.headerPool[hash] = header
q.headerQueue.Push(header, -float32(header.Number.Uint64())) q.headerQueue.Push(header, -float32(header.Number.Uint64()))
q.headerHead = hash
from++
} }
return inserts return inserts
} }
......
This diff is collapsed.
...@@ -55,7 +55,6 @@ var ( ...@@ -55,7 +55,6 @@ var (
"admin_exportChain": (*adminApi).ExportChain, "admin_exportChain": (*adminApi).ExportChain,
"admin_importChain": (*adminApi).ImportChain, "admin_importChain": (*adminApi).ImportChain,
"admin_verbosity": (*adminApi).Verbosity, "admin_verbosity": (*adminApi).Verbosity,
"admin_chainSyncStatus": (*adminApi).ChainSyncStatus,
"admin_setSolc": (*adminApi).SetSolc, "admin_setSolc": (*adminApi).SetSolc,
"admin_datadir": (*adminApi).DataDir, "admin_datadir": (*adminApi).DataDir,
"admin_startRPC": (*adminApi).StartRPC, "admin_startRPC": (*adminApi).StartRPC,
...@@ -232,17 +231,6 @@ func (self *adminApi) Verbosity(req *shared.Request) (interface{}, error) { ...@@ -232,17 +231,6 @@ func (self *adminApi) Verbosity(req *shared.Request) (interface{}, error) {
return true, nil return true, nil
} }
func (self *adminApi) ChainSyncStatus(req *shared.Request) (interface{}, error) {
pending, cached, importing, estimate := self.ethereum.Downloader().Stats()
return map[string]interface{}{
"blocksAvailable": pending,
"blocksWaitingForImport": cached,
"importing": importing,
"estimate": estimate.String(),
}, nil
}
func (self *adminApi) SetSolc(req *shared.Request) (interface{}, error) { func (self *adminApi) SetSolc(req *shared.Request) (interface{}, error) {
args := new(SetSolcArgs) args := new(SetSolcArgs)
if err := self.coder.Decode(req.Params, &args); err != nil { if err := self.coder.Decode(req.Params, &args); err != nil {
......
...@@ -143,10 +143,6 @@ web3._extend({ ...@@ -143,10 +143,6 @@ web3._extend({
new web3._extend.Property({ new web3._extend.Property({
name: 'datadir', name: 'datadir',
getter: 'admin_datadir' getter: 'admin_datadir'
}),
new web3._extend.Property({
name: 'chainSyncStatus',
getter: 'admin_chainSyncStatus'
}) })
] ]
}); });
......
...@@ -55,6 +55,7 @@ var ( ...@@ -55,6 +55,7 @@ var (
"eth_protocolVersion": (*ethApi).ProtocolVersion, "eth_protocolVersion": (*ethApi).ProtocolVersion,
"eth_coinbase": (*ethApi).Coinbase, "eth_coinbase": (*ethApi).Coinbase,
"eth_mining": (*ethApi).IsMining, "eth_mining": (*ethApi).IsMining,
"eth_syncing": (*ethApi).IsSyncing,
"eth_gasPrice": (*ethApi).GasPrice, "eth_gasPrice": (*ethApi).GasPrice,
"eth_getStorage": (*ethApi).GetStorage, "eth_getStorage": (*ethApi).GetStorage,
"eth_storageAt": (*ethApi).GetStorage, "eth_storageAt": (*ethApi).GetStorage,
...@@ -166,6 +167,20 @@ func (self *ethApi) IsMining(req *shared.Request) (interface{}, error) { ...@@ -166,6 +167,20 @@ func (self *ethApi) IsMining(req *shared.Request) (interface{}, error) {
return self.xeth.IsMining(), nil return self.xeth.IsMining(), nil
} }
func (self *ethApi) IsSyncing(req *shared.Request) (interface{}, error) {
current := self.ethereum.ChainManager().CurrentBlock().NumberU64()
origin, height := self.ethereum.Downloader().Boundaries()
if current < height {
return map[string]interface{}{
"startingBlock": newHexNum(big.NewInt(int64(origin)).Bytes()),
"currentBlock": newHexNum(big.NewInt(int64(current)).Bytes()),
"highestBlock": newHexNum(big.NewInt(int64(height)).Bytes()),
}, nil
}
return false, nil
}
func (self *ethApi) GasPrice(req *shared.Request) (interface{}, error) { func (self *ethApi) GasPrice(req *shared.Request) (interface{}, error) {
return newHexNum(self.xeth.DefaultGasPrice().Bytes()), nil return newHexNum(self.xeth.DefaultGasPrice().Bytes()), nil
} }
......
...@@ -32,7 +32,6 @@ var ( ...@@ -32,7 +32,6 @@ var (
AutoCompletion = map[string][]string{ AutoCompletion = map[string][]string{
"admin": []string{ "admin": []string{
"addPeer", "addPeer",
"chainSyncStatus",
"datadir", "datadir",
"exportChain", "exportChain",
"getContractInfo", "getContractInfo",
...@@ -99,6 +98,7 @@ var ( ...@@ -99,6 +98,7 @@ var (
"sendRawTransaction", "sendRawTransaction",
"sendTransaction", "sendTransaction",
"sign", "sign",
"syncing",
}, },
"miner": []string{ "miner": []string{
"hashrate", "hashrate",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment