Commit 76070b46 authored by zelig's avatar zelig

blockpool rewritten, tests broken FIXME

parent 3308d82b
package eth

import (
	"math"
	"math/big"
	"math/rand"
	"sort"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethutil"
	ethlogger "github.com/ethereum/go-ethereum/logger"
)

@@ -19,76 +18,88 @@ var poolLogger = ethlogger.NewLogger("Blockpool")
const (
	blockHashesBatchSize       = 256
	blockBatchSize             = 64
	blocksRequestInterval      = 10 // seconds
	blocksRequestRepetition    = 1
	blockHashesRequestInterval = 10 // seconds
	blocksRequestMaxIdleRounds = 10
	cacheTimeout               = 3 // minutes
	blockTimeout               = 5 // minutes
)
type poolNode struct {
	lock    sync.RWMutex
	hash    []byte
	block   *types.Block
	child   *poolNode
	parent  *poolNode
	section *section

	knownParent bool
	peer        string
	source      string
	complete    bool
}
type BlockPool struct {
	lock sync.RWMutex
	pool map[string]*poolNode

	peersLock sync.RWMutex
	peers     map[string]*peerInfo
	peer      *peerInfo

	quit    chan bool
	wg      sync.WaitGroup
	running bool

	// the minimal interface with blockchain
	hasBlock    func(hash []byte) bool
	insertChain func(types.Blocks) error
	verifyPoW   func(*types.Block) bool
}
type peerInfo struct {
	lock sync.RWMutex

	td           *big.Int
	currentBlock []byte
	id           string

	requestBlockHashes func([]byte) error
	requestBlocks      func([][]byte) error
	peerError          func(int, string, ...interface{})

	sections map[string]*section
	roots    []*poolNode
	quitC    chan bool
}

func NewBlockPool(hasBlock func(hash []byte) bool, insertChain func(types.Blocks) error, verifyPoW func(*types.Block) bool,
) *BlockPool {
	return &BlockPool{
		hasBlock:    hasBlock,
		insertChain: insertChain,
		verifyPoW:   verifyPoW,
	}
}

// allows restart
func (self *BlockPool) Start() {
	self.lock.Lock()
	if self.running {
		self.lock.Unlock()
		return
	}
	self.running = true
	self.quit = make(chan bool)
	self.pool = make(map[string]*poolNode)
	self.lock.Unlock()

	self.peersLock.Lock()
	self.peers = make(map[string]*peerInfo)
	self.peersLock.Unlock()
	poolLogger.Infoln("Started")
}
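// NOTE: illustrative wiring sketch only, not part of this change. The pool no
// longer takes a chainManager interface; the caller hands in three callbacks
// with the signatures expected by NewBlockPool. The argument names below are
// hypothetical stand-ins for whatever the eth backend actually provides.
func exampleWireBlockPool(hasBlock func(hash []byte) bool, insertChain func(types.Blocks) error, verifyPoW func(*types.Block) bool) *BlockPool {
	pool := NewBlockPool(hasBlock, insertChain, verifyPoW)
	pool.Start()
	// call pool.Stop() when the protocol shuts down
	return pool
}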
func (self *BlockPool) Stop() {
@@ -103,308 +114,566 @@ func (self *BlockPool) Stop() {
	poolLogger.Infoln("Stopping")

	close(self.quit)
	self.lock.Lock()
	self.peersLock.Lock()
	self.peers = nil
	self.pool = nil
	self.peer = nil
	self.wg.Wait()
	self.lock.Unlock()
	self.peersLock.Unlock()
	poolLogger.Infoln("Stopped")
}
// AddPeer is called by the eth protocol instance running on the peer after
// the status message has been received with total difficulty and current block hash
// AddPeer can only be used once, RemovePeer needs to be called when the peer disconnects
func (self *BlockPool) AddPeer(td *big.Int, currentBlock []byte, peerId string, requestBlockHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(int, string, ...interface{})) bool {
self.peersLock.Lock()
defer self.peersLock.Unlock()
if self.peers[peerId] != nil {
panic("peer already added")
}
peer := &peerInfo{
td: td,
currentBlock: currentBlock,
id: peerId, //peer.Identity().Pubkey()
requestBlockHashes: requestBlockHashes,
requestBlocks: requestBlocks,
peerError: peerError,
}
self.peers[peerId] = peer
poolLogger.Debugf("add new peer %v with td %v", peerId, td)
currentTD := ethutil.Big0
if self.peer != nil {
currentTD = self.peer.td
}
if td.Cmp(currentTD) > 0 {
self.peer.stop(peer)
peer.start(self.peer)
poolLogger.Debugf("peer %v promoted to best peer", peerId)
self.peer = peer
return true
}
return false
}
// RemovePeer is called by the eth protocol when the peer disconnects
func (self *BlockPool) RemovePeer(peerId string) {
self.peersLock.Lock()
defer self.peersLock.Unlock()
peer := self.peers[peerId]
if peer == nil {
return
}
self.peers[peerId] = nil
poolLogger.Debugf("remove peer %v", peerId[0:4])
// if current best peer is removed, need find a better one
if self.peer != nil && peerId == self.peer.id {
var newPeer *peerInfo
max := ethutil.Big0
// peer with the highest self-acclaimed TD is chosen
for _, info := range self.peers {
if info.td.Cmp(max) > 0 {
max = info.td
newPeer = info
}
}
self.peer.stop(peer)
peer.start(self.peer)
if newPeer != nil {
poolLogger.Debugf("peer %v with td %v promoted to best peer", newPeer.id[0:4], newPeer.td)
} else {
poolLogger.Warnln("no peers left")
}
}
}
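// NOTE: illustrative sketch only, not part of this change. AddPeer is expected
// to be called once per connected peer with closures that send the actual
// protocol requests, and RemovePeer on disconnect. The sendHashes, sendBlocks
// and reportError closures are hypothetical placeholders.
func examplePeerLifecycle(pool *BlockPool, td *big.Int, currentBlock []byte, id string,
	sendHashes func([]byte) error, sendBlocks func([][]byte) error,
	reportError func(int, string, ...interface{})) {
	best := pool.AddPeer(td, currentBlock, id, sendHashes, sendBlocks, reportError)
	if best {
		// this peer was promoted and now drives hash and block requests
	}
	// ... later, on disconnect:
	pool.RemovePeer(id)
}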
// Entry point for eth protocol to add block hashes received via BlockHashesMsg
// only hashes from the best peer is handled
// this method is always responsible to initiate further hash requests until
// a known parent is reached unless cancelled by a peerChange event
// this process also launches all request processes on each chain section
// this function needs to run asynchronously for one peer since the message is discarded???
func (self *BlockPool) AddBlockHashes(next func() ([]byte, bool), peerId string) {
	// check if this peer is the best
	peer, best := self.getPeer(peerId)
	if !best {
		return
	}
	// peer is still the best
	var child *poolNode
	var depth int

	// iterate using next (rlp stream lazy decoder) feeding hashesC
	self.wg.Add(1)
	go func() {
		for {
			select {
			case <-self.quit:
				return
			case <-peer.quitC:
				// if the peer is demoted, no more hashes taken
				break
			default:
				hash, ok := next()
				if !ok {
					// message consumed chain skeleton built
					break
				}
				// check if known block connecting the downloaded chain to our blockchain
				if self.hasBlock(hash) {
					poolLogger.Infof("known block (%x...)\n", hash[0:4])
					if child != nil {
						child.Lock()
						// mark child as absolute pool root with parent known to blockchain
						child.knownParent = true
						child.Unlock()
					}
					break
				}
				//
				var parent *poolNode
				// look up node in pool
				parent = self.get(hash)
				if parent != nil {
					// reached a known chain in the pool
					// request blocks on the newly added part of the chain
					if child != nil {
						self.link(parent, child)

						// activate the current chain
						self.activateChain(parent, peer, true)
						poolLogger.Debugf("potential chain of %v blocks added, reached blockpool, activate chain", depth)
						break
					}
					// if this is the first hash, we expect to find it
					parent.RLock()
					grandParent := parent.parent
					parent.RUnlock()
					if grandParent != nil {
						// activate the current chain
						self.activateChain(parent, peer, true)
						poolLogger.Debugf("block hash found, activate chain")
						break
					}
					// the first node is the root of a chain in the pool, rejoice and continue
				}
				// if node does not exist, create it and index in the pool
				section := &section{}
				if child == nil {
					section.top = parent
				}
				parent = &poolNode{
					hash:    hash,
					child:   child,
					section: section,
					peer:    peerId,
				}
				self.set(hash, parent)
				poolLogger.Debugf("create potential block for %x...", hash[0:4])

				depth++
				child = parent
			}
		}
		if child != nil {
			poolLogger.Debugf("chain of %v hashes added", depth)
			// start a processSection on the last node, but switch off asking
			// hashes and blocks until next peer confirms this chain
			section := self.processSection(child)
			peer.addSection(child.hash, section)
			section.start()
		}
	}()
}
// AddBlock is the entry point for the eth protocol when blockmsg is received upon requests
// It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error (which can be ignored)
// block is checked for PoW
// only the first PoW-valid block for a hash is considered legit
func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
hash := block.Hash()
node := self.get(hash)
node.RLock()
b := node.block
node.RUnlock()
if b != nil {
return
} }
if node == nil && !self.hasBlock(hash) {
self.peerError(peerId, ErrUnrequestedBlock, "%x", hash)
return
} }
	// validate block for PoW
	if !self.verifyPoW(block) {
		self.peerError(peerId, ErrInvalidPoW, "%x", hash)
	}
	node.Lock()
	node.block = block
	node.source = peerId
	node.Unlock()
}
// iterates down a known poolchain and activates fetching processes
// on each chain section for the peer
// stops if the peer is demoted
// registers last section root as root for the peer (in case peer is promoted a second time, to remember)
func (self *BlockPool) activateChain(node *poolNode, peer *peerInfo, on bool) {
	self.wg.Add(1)
	go func() {
		for {
			node.sectionRLock()
			bottom := node.section.bottom
			if bottom == nil { // the chain section is being created or killed
				break
			}
			// register this section with the peer
			if peer != nil {
				peer.addSection(bottom.hash, bottom.section)
				if on {
					bottom.section.start()
				} else {
					bottom.section.start()
				}
			}
			if bottom.parent == nil {
				node = bottom
				break
			}
			// if peer demoted stop activation
			select {
			case <-peer.quitC:
				break
			default:
			}

			node = bottom.parent
			bottom.sectionRUnlock()
		}
		// remember root for this peer
		peer.addRoot(node)
		self.wg.Done()
	}()
}
// main worker thread on each section in the poolchain
// - kills the section if there are blocks missing after an absolute time
// - kills the section if there are maxIdleRounds of idle rounds of block requests with no response
// - periodically polls the chain section for missing blocks which are then requested from peers
// - registers the process controller on the peer so that if the peer is promoted as best peer the second time (after a disconnect of a better one), all active processes are switched back on unless they expire and killed ()
// - when turned off (if peer disconnects and new peer connects with alternative chain), no blockrequests are made but absolute expiry timer is ticking
// - when turned back on it recursively calls itself on the root of the next chain section
// - when exits, signals to
func (self *BlockPool) processSection(node *poolNode) *section {
	// absolute time after which sub-chain is killed if not complete (some blocks are missing)
	suicideTimer := time.After(blockTimeout * time.Minute)
	var blocksRequestTimer, blockHashesRequestTimer <-chan time.Time
	var nodeC, missingC, processC chan *poolNode
	controlC := make(chan bool)
	resetC := make(chan bool)
	var hashes [][]byte
	var i, total, missing, lastMissing, depth int
	var blockHashesRequests, blocksRequests int
	var idle int
	var init, alarm, done, same, running, once bool
	orignode := node
	hash := node.hash

	node.sectionLock()
	defer node.sectionUnlock()
	section := &section{controlC: controlC, resetC: resetC}
	node.section = section

	go func() {
		self.wg.Add(1)
		for {
			node.sectionRLock()
			controlC = node.section.controlC
			node.sectionRUnlock()

			if init {
				// missing blocks read from nodeC
				// initialized section
				if depth == 0 {
					break
				}
				// enable select case to read missing block when ready
				processC = missingC
				missingC = make(chan *poolNode, lastMissing)
				nodeC = nil
				// only do once
				init = false
			} else {
				if !once {
					missingC = nil
					processC = nil
					i = 0
					total = 0
					lastMissing = 0
				}
			}

			// went through all blocks in section
			if i != 0 && i == lastMissing {
				if len(hashes) > 0 {
					// send block requests to peers
					self.requestBlocks(blocksRequests, hashes)
				}
				blocksRequests++
				poolLogger.Debugf("[%x] block request attempt %v: missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth)
				if missing == lastMissing {
					// idle round
					if same {
						// more than once
						idle++
						// too many idle rounds
						if idle > blocksRequestMaxIdleRounds {
							poolLogger.Debugf("[%x] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up...", hash[0:4], idle, blocksRequests, missing, total, depth)
							self.killChain(node, nil)
							break
						}
					} else {
						idle = 0
					}
					same = true
				} else {
					if missing == 0 {
						// no missing nodes
						poolLogger.Debugf("block request process complete on section %x... (%v total blocksRequests): missing %v/%v/%v", hash[0:4], blockHashesRequests, blocksRequests, missing, total, depth)
						node.Lock()
						orignode.complete = true
						node.Unlock()
						blocksRequestTimer = nil
						if blockHashesRequestTimer == nil {
							// not waiting for hashes any more
							poolLogger.Debugf("hash request on root %x... successful (%v total attempts)\nquitting...", hash[0:4], blockHashesRequests)
							break
						} // otherwise suicide if no hashes coming
					}
					same = false
				}
				lastMissing = missing
				i = 0
				missing = 0
				// ready for next round
				done = true
			}

			if done && alarm {
				poolLogger.Debugf("start checking if new blocks arrived (attempt %v): missing %v/%v/%v", blocksRequests, missing, total, depth)
				blocksRequestTimer = time.After(blocksRequestInterval * time.Second)
				alarm = false
				done = false
				// processC supposed to be empty and never closed so just swap, no need to allocate
				tempC := processC
				processC = missingC
				missingC = tempC
			}

			select {
			case <-self.quit:
				break

			case <-suicideTimer:
				self.killChain(node, nil)
				poolLogger.Warnf("[%x] timeout. (%v total attempts): missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth)
				break

			case <-blocksRequestTimer:
				alarm = true

			case <-blockHashesRequestTimer:
				orignode.RLock()
				parent := orignode.parent
				orignode.RUnlock()
				if parent != nil {
					// if not root of chain, switch off
					poolLogger.Debugf("[%x] parent found, hash requests deactivated (after %v total attempts)\n", hash[0:4], blockHashesRequests)
					blockHashesRequestTimer = nil
				} else {
					blockHashesRequests++
					poolLogger.Debugf("[%x] hash request on root (%v total attempts)\n", hash[0:4], blockHashesRequests)
					self.requestBlockHashes(parent.hash)
					blockHashesRequestTimer = time.After(blockHashesRequestInterval * time.Second)
				}

			case r, ok := <-controlC:
				if !ok {
					break
				}
				if running && !r {
					poolLogger.Debugf("process on section %x... (%v total attempts): missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth)

					alarm = false
					blocksRequestTimer = nil
					blockHashesRequestTimer = nil
					processC = nil
				}
				if !running && r {
					poolLogger.Debugf("[%x] on", hash[0:4])

					orignode.RLock()
					parent := orignode.parent
					complete := orignode.complete
					knownParent := orignode.knownParent
					orignode.RUnlock()

					if !complete {
						poolLogger.Debugf("[%x] activate block requests", hash[0:4])
						blocksRequestTimer = time.After(0)
					}
					if parent == nil && !knownParent {
						// if no parent but not connected to blockchain
						poolLogger.Debugf("[%x] activate block hashes requests", hash[0:4])
						blockHashesRequestTimer = time.After(0)
					} else {
						blockHashesRequestTimer = nil
					}
					alarm = true
					processC = missingC
					if !once {
						// if not run at least once fully, launch iterator
						processC = make(chan *poolNode)
						missingC = make(chan *poolNode)
						self.foldUp(orignode, processC)
						once = true
					}
				}
				total = lastMissing

			case <-resetC:
				once = false
				init = false
				done = false

			case node, ok := <-processC:
				if !ok {
					// channel closed, first iteration finished
					init = true
					once = true
					continue
				}
				i++
				// if node has no block
				node.RLock()
				block := node.block
				nhash := node.hash
				knownParent := node.knownParent
				node.RUnlock()
				if !init {
					depth++
				}
				if block == nil {
					missing++
					if !init {
						total++
					}
					hashes = append(hashes, nhash)
					if len(hashes) == blockBatchSize {
						self.requestBlocks(blocksRequests, hashes)
						hashes = nil
					}
					missingC <- node
				} else {
					// block is found
					if knownParent {
						// connected to the blockchain, insert the longest chain of blocks
						var blocks types.Blocks
						child := node
						parent := node
						node.sectionRLock()
						for child != nil && child.block != nil {
							parent = child
							blocks = append(blocks, parent.block)
							child = parent.child
						}
						node.sectionRUnlock()
						poolLogger.Debugf("[%x] insert %v blocks into blockchain", hash[0:4], len(blocks))
						if err := self.insertChain(blocks); err != nil {
							// TODO: not clear which peer we need to address
							// peerError should dispatch to peer if still connected and disconnect
							self.peerError(node.source, ErrInvalidBlock, "%v", err)
							poolLogger.Debugf("invalid block %v", node.hash)
							poolLogger.Debugf("penalise peers %v (hash), %v (block)", node.peer, node.source)
							// penalise peer in node.source
							self.killChain(node, nil)
							// self.disconnect()
							break
						}
						// if suceeded mark the next one (no block yet) as connected to blockchain
						if child != nil {
							child.Lock()
							child.knownParent = true
							child.Unlock()
						}
						// reset starting node to first node with missing block
						orignode = child
						// pop the inserted ancestors off the channel
						for i := 1; i < len(blocks); i++ {
							<-processC
						}
						// delink inserted chain section
						self.killChain(node, parent)
					}
				}
			}
		}
		poolLogger.Debugf("[%x] quit after\n%v block hashes requests\n%v block requests: missing %v/%v/%v", hash[0:4], blockHashesRequests, blocksRequests, missing, total, depth)

		self.wg.Done()
		node.sectionLock()
		node.section.controlC = nil
		node.sectionUnlock()
		// this signals that controller not available
	}()
	return section
}
func (self *BlockPool) peerError(peerId string, code int, format string, params ...interface{}) {
	self.peersLock.RLock()
	defer self.peersLock.RUnlock()
	peer, ok := self.peers[peerId]
	if ok {
		peer.peerError(code, format, params...)
	}
}

func (self *BlockPool) requestBlockHashes(hash []byte) {
	self.peersLock.Lock()
	defer self.peersLock.Unlock()
	if self.peer != nil {
		self.peer.requestBlockHashes(hash)
	}
}

func (self *BlockPool) requestBlocks(attempts int, hashes [][]byte) {
	// distribute block request among known peers
	self.peersLock.Lock()
	defer self.peersLock.Unlock()
	peerCount := len(self.peers)
	// on first attempt use the best peer
	if attempts == 0 {
		self.peer.requestBlocks(hashes)
		return
	}
	repetitions := int(math.Min(float64(peerCount), float64(blocksRequestRepetition)))
	poolLogger.Debugf("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount)
	i := 0
	indexes := rand.Perm(peerCount)[0:(repetitions - 1)]
	sort.Ints(indexes)
	for _, peer := range self.peers {
		if i == indexes[0] {
			peer.requestBlocks(hashes)
			indexes = indexes[1:]
			if len(indexes) == 0 {
				break
			}
		}
		i++
	}
}
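// NOTE: illustrative sketch only, not part of this change. On retry attempts
// the pool picks `repetitions` random peer indexes (rand.Perm + sort.Ints) and
// sends the same batch of hashes to the peers whose map-iteration index
// matches. A stand-alone version of that k-of-n selection (note that the code
// above slices the permutation to repetitions-1):
func examplePickPeerIndexes(peerCount, repetitions int) []int {
	indexes := rand.Perm(peerCount)[:repetitions]
	sort.Ints(indexes)
	return indexes
}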
func (self *BlockPool) getPeer(peerId string) (*peerInfo, bool) {
	self.peersLock.RLock()
	defer self.peersLock.RUnlock()
	if self.peer != nil && self.peer.id == peerId {
		return self.peer, true
	}
	info, ok := self.peers[peerId]
@@ -414,101 +683,332 @@ func (self *BlockPool) getPeer(peerId string) (*peerInfo, bool) {
	return info, false
}
func (self *peerInfo) addSection(hash []byte, section *section) {
	self.lock.Lock()
	defer self.lock.Unlock()
	self.sections[string(hash)] = section
}
func (self *peerInfo) addRoot(node *poolNode) {
self.lock.Lock()
defer self.lock.Unlock()
self.roots = append(self.roots, node)
}
// (re)starts processes registered for this peer (self)
func (self *peerInfo) start(peer *peerInfo) {
self.lock.Lock()
defer self.lock.Unlock()
self.quitC = make(chan bool)
for _, root := range self.roots {
root.sectionRLock()
if root.section.bottom != nil {
if root.parent == nil {
self.requestBlockHashes(root.hash)
}
}
root.sectionRUnlock()
}
self.roots = nil
self.controlSections(peer, true)
}
// (re)starts process without requests, only suicide timer
func (self *peerInfo) stop(peer *peerInfo) {
self.lock.RLock()
defer self.lock.RUnlock()
close(self.quitC)
self.controlSections(peer, false)
}
func (self *peerInfo) controlSections(peer *peerInfo, on bool) {
if peer != nil {
peer.lock.RLock()
defer peer.lock.RUnlock()
}
for hash, section := range peer.sections {
if section.done() {
delete(self.sections, hash)
}
_, exists := peer.sections[hash]
if on || peer == nil || exists {
if on {
// self is best peer
section.start()
} else {
// (re)starts process without requests, only suicide timer
section.stop()
}
}
}
}
// called when parent is found in pool
// parent and child are guaranteed to be on different sections
func (self *BlockPool) link(parent, child *poolNode) {
	var top bool
	parent.sectionLock()
	if child != nil {
		child.sectionLock()
	}
	if parent == parent.section.top && parent.section.top != nil {
		top = true
	}
	var bottom bool
	if child == child.section.bottom {
		bottom = true
	}
	if parent.child != child {
		orphan := parent.child
		if orphan != nil {
			// got a fork in the chain
			if top {
				orphan.lock.Lock()
				// make old child orphan
				orphan.parent = nil
				orphan.lock.Unlock()
			} else {
				// we are under section lock
				// make old child orphan
				orphan.parent = nil
				// reset section objects above the fork
				nchild := orphan.child
				node := orphan
				section := &section{bottom: orphan}
				for node.section == nchild.section {
					node = nchild
					node.section = section
					nchild = node.child
				}
				section.top = node
				// set up a suicide
				self.processSection(orphan).stop()
			}
		} else {
			// child is on top of a chain need to close section
			child.section.bottom = child
		}
		// adopt new child
		parent.child = child
		if !top {
			parent.section.top = parent
			// restart section process so that shorter section is scanned for blocks
			parent.section.reset()
		}
	}
	if child != nil {
		if child.parent != parent {
			stepParent := child.parent
			if stepParent != nil {
				if bottom {
					stepParent.Lock()
					stepParent.child = nil
					stepParent.Unlock()
				} else {
					// we are on the same section
					// if it is a aberrant reverse fork,
					stepParent.child = nil
					node := stepParent
					nparent := stepParent.child
					section := &section{top: stepParent}
					for node.section == nparent.section {
						node = nparent
						node.section = section
						node = node.parent
					}
				}
			} else {
				// linking to a root node, ie. parent is under the root of a chain
				parent.section.top = parent
			}
		}
		child.parent = parent
		child.section.bottom = child
	}
	// this needed if someone lied about the parent before
	child.knownParent = false

	parent.sectionUnlock()
	if child != nil {
		child.sectionUnlock()
	}
}
// this immediately kills the chain from node to end (inclusive) section by section
func (self *BlockPool) killChain(node *poolNode, end *poolNode) {
	poolLogger.Debugf("kill chain section with root node %v", node)

	node.sectionLock()
	node.section.abort()
	self.set(node.hash, nil)
	child := node.child
	top := node.section.top
	i := 1
	self.wg.Add(1)

	var quit bool
	for node != top && node != end && child != nil {
		node = child
		select {
		case <-self.quit:
			quit = true
			break
		default:
		}
		self.set(node.hash, nil)
		child = node.child
	}
	poolLogger.Debugf("killed chain section of %v blocks with root node %v", i, node)
	if !quit {
		if node == top {
			if node != end && child != nil && end != nil {
				//
				self.killChain(child, end)
			}
		} else {
			if child != nil {
				// delink rest of this section if ended midsection
				child.section.bottom = child
				child.parent = nil
			}
		}
	}
	node.section.bottom = nil
	node.sectionUnlock()
	self.wg.Done()
}
// structure to store long range links on chain to skip along
type section struct {
	lock     sync.RWMutex
	bottom   *poolNode
	top      *poolNode
	controlC chan bool
	resetC   chan bool
}

func (self *section) start() {
	self.lock.RLock()
	defer self.lock.RUnlock()
	if self.controlC != nil {
		self.controlC <- true
	}
}

func (self *section) stop() {
	self.lock.RLock()
	defer self.lock.RUnlock()
	if self.controlC != nil {
		self.controlC <- false
	}
}

func (self *section) reset() {
	self.lock.RLock()
	defer self.lock.RUnlock()
	if self.controlC != nil {
		self.resetC <- true
		self.controlC <- false
	}
}

func (self *section) abort() {
	self.lock.Lock()
	defer self.lock.Unlock()
	if self.controlC != nil {
		close(self.controlC)
		self.controlC = nil
	}
}

func (self *section) done() bool {
	self.lock.Lock()
	defer self.lock.Unlock()
	if self.controlC != nil {
		return true
	}
	return false
}
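// NOTE: illustrative sketch only, not part of this change. A section's control
// channel is the on/off switch used on peer promotion/demotion: start() turns
// block (and, on the root, hash) requests on, stop() leaves only the expiry
// timer running, reset() forces a rescan, abort() detaches the controller.
func exampleSectionLifecycle(self *BlockPool, node *poolNode) {
	sec := self.processSection(node) // spawn the section worker
	sec.start()                      // peer promoted: actively request blocks
	sec.stop()                       // peer demoted: keep only the suicide timer
	sec.abort()                      // chain killed: controller goes away
}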
func (self *BlockPool) get(hash []byte) (node *poolNode) {
self.lock.Lock()
defer self.lock.Unlock()
return self.pool[string(hash)]
}
func (self *BlockPool) set(hash []byte, node *poolNode) {
self.lock.Lock()
defer self.lock.Unlock()
self.pool[string(hash)] = node
}
// first time for block request, this iteration retrieves nodes of the chain
// from node up to top (all the way if nil) via child links
// copies the controller
// and feeds nodeC channel
// this is performed under section readlock to prevent top from going away
// when
func (self *BlockPool) foldUp(node *poolNode, nodeC chan *poolNode) {
	self.wg.Add(1)
	go func() {
		node.sectionRLock()
		defer node.sectionRUnlock()
		for node != nil {
			select {
			case <-self.quit:
				break
			case nodeC <- node:
				if node == node.section.top {
					break
				}
				node = node.child
			}
		}
		close(nodeC)
		self.wg.Done()
	}()
}
func (self *poolNode) Lock() {
self.sectionLock()
self.lock.Lock()
}
func (self *poolNode) Unlock() {
self.lock.Unlock()
self.sectionUnlock()
}
func (self *poolNode) RLock() {
self.lock.RLock()
}
func (self *poolNode) RUnlock() {
self.lock.RUnlock()
}
func (self *poolNode) sectionLock() {
self.lock.RLock()
defer self.lock.RUnlock()
self.section.lock.Lock()
}
func (self *poolNode) sectionUnlock() {
self.lock.RLock()
defer self.lock.RUnlock()
self.section.lock.Unlock()
}
func (self *poolNode) sectionRLock() {
self.lock.RLock()
defer self.lock.RUnlock()
self.section.lock.RLock()
}
func (self *poolNode) sectionRUnlock() {
self.lock.RLock()
defer self.lock.RUnlock()
self.section.lock.RUnlock()
}