Unverified Commit 6ec15610 authored by Martin Holst Swende's avatar Martin Holst Swende Committed by GitHub

eth: implement eth66 (#22241)

* eth/protocols/eth: split up the eth protocol handlers

* eth/protocols/eth: define eth-66 protocol messages

* eth/protocols/eth: poc implement getblockheaders on eth/66

* eth/protocols/eth: implement remaining eth-66 handlers

* eth/protocols: define handler map for eth 66

* eth/downloader: use protocol constants from eth package

* eth/protocols/eth: add ETH66 capability

* eth/downloader: tests for eth66

* eth/downloader: fix error in tests

* eth/protocols/eth: use eth66 for outgoing requests

* eth/protocols/eth: remove unused error type

* eth/protocols/eth: define protocol length

* eth/protocols/eth: fix pooled tx over eth66

* protocols/eth/handlers: revert behavioural change which caused tests to fail

* eth/downloader: fix failing test

* eth/protocols/eth: add testcases + fix flaw with header requests

* eth/protocols: change comments

* eth/protocols/eth: review fixes + fixed flaw in RequestOneHeader

* eth/protocols: documentation

* eth/protocols/eth: review concerns about types
parent b1835b38
This diff is collapsed.
......@@ -29,6 +29,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
......@@ -457,7 +458,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.headerThroughput
}
return ps.idlePeers(64, 65, idle, throughput)
return ps.idlePeers(eth.ETH64, eth.ETH66, idle, throughput)
}
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
......@@ -471,7 +472,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.blockThroughput
}
return ps.idlePeers(64, 65, idle, throughput)
return ps.idlePeers(eth.ETH64, eth.ETH66, idle, throughput)
}
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
......@@ -485,7 +486,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.receiptThroughput
}
return ps.idlePeers(64, 65, idle, throughput)
return ps.idlePeers(eth.ETH64, eth.ETH66, idle, throughput)
}
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
......@@ -499,7 +500,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.stateThroughput
}
return ps.idlePeers(64, 65, idle, throughput)
return ps.idlePeers(eth.ETH64, eth.ETH66, idle, throughput)
}
// idlePeers retrieves a flat list of all currently idle peers satisfying the
......
This diff is collapsed.
This diff is collapsed.
......@@ -18,6 +18,7 @@ package eth
import (
"math/big"
"math/rand"
"sync"
mapset "github.com/deckarep/golang-set"
......@@ -267,6 +268,22 @@ func (p *Peer) SendPooledTransactionsRLP(hashes []common.Hash, txs []rlp.RawValu
return p2p.Send(p.rw, PooledTransactionsMsg, txs) // Not packed into PooledTransactionsPacket to avoid RLP decoding
}
// ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP.
func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs []rlp.RawValue) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
p.knownTxs.Pop()
}
for _, hash := range hashes {
p.knownTxs.Add(hash)
}
// Not packed into PooledTransactionsPacket to avoid RLP decoding
return p2p.Send(p.rw, PooledTransactionsMsg, PooledTransactionsRLPPacket66{
RequestId: id,
PooledTransactionsRLPPacket: txs,
})
}
// SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func (p *Peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
......@@ -308,7 +325,10 @@ func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error {
p.knownBlocks.Pop()
}
p.knownBlocks.Add(block.Hash())
return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{block, td})
return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{
Block: block,
TD: td,
})
}
// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
......@@ -331,9 +351,12 @@ func (p *Peer) SendBlockHeaders(headers []*types.Header) error {
return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket(headers))
}
// SendBlockBodies sends a batch of block contents to the remote peer.
func (p *Peer) SendBlockBodies(bodies []*BlockBody) error {
return p2p.Send(p.rw, BlockBodiesMsg, BlockBodiesPacket(bodies))
// ReplyBlockHeaders is the eth/66 version of SendBlockHeaders.
func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error {
return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket66{
RequestId: id,
BlockHeadersPacket: headers,
})
}
// SendBlockBodiesRLP sends a batch of block contents to the remote peer from
......@@ -342,52 +365,98 @@ func (p *Peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error {
return p2p.Send(p.rw, BlockBodiesMsg, bodies) // Not packed into BlockBodiesPacket to avoid RLP decoding
}
// ReplyBlockBodiesRLP is the eth/66 version of SendBlockBodiesRLP.
func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
// Not packed into BlockBodiesPacket to avoid RLP decoding
return p2p.Send(p.rw, BlockBodiesMsg, BlockBodiesRLPPacket66{
RequestId: id,
BlockBodiesRLPPacket: bodies,
})
}
// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the
// hashes requested.
func (p *Peer) SendNodeData(data [][]byte) error {
return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket(data))
}
// ReplyNodeData is the eth/66 response to GetNodeData.
func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error {
return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket66{
RequestId: id,
NodeDataPacket: data,
})
}
// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
// ones requested from an already RLP encoded format.
func (p *Peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
return p2p.Send(p.rw, ReceiptsMsg, receipts) // Not packed into ReceiptsPacket to avoid RLP decoding
}
// ReplyReceiptsRLP is the eth/66 response to GetReceipts.
func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error {
return p2p.Send(p.rw, ReceiptsMsg, ReceiptsRLPPacket66{
RequestId: id,
ReceiptsRLPPacket: receipts,
})
}
// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *Peer) RequestOneHeader(hash common.Hash) error {
p.Log().Debug("Fetching single header", "hash", hash)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket{
query := GetBlockHeadersPacket{
Origin: HashOrNumber{Hash: hash},
Amount: uint64(1),
Skip: uint64(0),
Reverse: false,
})
}
if p.Version() >= ETH66 {
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: rand.Uint64(),
GetBlockHeadersPacket: &query,
})
}
return p2p.Send(p.rw, GetBlockHeadersMsg, &query)
}
// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
// specified header query, based on the hash of an origin block.
func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket{
query := GetBlockHeadersPacket{
Origin: HashOrNumber{Hash: origin},
Amount: uint64(amount),
Skip: uint64(skip),
Reverse: reverse,
})
}
if p.Version() >= ETH66 {
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: rand.Uint64(),
GetBlockHeadersPacket: &query,
})
}
return p2p.Send(p.rw, GetBlockHeadersMsg, &query)
}
// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
// specified header query, based on the number of an origin block.
func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket{
query := GetBlockHeadersPacket{
Origin: HashOrNumber{Number: origin},
Amount: uint64(amount),
Skip: uint64(skip),
Reverse: reverse,
})
}
if p.Version() >= ETH66 {
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: rand.Uint64(),
GetBlockHeadersPacket: &query,
})
}
return p2p.Send(p.rw, GetBlockHeadersMsg, &query)
}
// ExpectRequestHeadersByNumber is a testing method to mirror the recipient side
......@@ -406,6 +475,12 @@ func (p *Peer) ExpectRequestHeadersByNumber(origin uint64, amount int, skip int,
// specified.
func (p *Peer) RequestBodies(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
if p.Version() >= ETH66 {
return p2p.Send(p.rw, GetBlockBodiesMsg, &GetBlockBodiesPacket66{
RequestId: rand.Uint64(),
GetBlockBodiesPacket: hashes,
})
}
return p2p.Send(p.rw, GetBlockBodiesMsg, GetBlockBodiesPacket(hashes))
}
......@@ -413,17 +488,35 @@ func (p *Peer) RequestBodies(hashes []common.Hash) error {
// data, corresponding to the specified hashes.
func (p *Peer) RequestNodeData(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of state data", "count", len(hashes))
if p.Version() >= ETH66 {
return p2p.Send(p.rw, GetNodeDataMsg, &GetNodeDataPacket66{
RequestId: rand.Uint64(),
GetNodeDataPacket: hashes,
})
}
return p2p.Send(p.rw, GetNodeDataMsg, GetNodeDataPacket(hashes))
}
// RequestReceipts fetches a batch of transaction receipts from a remote node.
func (p *Peer) RequestReceipts(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
if p.Version() >= ETH66 {
return p2p.Send(p.rw, GetReceiptsMsg, &GetReceiptsPacket66{
RequestId: rand.Uint64(),
GetReceiptsPacket: hashes,
})
}
return p2p.Send(p.rw, GetReceiptsMsg, GetReceiptsPacket(hashes))
}
// RequestTxs fetches a batch of transactions from a remote node.
func (p *Peer) RequestTxs(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of transactions", "count", len(hashes))
if p.Version() >= ETH66 {
return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{
RequestId: rand.Uint64(),
GetPooledTransactionsPacket: hashes,
})
}
return p2p.Send(p.rw, GetPooledTransactionsMsg, GetPooledTransactionsPacket(hashes))
}
......@@ -32,6 +32,7 @@ import (
const (
ETH64 = 64
ETH65 = 65
ETH66 = 66
)
// ProtocolName is the official short name of the `eth` protocol used during
......@@ -40,11 +41,11 @@ const ProtocolName = "eth"
// ProtocolVersions are the supported versions of the `eth` protocol (first
// is primary).
var ProtocolVersions = []uint{ETH65, ETH64}
var ProtocolVersions = []uint{ETH66, ETH65, ETH64}
// protocolLengths are the number of implemented message corresponding to
// different protocol versions.
var protocolLengths = map[uint]uint64{ETH65: 17, ETH64: 17}
var protocolLengths = map[uint]uint64{ETH66: 17, ETH65: 17, ETH64: 17}
// maxMessageSize is the maximum cap on the size of a protocol message.
const maxMessageSize = 10 * 1024 * 1024
......@@ -79,7 +80,6 @@ var (
errNetworkIDMismatch = errors.New("network ID mismatch")
errGenesisMismatch = errors.New("genesis mismatch")
errForkIDRejected = errors.New("fork ID rejected")
errExtraStatusMsg = errors.New("extra status message")
)
// Packet represents a p2p message in the `eth` protocol.
......@@ -129,6 +129,12 @@ type GetBlockHeadersPacket struct {
Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis)
}
// GetBlockHeadersPacket represents a block header query over eth/66
type GetBlockHeadersPacket66 struct {
RequestId uint64
*GetBlockHeadersPacket
}
// HashOrNumber is a combined field for specifying an origin block.
type HashOrNumber struct {
Hash common.Hash // Block hash from which to retrieve headers (excludes Number)
......@@ -168,6 +174,12 @@ func (hn *HashOrNumber) DecodeRLP(s *rlp.Stream) error {
// BlockHeadersPacket represents a block header response.
type BlockHeadersPacket []*types.Header
// BlockHeadersPacket represents a block header response over eth/66.
type BlockHeadersPacket66 struct {
RequestId uint64
BlockHeadersPacket
}
// NewBlockPacket is the network packet for the block propagation message.
type NewBlockPacket struct {
Block *types.Block
......@@ -190,9 +202,32 @@ func (request *NewBlockPacket) sanityCheck() error {
// GetBlockBodiesPacket represents a block body query.
type GetBlockBodiesPacket []common.Hash
// GetBlockBodiesPacket represents a block body query over eth/66.
type GetBlockBodiesPacket66 struct {
RequestId uint64
GetBlockBodiesPacket
}
// BlockBodiesPacket is the network packet for block content distribution.
type BlockBodiesPacket []*BlockBody
// BlockBodiesPacket is the network packet for block content distribution over eth/66.
type BlockBodiesPacket66 struct {
RequestId uint64
BlockBodiesPacket
}
// BlockBodiesRLPPacket is used for replying to block body requests, in cases
// where we already have them RLP-encoded, and thus can avoid the decode-encode
// roundtrip.
type BlockBodiesRLPPacket []rlp.RawValue
// BlockBodiesRLPPacket66 is the BlockBodiesRLPPacket over eth/66
type BlockBodiesRLPPacket66 struct {
RequestId uint64
BlockBodiesRLPPacket
}
// BlockBody represents the data content of a single block.
type BlockBody struct {
Transactions []*types.Transaction // Transactions contained within a block
......@@ -215,24 +250,78 @@ func (p *BlockBodiesPacket) Unpack() ([][]*types.Transaction, [][]*types.Header)
// GetNodeDataPacket represents a trie node data query.
type GetNodeDataPacket []common.Hash
// GetNodeDataPacket represents a trie node data query over eth/66.
type GetNodeDataPacket66 struct {
RequestId uint64
GetNodeDataPacket
}
// NodeDataPacket is the network packet for trie node data distribution.
type NodeDataPacket [][]byte
// NodeDataPacket is the network packet for trie node data distribution over eth/66.
type NodeDataPacket66 struct {
RequestId uint64
NodeDataPacket
}
// GetReceiptsPacket represents a block receipts query.
type GetReceiptsPacket []common.Hash
// GetReceiptsPacket represents a block receipts query over eth/66.
type GetReceiptsPacket66 struct {
RequestId uint64
GetReceiptsPacket
}
// ReceiptsPacket is the network packet for block receipts distribution.
type ReceiptsPacket [][]*types.Receipt
// ReceiptsPacket is the network packet for block receipts distribution over eth/66.
type ReceiptsPacket66 struct {
RequestId uint64
ReceiptsPacket
}
// ReceiptsRLPPacket is used for receipts, when we already have it encoded
type ReceiptsRLPPacket []rlp.RawValue
// ReceiptsPacket66 is the eth-66 version of ReceiptsRLPPacket
type ReceiptsRLPPacket66 struct {
RequestId uint64
ReceiptsRLPPacket
}
// NewPooledTransactionHashesPacket represents a transaction announcement packet.
type NewPooledTransactionHashesPacket []common.Hash
// GetPooledTransactionsPacket represents a transaction query.
type GetPooledTransactionsPacket []common.Hash
type GetPooledTransactionsPacket66 struct {
RequestId uint64
GetPooledTransactionsPacket
}
// PooledTransactionsPacket is the network packet for transaction distribution.
type PooledTransactionsPacket []*types.Transaction
// PooledTransactionsPacket is the network packet for transaction distribution over eth/66.
type PooledTransactionsPacket66 struct {
RequestId uint64
PooledTransactionsPacket
}
// PooledTransactionsPacket is the network packet for transaction distribution, used
// in the cases we already have them in rlp-encoded form
type PooledTransactionsRLPPacket []rlp.RawValue
// PooledTransactionsRLPPacket66 is the eth/66 form of PooledTransactionsRLPPacket
type PooledTransactionsRLPPacket66 struct {
RequestId uint64
PooledTransactionsRLPPacket
}
func (*StatusPacket) Name() string { return "Status" }
func (*StatusPacket) Kind() byte { return StatusMsg }
......
This diff is collapsed.
......@@ -70,6 +70,10 @@ func (msg Msg) Discard() error {
return err
}
func (msg Msg) Time() time.Time {
return msg.ReceivedAt
}
type MsgReader interface {
ReadMsg() (Msg, error)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment