Commit f43c07cb authored by Péter Szilágyi's avatar Péter Szilágyi

eth, eth/downloader: transition to eth 61

parent af51dc4d
...@@ -11,8 +11,6 @@ import ( ...@@ -11,8 +11,6 @@ import (
"strings" "strings"
"time" "time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/ethash" "github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -26,6 +24,7 @@ import ( ...@@ -26,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
......
This diff is collapsed.
This diff is collapsed.
...@@ -15,7 +15,8 @@ import ( ...@@ -15,7 +15,8 @@ import (
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
) )
type hashFetcherFn func(common.Hash) error type relativeHashFetcherFn func(common.Hash) error
type absoluteHashFetcherFn func(uint64, int) error
type blockFetcherFn func([]common.Hash) error type blockFetcherFn func([]common.Hash) error
var ( var (
...@@ -37,23 +38,25 @@ type peer struct { ...@@ -37,23 +38,25 @@ type peer struct {
ignored *set.Set // Set of hashes not to request (didn't have previously) ignored *set.Set // Set of hashes not to request (didn't have previously)
getHashes hashFetcherFn // Method to retrieve a batch of hashes (mockable for testing) getRelHashes relativeHashFetcherFn // Method to retrieve a batch of hashes from an origin hash
getBlocks blockFetcherFn // Method to retrieve a batch of blocks (mockable for testing) getAbsHashes absoluteHashFetcherFn // Method to retrieve a batch of hashes from an absolute position
getBlocks blockFetcherFn // Method to retrieve a batch of blocks
version int // Eth protocol version number to switch strategies version int // Eth protocol version number to switch strategies
} }
// newPeer create a new downloader peer, with specific hash and block retrieval // newPeer create a new downloader peer, with specific hash and block retrieval
// mechanisms. // mechanisms.
func newPeer(id string, version int, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) *peer {
return &peer{ return &peer{
id: id, id: id,
head: head, head: head,
capacity: 1, capacity: 1,
getHashes: getHashes, getRelHashes: getRelHashes,
getBlocks: getBlocks, getAbsHashes: getAbsHashes,
ignored: set.New(), getBlocks: getBlocks,
version: version, ignored: set.New(),
version: version,
} }
} }
......
...@@ -40,9 +40,9 @@ type queue struct { ...@@ -40,9 +40,9 @@ type queue struct {
pendPool map[string]*fetchRequest // Currently pending block retrieval operations pendPool map[string]*fetchRequest // Currently pending block retrieval operations
blockPool map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes blockPool map[common.Hash]uint64 // Hash-set of the downloaded data blocks, mapping to cache indexes
blockCache []*Block // Downloaded but not yet delivered blocks blockCache []*Block // Downloaded but not yet delivered blocks
blockOffset int // Offset of the first cached block in the block-chain blockOffset uint64 // Offset of the first cached block in the block-chain
lock sync.RWMutex lock sync.RWMutex
} }
...@@ -53,7 +53,7 @@ func newQueue() *queue { ...@@ -53,7 +53,7 @@ func newQueue() *queue {
hashPool: make(map[common.Hash]int), hashPool: make(map[common.Hash]int),
hashQueue: prque.New(), hashQueue: prque.New(),
pendPool: make(map[string]*fetchRequest), pendPool: make(map[string]*fetchRequest),
blockPool: make(map[common.Hash]int), blockPool: make(map[common.Hash]uint64),
blockCache: make([]*Block, blockCacheLimit), blockCache: make([]*Block, blockCacheLimit),
} }
} }
...@@ -69,7 +69,7 @@ func (q *queue) Reset() { ...@@ -69,7 +69,7 @@ func (q *queue) Reset() {
q.pendPool = make(map[string]*fetchRequest) q.pendPool = make(map[string]*fetchRequest)
q.blockPool = make(map[common.Hash]int) q.blockPool = make(map[common.Hash]uint64)
q.blockOffset = 0 q.blockOffset = 0
q.blockCache = make([]*Block, blockCacheLimit) q.blockCache = make([]*Block, blockCacheLimit)
} }
...@@ -130,7 +130,7 @@ func (q *queue) Has(hash common.Hash) bool { ...@@ -130,7 +130,7 @@ func (q *queue) Has(hash common.Hash) bool {
// Insert adds a set of hashes for the download queue for scheduling, returning // Insert adds a set of hashes for the download queue for scheduling, returning
// the new hashes encountered. // the new hashes encountered.
func (q *queue) Insert(hashes []common.Hash) []common.Hash { func (q *queue) Insert(hashes []common.Hash, fifo bool) []common.Hash {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
...@@ -147,7 +147,11 @@ func (q *queue) Insert(hashes []common.Hash) []common.Hash { ...@@ -147,7 +147,11 @@ func (q *queue) Insert(hashes []common.Hash) []common.Hash {
inserts = append(inserts, hash) inserts = append(inserts, hash)
q.hashPool[hash] = q.hashCounter q.hashPool[hash] = q.hashCounter
q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first if fifo {
q.hashQueue.Push(hash, -float32(q.hashCounter)) // Lowest gets schedules first
} else {
q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first
}
} }
return inserts return inserts
} }
...@@ -175,7 +179,7 @@ func (q *queue) GetBlock(hash common.Hash) *Block { ...@@ -175,7 +179,7 @@ func (q *queue) GetBlock(hash common.Hash) *Block {
return nil return nil
} }
// Return the block if it's still available in the cache // Return the block if it's still available in the cache
if q.blockOffset <= index && index < q.blockOffset+len(q.blockCache) { if q.blockOffset <= index && index < q.blockOffset+uint64(len(q.blockCache)) {
return q.blockCache[index-q.blockOffset] return q.blockCache[index-q.blockOffset]
} }
return nil return nil
...@@ -202,7 +206,7 @@ func (q *queue) TakeBlocks() []*Block { ...@@ -202,7 +206,7 @@ func (q *queue) TakeBlocks() []*Block {
for k, n := len(q.blockCache)-len(blocks), len(q.blockCache); k < n; k++ { for k, n := len(q.blockCache)-len(blocks), len(q.blockCache); k < n; k++ {
q.blockCache[k] = nil q.blockCache[k] = nil
} }
q.blockOffset += len(blocks) q.blockOffset += uint64(len(blocks))
return blocks return blocks
} }
...@@ -318,7 +322,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { ...@@ -318,7 +322,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
continue continue
} }
// If a requested block falls out of the range, the hash chain is invalid // If a requested block falls out of the range, the hash chain is invalid
index := int(block.NumberU64()) - q.blockOffset index := int(int64(block.NumberU64()) - int64(q.blockOffset))
if index >= len(q.blockCache) || index < 0 { if index >= len(q.blockCache) || index < 0 {
return errInvalidChain return errInvalidChain
} }
...@@ -329,7 +333,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { ...@@ -329,7 +333,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
} }
delete(request.Hashes, hash) delete(request.Hashes, hash)
delete(q.hashPool, hash) delete(q.hashPool, hash)
q.blockPool[hash] = int(block.NumberU64()) q.blockPool[hash] = block.NumberU64()
} }
// Return all failed or missing fetches to the queue // Return all failed or missing fetches to the queue
for hash, index := range request.Hashes { for hash, index := range request.Hashes {
...@@ -346,7 +350,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { ...@@ -346,7 +350,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
} }
// Prepare configures the block cache offset to allow accepting inbound blocks. // Prepare configures the block cache offset to allow accepting inbound blocks.
func (q *queue) Prepare(offset int) { func (q *queue) Prepare(offset uint64) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
......
...@@ -96,7 +96,7 @@ func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow po ...@@ -96,7 +96,7 @@ func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow po
} }
} }
// Construct the different synchronisation mechanisms // Construct the different synchronisation mechanisms
manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer) manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.CurrentBlock, manager.chainman.InsertChain, manager.removePeer)
validator := func(block *types.Block, parent *types.Block) error { validator := func(block *types.Block, parent *types.Block) error {
return core.ValidateHeader(pow, block.Header(), parent, true) return core.ValidateHeader(pow, block.Header(), parent, true)
...@@ -181,7 +181,7 @@ func (pm *ProtocolManager) handle(p *peer) error { ...@@ -181,7 +181,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
defer pm.removePeer(p.id) defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect // Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), p.RequestHashes, p.RequestBlocks); err != nil { if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks); err != nil {
return err return err
} }
// Propagate existing transactions. new transactions appearing // Propagate existing transactions. new transactions appearing
...@@ -214,50 +214,50 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ...@@ -214,50 +214,50 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Handle the message depending on its contents // Handle the message depending on its contents
switch msg.Code { switch msg.Code {
case StatusMsg: case StatusMsg:
// Status messages should never arrive after the handshake
return errResp(ErrExtraStatusMsg, "uncontrolled status message") return errResp(ErrExtraStatusMsg, "uncontrolled status message")
case TxMsg:
// Transactions arrived, parse all of them and deliver to the pool
var txs []*types.Transaction
if err := msg.Decode(&txs); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
propTxnInPacketsMeter.Mark(1)
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
return errResp(ErrDecode, "transaction %d is nil", i)
}
p.MarkTransaction(tx.Hash())
// Log it's arrival for later analysis
propTxnInTrafficMeter.Mark(tx.Size().Int64())
jsonlogger.LogJson(&logger.EthTxReceived{
TxHash: tx.Hash().Hex(),
RemoteId: p.ID().String(),
})
}
pm.txpool.AddTransactions(txs)
case GetBlockHashesMsg: case GetBlockHashesMsg:
// Retrieve the number of hashes to return and from which origin hash
var request getBlockHashesData var request getBlockHashesData
if err := msg.Decode(&request); err != nil { if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "->msg %v: %v", msg, err) return errResp(ErrDecode, "%v: %v", msg, err)
} }
if request.Amount > uint64(downloader.MaxHashFetch) { if request.Amount > uint64(downloader.MaxHashFetch) {
request.Amount = uint64(downloader.MaxHashFetch) request.Amount = uint64(downloader.MaxHashFetch)
} }
// Retrieve the hashes from the block chain and return them
hashes := pm.chainman.GetBlockHashesFromHash(request.Hash, request.Amount) hashes := pm.chainman.GetBlockHashesFromHash(request.Hash, request.Amount)
if len(hashes) == 0 {
glog.V(logger.Debug).Infof("invalid block hash %x", request.Hash.Bytes()[:4])
}
return p.SendBlockHashes(hashes)
if glog.V(logger.Debug) { case GetBlockHashesFromNumberMsg:
if len(hashes) == 0 { // Retrieve and decode the number of hashes to return and from which origin number
glog.Infof("invalid block hash %x", request.Hash.Bytes()[:4]) var request getBlockHashesFromNumberData
} if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
if request.Amount > uint64(downloader.MaxHashFetch) {
request.Amount = uint64(downloader.MaxHashFetch)
} }
// Calculate the last block that should be retrieved, and short circuit if unavailable
last := pm.chainman.GetBlockByNumber(request.Number + request.Amount - 1)
if last == nil {
last = pm.chainman.CurrentBlock()
request.Amount = last.NumberU64() - request.Number + 1
}
if last.NumberU64() < request.Number {
return p.SendBlockHashes(nil)
}
// Retrieve the hashes from the last block backwards, reverse and return
hashes := []common.Hash{last.Hash()}
hashes = append(hashes, pm.chainman.GetBlockHashesFromHash(last.Hash(), request.Amount-1)...)
// returns either requested hashes or nothing (i.e. not found) for i := 0; i < len(hashes)/2; i++ {
hashes[i], hashes[len(hashes)-1-i] = hashes[len(hashes)-1-i], hashes[i]
}
return p.SendBlockHashes(hashes) return p.SendBlockHashes(hashes)
case BlockHashesMsg: case BlockHashesMsg:
...@@ -399,6 +399,29 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ...@@ -399,6 +399,29 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.SetTd(request.TD) p.SetTd(request.TD)
go pm.synchronise(p) go pm.synchronise(p)
case TxMsg:
// Transactions arrived, parse all of them and deliver to the pool
var txs []*types.Transaction
if err := msg.Decode(&txs); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
propTxnInPacketsMeter.Mark(1)
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
return errResp(ErrDecode, "transaction %d is nil", i)
}
p.MarkTransaction(tx.Hash())
// Log it's arrival for later analysis
propTxnInTrafficMeter.Mark(tx.Size().Int64())
jsonlogger.LogJson(&logger.EthTxReceived{
TxHash: tx.Hash().Hex(),
RemoteId: p.ID().String(),
})
}
pm.txpool.AddTransactions(txs)
default: default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code) return errResp(ErrInvalidMsgCode, "%v", msg.Code)
} }
......
package eth package eth
import "github.com/rcrowley/go-metrics" import (
"github.com/ethereum/go-ethereum/metrics"
)
var ( var (
propTxnInPacketsMeter = metrics.GetOrRegisterMeter("eth/prop/txns/in/packets", metrics.DefaultRegistry) propTxnInPacketsMeter = metrics.NewMeter("eth/prop/txns/in/packets")
propTxnInTrafficMeter = metrics.GetOrRegisterMeter("eth/prop/txns/in/traffic", metrics.DefaultRegistry) propTxnInTrafficMeter = metrics.NewMeter("eth/prop/txns/in/traffic")
propTxnOutPacketsMeter = metrics.GetOrRegisterMeter("eth/prop/txns/out/packets", metrics.DefaultRegistry) propTxnOutPacketsMeter = metrics.NewMeter("eth/prop/txns/out/packets")
propTxnOutTrafficMeter = metrics.GetOrRegisterMeter("eth/prop/txns/out/traffic", metrics.DefaultRegistry) propTxnOutTrafficMeter = metrics.NewMeter("eth/prop/txns/out/traffic")
propHashInPacketsMeter = metrics.GetOrRegisterMeter("eth/prop/hashes/in/packets", metrics.DefaultRegistry) propHashInPacketsMeter = metrics.NewMeter("eth/prop/hashes/in/packets")
propHashInTrafficMeter = metrics.GetOrRegisterMeter("eth/prop/hashes/in/traffic", metrics.DefaultRegistry) propHashInTrafficMeter = metrics.NewMeter("eth/prop/hashes/in/traffic")
propHashOutPacketsMeter = metrics.GetOrRegisterMeter("eth/prop/hashes/out/packets", metrics.DefaultRegistry) propHashOutPacketsMeter = metrics.NewMeter("eth/prop/hashes/out/packets")
propHashOutTrafficMeter = metrics.GetOrRegisterMeter("eth/prop/hashes/out/traffic", metrics.DefaultRegistry) propHashOutTrafficMeter = metrics.NewMeter("eth/prop/hashes/out/traffic")
propBlockInPacketsMeter = metrics.GetOrRegisterMeter("eth/prop/blocks/in/packets", metrics.DefaultRegistry) propBlockInPacketsMeter = metrics.NewMeter("eth/prop/blocks/in/packets")
propBlockInTrafficMeter = metrics.GetOrRegisterMeter("eth/prop/blocks/in/traffic", metrics.DefaultRegistry) propBlockInTrafficMeter = metrics.NewMeter("eth/prop/blocks/in/traffic")
propBlockOutPacketsMeter = metrics.GetOrRegisterMeter("eth/prop/blocks/out/packets", metrics.DefaultRegistry) propBlockOutPacketsMeter = metrics.NewMeter("eth/prop/blocks/out/packets")
propBlockOutTrafficMeter = metrics.GetOrRegisterMeter("eth/prop/blocks/out/traffic", metrics.DefaultRegistry) propBlockOutTrafficMeter = metrics.NewMeter("eth/prop/blocks/out/traffic")
reqHashInPacketsMeter = metrics.GetOrRegisterMeter("eth/req/hashes/in/packets", metrics.DefaultRegistry) reqHashInPacketsMeter = metrics.NewMeter("eth/req/hashes/in/packets")
reqHashInTrafficMeter = metrics.GetOrRegisterMeter("eth/req/hashes/in/traffic", metrics.DefaultRegistry) reqHashInTrafficMeter = metrics.NewMeter("eth/req/hashes/in/traffic")
reqHashOutPacketsMeter = metrics.GetOrRegisterMeter("eth/req/hashes/out/packets", metrics.DefaultRegistry) reqHashOutPacketsMeter = metrics.NewMeter("eth/req/hashes/out/packets")
reqHashOutTrafficMeter = metrics.GetOrRegisterMeter("eth/req/hashes/out/traffic", metrics.DefaultRegistry) reqHashOutTrafficMeter = metrics.NewMeter("eth/req/hashes/out/traffic")
reqBlockInPacketsMeter = metrics.GetOrRegisterMeter("eth/req/blocks/in/packets", metrics.DefaultRegistry) reqBlockInPacketsMeter = metrics.NewMeter("eth/req/blocks/in/packets")
reqBlockInTrafficMeter = metrics.GetOrRegisterMeter("eth/req/blocks/in/traffic", metrics.DefaultRegistry) reqBlockInTrafficMeter = metrics.NewMeter("eth/req/blocks/in/traffic")
reqBlockOutPacketsMeter = metrics.GetOrRegisterMeter("eth/req/blocks/out/packets", metrics.DefaultRegistry) reqBlockOutPacketsMeter = metrics.NewMeter("eth/req/blocks/out/packets")
reqBlockOutTrafficMeter = metrics.GetOrRegisterMeter("eth/req/blocks/out/traffic", metrics.DefaultRegistry) reqBlockOutTrafficMeter = metrics.NewMeter("eth/req/blocks/out/traffic")
) )
...@@ -174,9 +174,9 @@ func (p *peer) RequestHashes(from common.Hash) error { ...@@ -174,9 +174,9 @@ func (p *peer) RequestHashes(from common.Hash) error {
// RequestHashesFromNumber fetches a batch of hashes from a peer, starting at the // RequestHashesFromNumber fetches a batch of hashes from a peer, starting at the
// requested block number, going upwards towards the genesis block. // requested block number, going upwards towards the genesis block.
func (p *peer) RequestHashesFromNumber(from uint64) error { func (p *peer) RequestHashesFromNumber(from uint64, count int) error {
glog.V(logger.Debug).Infof("Peer [%s] fetching hashes (%d) from #%d...\n", p.id, downloader.MaxHashFetch, from) glog.V(logger.Debug).Infof("Peer [%s] fetching hashes (%d) from #%d...\n", p.id, count, from)
return p2p.Send(p.rw, GetBlockHashesFromNumberMsg, getBlockHashesFromNumberData{from, uint64(downloader.MaxHashFetch)}) return p2p.Send(p.rw, GetBlockHashesFromNumberMsg, getBlockHashesFromNumberData{from, uint64(count)})
} }
// RequestBlocks fetches a batch of blocks corresponding to the specified hashes. // RequestBlocks fetches a batch of blocks corresponding to the specified hashes.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment