Commit 016f152b authored by obscuren's avatar obscuren

eth, eth/downloader: Moved block processing & graceful shutdown

The downloader is no longer responsible for processing blocks. The
eth-protocol handler now takes care of this instead.

Added graceful shutdown during block processing. Closes #846
parent 8595198c
......@@ -219,7 +219,7 @@ func New(config *Config) (*Ethereum, error) {
}
eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
eth.downloader = downloader.New(eth.chainManager.HasBlock, eth.chainManager.InsertChain)
eth.downloader = downloader.New(eth.chainManager.HasBlock, eth.chainManager.GetBlock)
eth.pow = ethash.New(eth.chainManager)
eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit)
eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux())
......@@ -454,8 +454,8 @@ func (self *Ethereum) SuggestPeer(nodeURL string) error {
func (s *Ethereum) Stop() {
s.txSub.Unsubscribe() // quits txBroadcastLoop
s.chainManager.Stop()
s.protocolManager.Stop()
s.chainManager.Stop()
s.txPool.Stop()
s.eventMux.Stop()
if s.whisper != nil {
......
This diff is collapsed.
......@@ -8,8 +8,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
)
var knownHash = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
......@@ -28,7 +26,7 @@ func createHashes(start, amount int) (hashes []common.Hash) {
func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block {
blocks := make(map[common.Hash]*types.Block)
for i, hash := range hashes {
header := &types.Header{Number: big.NewInt(int64(i))}
header := &types.Header{Number: big.NewInt(int64(len(hashes) - i))}
blocks[hash] = types.NewBlockWithHeader(header)
blocks[hash].HeaderHash = hash
}
......@@ -43,13 +41,11 @@ type downloadTester struct {
t *testing.T
pcount int
done chan bool
insertedBlocks int
}
func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types.Block) *downloadTester {
tester := &downloadTester{t: t, hashes: hashes, blocks: blocks, done: make(chan bool)}
downloader := New(tester.hasBlock, tester.insertChain)
downloader := New(tester.hasBlock, tester.getBlock)
tester.downloader = downloader
return tester
......@@ -62,10 +58,8 @@ func (dl *downloadTester) hasBlock(hash common.Hash) bool {
return false
}
func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) {
dl.insertedBlocks += len(blocks)
return 0, nil
func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
return dl.blocks[knownHash]
}
func (dl *downloadTester) getHashes(hash common.Hash) error {
......@@ -102,9 +96,6 @@ func (dl *downloadTester) badBlocksPeer(id string, td *big.Int, hash common.Hash
}
func TestDownload(t *testing.T) {
glog.SetV(logger.Detail)
glog.SetToStderr(true)
minDesiredPeerCount = 4
blockTtl = 1 * time.Second
......@@ -123,15 +114,13 @@ func TestDownload(t *testing.T) {
t.Error("download error", err)
}
if tester.insertedBlocks != targetBlocks {
t.Error("expected", targetBlocks, "have", tester.insertedBlocks)
inqueue := len(tester.downloader.queue.blocks)
if inqueue != targetBlocks {
t.Error("expected", targetBlocks, "have", inqueue)
}
}
func TestMissing(t *testing.T) {
glog.SetV(logger.Detail)
glog.SetToStderr(true)
targetBlocks := 1000
hashes := createHashes(0, 1000)
extraHashes := createHashes(1001, 1003)
......@@ -148,7 +137,33 @@ func TestMissing(t *testing.T) {
t.Error("download error", err)
}
if tester.insertedBlocks != targetBlocks {
t.Error("expected", targetBlocks, "have", tester.insertedBlocks)
inqueue := len(tester.downloader.queue.blocks)
if inqueue != targetBlocks {
t.Error("expected", targetBlocks, "have", inqueue)
}
}
func TestTaking(t *testing.T) {
minDesiredPeerCount = 4
blockTtl = 1 * time.Second
targetBlocks := 1000
hashes := createHashes(0, targetBlocks)
blocks := createBlocksFromHashes(hashes)
tester := newTester(t, hashes, blocks)
tester.newPeer("peer1", big.NewInt(10000), hashes[0])
tester.newPeer("peer2", big.NewInt(0), common.Hash{})
tester.badBlocksPeer("peer3", big.NewInt(0), common.Hash{})
tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{})
err := tester.downloader.Synchronise("peer1", hashes[0])
if err != nil {
t.Error("download error", err)
}
bs1 := tester.downloader.TakeBlocks(1000)
if len(bs1) != 1000 {
t.Error("expected to take 1000, got", len(bs1))
}
}
......@@ -18,7 +18,9 @@ type queue struct {
mu sync.Mutex
fetching map[string]*chunk
blocks []*types.Block
blockOffset int
blocks []*types.Block
}
func newqueue() *queue {
......@@ -34,6 +36,10 @@ func (c *queue) reset() {
c.mu.Lock()
defer c.mu.Unlock()
c.resetNoTS()
}
func (c *queue) resetNoTS() {
c.blockOffset = 0
c.hashPool.Clear()
c.fetchPool.Clear()
c.blockHashes.Clear()
......@@ -41,6 +47,10 @@ func (c *queue) reset() {
c.fetching = make(map[string]*chunk)
}
func (c *queue) size() int {
return c.hashPool.Size() + c.blockHashes.Size() + c.fetchPool.Size()
}
// reserve a `max` set of hashes for `p` peer.
func (c *queue) get(p *peer, max int) *chunk {
c.mu.Lock()
......@@ -89,7 +99,7 @@ func (c *queue) get(p *peer, max int) *chunk {
}
func (c *queue) has(hash common.Hash) bool {
return c.hashPool.Has(hash) || c.fetchPool.Has(hash)
return c.hashPool.Has(hash) || c.fetchPool.Has(hash) || c.blockHashes.Has(hash)
}
func (c *queue) addBlock(id string, block *types.Block) {
......@@ -103,8 +113,24 @@ func (c *queue) addBlock(id string, block *types.Block) {
}
}
func (c *queue) getBlock(hash common.Hash) *types.Block {
c.mu.Lock()
defer c.mu.Unlock()
if !c.blockHashes.Has(hash) {
return nil
}
for _, block := range c.blocks {
if block.Hash() == hash {
return block
}
}
return nil
}
// deliver delivers a chunk to the queue that was requested of the peer
func (c *queue) deliver(id string, blocks []*types.Block) {
func (c *queue) deliver(id string, blocks []*types.Block) error {
c.mu.Lock()
defer c.mu.Unlock()
......@@ -124,11 +150,35 @@ func (c *queue) deliver(id string, blocks []*types.Block) {
// merge block hashes
c.blockHashes.Merge(blockHashes)
// Add the blocks
c.blocks = append(c.blocks, blocks...)
for _, block := range blocks {
// See (1) for future limitation
n := int(block.NumberU64()) - c.blockOffset
if n > len(c.blocks) || n < 0 {
return errBlockNumberOverflow
}
c.blocks[n] = block
}
// Add back whatever couldn't be delivered
c.hashPool.Merge(chunk.hashes)
c.fetchPool.Separate(chunk.hashes)
}
return nil
}
func (c *queue) alloc(offset, size int) {
c.mu.Lock()
defer c.mu.Unlock()
if c.blockOffset < offset {
c.blockOffset = offset
}
// (1) XXX at some point we could limit allocation to memory and use the disk
// to store future blocks.
if len(c.blocks) < size {
c.blocks = append(c.blocks, make([]*types.Block, size)...)
}
}
// puts puts sets of hashes on to the queue for fetching
......
package eth
// XXX Fair warning, most of the code is re-used from the old protocol. Please be aware that most of this will actually change
// The idea is that most of the calls within the protocol will become synchronous.
// Block downloading and block processing will be complete seperate processes
/*
# Possible scenarios
// Synching scenario
// Use the best peer to synchronise
blocks, err := pm.downloader.Synchronise()
if err != nil {
// handle
break
}
pm.chainman.InsertChain(blocks)
// Receiving block with known parent
if parent_exist {
if err := pm.chainman.InsertChain(block); err != nil {
// handle
break
}
pm.BroadcastBlock(block)
}
// Receiving block with unknown parent
blocks, err := pm.downloader.SynchroniseWithPeer(peer)
if err != nil {
// handle
break
}
pm.chainman.InsertChain(blocks)
*/
import (
"fmt"
"math"
......@@ -54,7 +20,9 @@ import (
const (
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
blockProcTimer = 500 * time.Millisecond
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
blockProcAmount = 256
)
func errResp(code errCode, format string, v ...interface{}) error {
......@@ -91,6 +59,10 @@ type ProtocolManager struct {
newPeerCh chan *peer
quitSync chan struct{}
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
quit bool
}
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
......@@ -129,65 +101,6 @@ func (pm *ProtocolManager) removePeer(peer *peer) {
delete(pm.peers, peer.id)
}
func (pm *ProtocolManager) syncHandler() {
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
itimer := time.NewTimer(peerCountTimeout)
out:
for {
select {
case <-pm.newPeerCh:
// Meet the `minDesiredPeerCount` before we select our best peer
if len(pm.peers) < minDesiredPeerCount {
break
}
// Find the best peer
peer := getBestPeer(pm.peers)
if peer == nil {
glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available")
}
itimer.Stop()
go pm.synchronise(peer)
case <-itimer.C:
// The timer will make sure that the downloader keeps an active state
// in which it attempts to always check the network for highest td peers
// Either select the peer or restart the timer if no peers could
// be selected.
if peer := getBestPeer(pm.peers); peer != nil {
go pm.synchronise(peer)
} else {
itimer.Reset(5 * time.Second)
}
case <-pm.quitSync:
break out
}
}
}
func (pm *ProtocolManager) synchronise(peer *peer) {
// Make sure the peer's TD is higher than our own. If not drop.
if peer.td.Cmp(pm.chainman.Td()) <= 0 {
return
}
// Check downloader if it's busy so it doesn't show the sync message
// for every attempty
if pm.downloader.IsBusy() {
return
}
glog.V(logger.Info).Infof("Synchronisation attempt using %s TD=%v\n", peer.id, peer.td)
// Get the hashes from the peer (synchronously)
err := pm.downloader.Synchronise(peer.id, peer.recentHash)
if err != nil && err == downloader.ErrBadPeer {
glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action")
pm.removePeer(peer)
} else if err != nil {
// handle error
glog.V(logger.Debug).Infoln("error downloading:", err)
}
}
func (pm *ProtocolManager) Start() {
// broadcast transactions
pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
......@@ -197,18 +110,26 @@ func (pm *ProtocolManager) Start() {
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
// sync handler
go pm.syncHandler()
go pm.update()
}
func (pm *ProtocolManager) Stop() {
// Showing a log message. During download / process this could actually
// take between 5 to 10 seconds and therefor feedback is required.
glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...")
pm.quit = true
pm.txSub.Unsubscribe() // quits txBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
close(pm.quitSync) // quits the sync handler
// Wait for any process action
pm.wg.Wait()
glog.V(logger.Info).Infoln("Ethereum protocol handler stopped")
}
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
td, current, genesis := pm.chainman.Status()
return newPeer(pv, nv, genesis, current, td, p, rw)
......@@ -359,6 +280,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
// Add the block hash as a known hash to the peer. This will later be used to determine
// who should receive this.
p.blockHashes.Add(hash)
// update the peer info
p.recentHash = hash
p.td = request.TD
_, chainHead, _ := self.chainman.Status()
......@@ -383,7 +307,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
// Attempt to insert the newly received by checking if the parent exists.
// if the parent exists we process the block and propagate to our peers
// if the parent does not exists we delegate to the downloader.
// otherwise synchronise with the peer
if self.chainman.HasBlock(request.Block.ParentHash()) {
if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
......@@ -400,24 +324,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
}
self.BroadcastBlock(hash, request.Block)
} else {
// adding blocks is synchronous
go func() {
err := self.downloader.AddBlock(p.id, request.Block, request.TD)
if err != nil && err == downloader.ErrBadPeer {
glog.V(logger.Error).Infoln("removed peer (", p.id, ") with err:", err)
self.removePeer(p)
return
} else if err != nil {
glog.V(logger.Detail).Infoln("downloader err:", err)
return
}
if err := self.verifyTd(p, request); err != nil {
glog.V(logger.Error).Infoln(err)
return
}
self.BroadcastBlock(hash, request.Block)
}()
go self.synchronise(p)
}
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
......
package eth
import (
"math"
"time"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
)
// Sync contains all synchronisation code for the eth protocol
func (pm *ProtocolManager) update() {
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
itimer := time.NewTimer(peerCountTimeout)
// btimer is used for picking of blocks from the downloader
btimer := time.NewTicker(blockProcTimer)
out:
for {
select {
case <-pm.newPeerCh:
// Meet the `minDesiredPeerCount` before we select our best peer
if len(pm.peers) < minDesiredPeerCount {
break
}
// Find the best peer
peer := getBestPeer(pm.peers)
if peer == nil {
glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available")
}
itimer.Stop()
go pm.synchronise(peer)
case <-itimer.C:
// The timer will make sure that the downloader keeps an active state
// in which it attempts to always check the network for highest td peers
// Either select the peer or restart the timer if no peers could
// be selected.
if peer := getBestPeer(pm.peers); peer != nil {
go pm.synchronise(peer)
} else {
itimer.Reset(5 * time.Second)
}
case <-btimer.C:
go pm.processBlocks()
case <-pm.quitSync:
break out
}
}
}
// processBlocks will attempt to reconstruct a chain by checking the first item and check if it's
// a known parent. The first block in the chain may be unknown during downloading. When the
// downloader isn't downloading blocks will be dropped with an unknown parent until either it
// has depleted the list or found a known parent.
func (pm *ProtocolManager) processBlocks() error {
pm.wg.Add(1)
defer pm.wg.Done()
blocks := pm.downloader.TakeBlocks()
if len(blocks) == 0 {
return nil
}
defer pm.downloader.Done()
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
for len(blocks) != 0 && !pm.quit {
max := int(math.Min(float64(len(blocks)), float64(blockProcAmount)))
_, err := pm.chainman.InsertChain(blocks[:max])
if err != nil {
return err
}
blocks = blocks[max:]
}
return nil
}
func (pm *ProtocolManager) synchronise(peer *peer) {
// Make sure the peer's TD is higher than our own. If not drop.
if peer.td.Cmp(pm.chainman.Td()) <= 0 {
return
}
// Check downloader if it's busy so it doesn't show the sync message
// for every attempty
if pm.downloader.IsBusy() {
return
}
// Get the hashes from the peer (synchronously)
err := pm.downloader.Synchronise(peer.id, peer.recentHash)
if err != nil && err == downloader.ErrBadPeer {
glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action")
pm.removePeer(peer)
} else if err != nil {
// handle error
glog.V(logger.Detail).Infoln("error downloading:", err)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment