// RemovePeer needs to be called when the peer disconnects
func (self *BlockPool) RemovePeer(peerId string) {
	self.peers.removePeer(peerId, true)
}
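// The call site below is only an illustrative sketch (handlePeer and msgLoop are
// hypothetical, not part of this package): it shows the intended contract that the
// protocol handler defers RemovePeer, so the peer is deregistered however its
// message loop terminates.
func handlePeer(pool *BlockPool, peerId string, msgLoop func() error) error {
	// deregister (and demote, if it was the best peer) on any exit path
	defer pool.RemovePeer(peerId)
	return msgLoop()
}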
/*
AddBlockHashes

Entry point for the eth protocol to add block hashes received via BlockHashesMsg.

Only hashes from the best peer are handled.

Initiates further hash requests until a known parent is reached (unless cancelled by a peerSwitch event, i.e., when a better peer becomes best peer).

Launches all block request processes on each chain section.

The first argument is an iterator function. Using it, block hashes are decoded from the rlp message payload on demand. As a result, AddBlockHashes needs to run synchronously for one peer, since the message is discarded if the calling goroutine returns.
*/
	// if we reach the blockchain we stop reading further blockhashes
	if self.hasBlock(hash) {
		// check if known block connecting the downloaded chain to our blockchain
		glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash))
		if len(nodes) == 1 {
			glog.V(logger.Detail).Infof("AddBlockHashes: singleton section pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash))
			// create new section if needed and push it to the blockchain
			sec = self.newSection(nodes)
			sec.addSectionToBlockChain(bestpeer)
		} else {
			/*
				The hash has not been added yet, but according to the peer the child
				section built earlier connects this chain with the blockchain.
				This may be a potential vulnerability: the root block arrives (or is
				already there but its parent hash was not pointing to a known block in
				the blockchain), we start inserting -> error -> the entire chain is
				removed, instead of punishing this peer.
				Solution: when switching peers, always make sure the best peer's own
				head block and td, together with blockBy, are recorded on the node.
			*/
			if len(nodes) == 0 && child != nil {
				glog.V(logger.Detail).Infof("AddBlockHashes: child section [%s] pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", sectionhex(child), peerId, hex(bestpeer.currentBlockHash), hex(hash))
				child.addSectionToBlockChain(bestpeer)
			}
		}
		break LOOP
	}
	// look up node in the pool
	entry = self.get(hash)
	if entry != nil {
		// reached a known chain in the pool
		if entry.node == entry.section.bottom && n == 1 {
			/*
				The first block hash received is an orphan node in the pool.
				This also supports clients that (despite the spec) include the <from> hash
				in their response to a hashes request. Note that by providing <from> we can
				link sections without having to wait for the root block of the child section
				to arrive, so it allows for superior performance.
			*/
			glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) found head block [%s] as root of connecting child section [%s] skipping", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section))
			// record the entry's chain section as child section
			child = entry.section
			continue LOOP
		}
		// otherwise record entry's chain section as parent connecting it to the pool
		glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) found block [%s] in section [%s]. Connected to pool.", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section))
		parent = entry.section
		break LOOP
	}
	// finally if node for block hash does not exist, create it and append node to section nodes
		glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) no longer best: delay requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec))
		sec.deactivate()
	}
	}
	// If we are processing the peer's head section, signal to the headSection process that it has been created.
	if headSection {
		glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) head section registered on head section process", peerId, hex(bestpeer.currentBlockHash))

		var headSec *section
		switch {
		case sec != nil:
			headSec = sec
		case child != nil:
			headSec = child
		default:
			headSec = parent
		}
		if !peerswitch {
			glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) head section [%s] created signalled to head section process", peerId, hex(bestpeer.currentBlockHash), sectionhex(headSec))
			bestpeer.headSectionC <- headSec
		}
	}
}
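// A minimal sketch of the call site (handleBlockHashesMsg and its parameters are
// hypothetical, and the iterator signature func() (common.Hash, bool) is assumed from
// the description above; io and rlp are assumed to be imported): hashes are decoded
// lazily from the rlp payload, one per call, so AddBlockHashes must consume the
// iterator before this handler returns and the payload is discarded.
func handleBlockHashesMsg(pool *BlockPool, peerId string, payload io.Reader, size uint64) {
	msgStream := rlp.NewStream(payload, size)
	// the payload is assumed to be an rlp-encoded list of hashes
	if _, err := msgStream.List(); err != nil {
		return
	}
	iter := func() (hash common.Hash, ok bool) {
		// any decode error (including end of list) terminates the iteration
		if err := msgStream.Decode(&hash); err != nil {
			return hash, false
		}
		return hash, true
	}
	// runs synchronously for this peer: the rlp payload must stay readable throughout
	pool.AddBlockHashes(iter, peerId)
}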
/*
AddBlock is the entry point for the eth protocol to call when a BlocksMsg is received.

It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error.

At the same time it is opportunistic in that a requested block may be provided by any peer.

The received block is checked for PoW. Only the first PoW-valid block for a hash is considered legit.

If the block received is the head block of the current best peer, it is signalled to the head section process.
*/
glog.V(logger.Detail).Infof("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ",hex(hash),peerId,hex(sender.currentBlockHash),bnode.blockBy)
// if found but not FoundBlockCurrentHead, then no update
// necessary (||)
bnode.peers[sender.id]=(currentBlockHash==hash)
// for those that are false, TD will update their head
// for those that are true, TD is checked !
// this is checked at the time of TD calculation in checkTD
}
sender.setChainInfoFromNode(bnode)
}else{
		/*
			@zelig needs discussing
			Viktor: pow check can be delayed in a goroutine and therefore cache
			creation is not blocking

			// validate block for PoW
			if !self.verifyPoW(block) {
				glog.V(logger.Warn).Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
				sender.addError(ErrInvalidPoW, "%x", hash)
				self.status.lock.Lock()
				self.status.badPeers[peerId]++
				self.status.lock.Unlock()
				return
			}
		*/
		bnode.block = block
		bnode.blockBy = peerId
		glog.V(logger.Detail).Infof("AddBlock: set td on node %s from peer <%s> (head: %s) to %v (was %v) ", hex(hash), peerId, hex(sender.currentBlockHash), tdFromCurrentHead, bnode.td)
		bnode.td = tdFromCurrentHead
		self.status.lock.Lock()
		self.status.values.Blocks++
		self.status.values.BlocksInPool++
		self.status.lock.Unlock()
	}
	bnode.lock.Unlock()

	currentBlockC := sender.currentBlockC
	switchC := sender.switchC
	sender.lock.Unlock()
	// this must be called without holding the peer lock:
	// with the peer lock held, the loop can halt and block on the select forever
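// A minimal sketch of the call site (handleBlocksMsg and its parameters are hypothetical;
// AddBlock is assumed to take the decoded block and the delivering peer's id, per the
// description above; io, rlp and core/types are assumed to be imported): blocks are
// decoded one by one from the message payload and handed to the pool, which decides
// whether they were requested and whether their PoW checks out.
func handleBlocksMsg(pool *BlockPool, peerId string, payload io.Reader, size uint64) error {
	msgStream := rlp.NewStream(payload, size)
	// the payload is assumed to be an rlp-encoded list of blocks
	if _, err := msgStream.List(); err != nil {
		return err
	}
	for {
		var block types.Block
		if err := msgStream.Decode(&block); err != nil {
			if err == rlp.EOL {
				return nil // end of the list of blocks
			}
			return err
		}
		pool.AddBlock(&block, peerId)
	}
}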
	peer2.serveBlocks(5, 6) // partially complete, section will be preserved
	peer2.serveBlockHashes(6, 5, 4) // no go: make sure skeleton is created
	peer1.AddPeer() // inferior peer1 is promoted as best peer
	blockPool.RemovePeer("peer2") // peer2 disconnects
	go peer1.serveBlockHashes(4, 3, 2, 1, 0)
	go peer1.serveBlocks(3, 4)
	go peer1.serveBlocks(4, 5) // tests that the section set by the demoted peer is remembered and that blocks are accepted from the new peer if it has them, even if its original TD is lower
	peer1.serveBlocks(0, 1, 2, 3)
	blockPool.Wait(waitTimeout)
	blockPool.Stop()
	blockPoolTester.refBlockChain[6] = []int{} // tests that idle sections are not inserted in blockchain

	peer2.serveBlocks(5, 6) // partially complete, section will be preserved
	go peer2.serveBlockHashes(6, 5, 4)
	peer2.serveBlocks(3, 4) // !incomplete section
	time.Sleep(100 * time.Millisecond) // make sure block 4 added
	peer1.AddPeer() // inferior peer1 is promoted as best peer
	blockPool.RemovePeer("peer2") // peer2 disconnects
	go peer1.serveBlockHashes(4, 3, 2, 1, 0) // tests that hash requests connect directly if the head block exists
	go peer1.serveBlocks(4, 5) // tests that the section set by the demoted peer is remembered and that blocks are accepted from the new peer if it has them, even if its original TD is lower
	peer1.serveBlocks(0, 1, 2, 3)
	blockPool.Wait(waitTimeout)
	blockPool.Stop()
	blockPoolTester.refBlockChain[6] = []int{} // tests that idle sections are not inserted in blockchain
glog.V(logger.Debug).Infof("addPeer: Update peer <%s> with td %v (was %v) and current block %s (was %v)",self.id,td,self.td,hex(currentBlockHash),hex(previousBlockHash))
glog.V(logger.Detail).Infof("HeadSection: <%s> (head: %s) head section received [%s]-[%s]",self.id,hex(self.currentBlockHash),sectionhex(self.headSection),sectionhex(sec))
glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s received (parent: %s)",self.id,hex(self.currentBlockHash),hex(currentBlock.ParentHash()))
}
self.currentBlock=currentBlock
self.parentHash=currentBlock.ParentHash()
glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s found (parent: %s)... requesting hashes",self.id,hex(self.currentBlockHash),hex(self.parentHash))
self.blockHashesRequestTimer=time.After(0)
self.blocksRequestTimer=nil
}
func (self *peer) getBlockHashes() bool {
	self.lock.Lock()
	defer self.lock.Unlock()

	// if connecting parent is found
	if self.bp.hasBlock(self.parentHash) {
		glog.V(logger.Detail).Infof("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash))
		glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle)
	// signal from AddBlockHashes that the head section for the current best peer has been created
	// if sec == nil, it signals that chain info has been updated (new block message)
	case sec := <-self.headSectionC:
		self.handleSection(sec)

	// periodic check for block hashes or parent block/section
	case <-self.blockHashesRequestTimer:
		self.getBlockHashes()

	// signal from AddBlock that the head block of the current best peer has been received
	case currentBlock := <-self.currentBlockC:
		self.getCurrentBlock(currentBlock)

	// keep requesting until found or timed out
	case <-self.blocksRequestTimer:
		self.getCurrentBlock(nil)

	// quitting on timeout
	case <-self.headInfoTimer:
		self.peerError(self.bp.peers.errors.New(ErrInsufficientChainInfo, "timed out without providing block hashes or head block (td: %v, head: %s)", self.td, hex(self.currentBlockHash)))

		self.bp.status.lock.Lock()
		self.bp.status.badPeers[self.id]++
		self.bp.status.lock.Unlock()
		// there is no persistence here, so GC will just take care of cleaning up

	// signal for peer switch, quit
	case <-self.switchC:
		var complete = "incomplete "
		if self.idle {
			complete = "complete"
		}
		glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s %s... quit request loop due to peer switch", self.id, hex(self.currentBlockHash), complete)
		break LOOP

	// global quit for blockpool
	case <-self.bp.quit:
		break LOOP

	// best peer idle for too long
	case <-self.bestIdleTimer:
		self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks (td: %v, head: %s)...quitting", self.td, hex(self.currentBlockHash)))

		self.bp.status.lock.Lock()
		self.bp.status.badPeers[self.id]++
		self.bp.status.lock.Unlock()

		glog.V(logger.Detail).Infof("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection))
/*
section is the worker on each chain section in the block pool
- remove the section if there are blocks missing after an absolute time
- remove the section if there are maxIdleRounds of idle rounds of block requests with no response
- periodically polls the chain section for missing blocks, which are then requested from peers
- registers the process controller on the peer so that if the peer is promoted as best peer a second time (after a disconnect of a better one), all active processes are switched back on unless they are removed (inserted in blockchain, invalid or expired); see the register/unregister sketch below
- when turned off (if the peer disconnects and a new peer connects with an alternative chain), no block requests are made but the absolute expiry timer keeps ticking
- when turned back on it recursively calls itself on the root of the next chain section
*/
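// Illustrative sketch only, based on the channel field comments in the struct below
// (the helper names register/unregister are hypothetical): the section process is
// assumed to be driven by sending the current best peer on controlC when it should
// request blocks, and nil to deregister it so the section sits idle with only the
// absolute expiry timer running.
func register(s *section, best *peer) { s.controlC <- best }
func unregister(s *section)           { s.controlC <- nil }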
type section struct {
	lock sync.RWMutex

	parent *section // connecting section back in time towards blockchain
	child  *section // connecting section forward in time
	top    *node    // the topmost node = head node = youngest node within the chain section
	bottom *node    // the bottom node = root node = oldest node within the chain section
	nodes  []*node

	peer        *peer
	parentHash  common.Hash
	blockHashes []common.Hash

	poolRootIndex int

	bp *BlockPool

	controlC  chan *peer     // to (de)register the current best peer
	poolRootC chan *peer     // indicate connectedness to blockchain (well, known blocks)
	offC      chan bool      // closed if process terminated
	suicideC  chan bool      // initiate suicide on the section
	quitInitC chan bool      // to signal end of initialisation
	forkC     chan chan bool // freeze section process while splitting
glog.V(logger.Detail).Infof("[%s] got parent head block hash %s...checking",sectionhex(self),hex(self.parentHash))
self.blockHashesRequest()
}
}
}
ifself.initialised&&self.step==self.lastMissing{
glog.V(logger.Detail).Infof("[%s] check if new blocks arrived (attempt %v): missing %v/%v/%v",sectionhex(self),self.blocksRequests,self.missing,self.lastMissing,self.depth)
self.checkRound()
checking=false
}
}// select
}// for
close(self.offC)
ifself.peer!=nil{
self.active=false
self.bp.wg.Done()
}
glog.V(logger.Detail).Infof("[%s] section process terminated: %v blocks retrieved (%v attempts), hash requests complete on root (%v attempts).",sectionhex(self),self.depth,self.blocksRequests,self.blockHashesRequests)
}
func (self *section) switchOn(newpeer *peer) {
	oldpeer := self.peer
	// reset idleC/switchC to current best peer
	self.idleC = newpeer.idleC
	self.switchC = newpeer.switchC
	self.peer = newpeer

	if oldpeer != newpeer {
		oldp := "no peer"
		newp := "no peer"
		if oldpeer != nil {
			oldp = oldpeer.id
		}
		if newpeer != nil {
			newp = newpeer.id
		}

		glog.V(logger.Detail).Infof("[%s] active mode <%s> -> <%s>", sectionhex(self), oldp, newp)
// checks the number of missing blocks after each round of requests and acts accordingly
func (self *section) checkRound() {
	if self.missing == 0 {
		// no missing blocks
		glog.V(logger.Detail).Infof("[%s] section checked: got all blocks. process complete (%v total blocksRequests): missing %v/%v/%v", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth)