eth: drop eth/65, the last non-reqid protocol version

parent 1b5582ac
...@@ -45,7 +45,7 @@ func TestEthSuite(t *testing.T) { ...@@ -45,7 +45,7 @@ func TestEthSuite(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("could not create new test suite: %v", err) t.Fatalf("could not create new test suite: %v", err)
} }
for _, test := range suite.AllEthTests() { for _, test := range suite.Eth66Tests() {
t.Run(test.Name, func(t *testing.T) { t.Run(test.Name, func(t *testing.T) {
result := utesting.RunTAP([]utesting.Test{{Name: test.Name, Fn: test.Fn}}, os.Stdout) result := utesting.RunTAP([]utesting.Test{{Name: test.Name, Fn: test.Fn}}, os.Stdout)
if result[0].Failed { if result[0].Failed {
......
...@@ -448,8 +448,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I ...@@ -448,8 +448,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d.mux.Post(DoneEvent{latest}) d.mux.Post(DoneEvent{latest})
} }
}() }()
if p.version < eth.ETH65 { if p.version < eth.ETH66 {
return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, eth.ETH65) return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, eth.ETH66)
} }
mode := d.getMode() mode := d.getMode()
......
This diff is collapsed.
...@@ -413,7 +413,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) { ...@@ -413,7 +413,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
throughput := func(p *peerConnection) int { throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.BlockHeadersMsg, time.Second) return p.rates.Capacity(eth.BlockHeadersMsg, time.Second)
} }
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput)
} }
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
...@@ -425,7 +425,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) { ...@@ -425,7 +425,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
throughput := func(p *peerConnection) int { throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.BlockBodiesMsg, time.Second) return p.rates.Capacity(eth.BlockBodiesMsg, time.Second)
} }
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput)
} }
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
...@@ -437,7 +437,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) { ...@@ -437,7 +437,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
throughput := func(p *peerConnection) int { throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.ReceiptsMsg, time.Second) return p.rates.Capacity(eth.ReceiptsMsg, time.Second)
} }
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput)
} }
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
...@@ -449,7 +449,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) { ...@@ -449,7 +449,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
throughput := func(p *peerConnection) int { throughput := func(p *peerConnection) int {
return p.rates.Capacity(eth.NodeDataMsg, time.Second) return p.rates.Capacity(eth.NodeDataMsg, time.Second)
} }
return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput)
} }
// idlePeers retrieves a flat list of all currently idle peers satisfying the // idlePeers retrieves a flat list of all currently idle peers satisfying the
......
...@@ -117,7 +117,6 @@ type handler struct { ...@@ -117,7 +117,6 @@ type handler struct {
whitelist map[uint64]common.Hash whitelist map[uint64]common.Hash
// channels for fetcher, syncer, txsyncLoop // channels for fetcher, syncer, txsyncLoop
txsyncCh chan *txsync
quitSync chan struct{} quitSync chan struct{}
chainSync *chainSyncer chainSync *chainSyncer
...@@ -140,7 +139,6 @@ func newHandler(config *handlerConfig) (*handler, error) { ...@@ -140,7 +139,6 @@ func newHandler(config *handlerConfig) (*handler, error) {
chain: config.Chain, chain: config.Chain,
peers: newPeerSet(), peers: newPeerSet(),
whitelist: config.Whitelist, whitelist: config.Whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}), quitSync: make(chan struct{}),
} }
if config.Sync == downloader.FullSync { if config.Sync == downloader.FullSync {
...@@ -408,9 +406,8 @@ func (h *handler) Start(maxPeers int) { ...@@ -408,9 +406,8 @@ func (h *handler) Start(maxPeers int) {
go h.minedBroadcastLoop() go h.minedBroadcastLoop()
// start sync handlers // start sync handlers
h.wg.Add(2) h.wg.Add(1)
go h.chainSync.loop() go h.chainSync.loop()
go h.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64.
} }
func (h *handler) Stop() { func (h *handler) Stop() {
......
...@@ -80,7 +80,6 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { ...@@ -80,7 +80,6 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
// Tests that peers are correctly accepted (or rejected) based on the advertised // Tests that peers are correctly accepted (or rejected) based on the advertised
// fork IDs in the protocol handshake. // fork IDs in the protocol handshake.
func TestForkIDSplit65(t *testing.T) { testForkIDSplit(t, eth.ETH65) }
func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) } func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) }
func testForkIDSplit(t *testing.T, protocol uint) { func testForkIDSplit(t *testing.T, protocol uint) {
...@@ -236,7 +235,6 @@ func testForkIDSplit(t *testing.T, protocol uint) { ...@@ -236,7 +235,6 @@ func testForkIDSplit(t *testing.T, protocol uint) {
} }
// Tests that received transactions are added to the local pool. // Tests that received transactions are added to the local pool.
func TestRecvTransactions65(t *testing.T) { testRecvTransactions(t, eth.ETH65) }
func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) } func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) }
func testRecvTransactions(t *testing.T, protocol uint) { func testRecvTransactions(t *testing.T, protocol uint) {
...@@ -294,7 +292,6 @@ func testRecvTransactions(t *testing.T, protocol uint) { ...@@ -294,7 +292,6 @@ func testRecvTransactions(t *testing.T, protocol uint) {
} }
// This test checks that pending transactions are sent. // This test checks that pending transactions are sent.
func TestSendTransactions65(t *testing.T) { testSendTransactions(t, eth.ETH65) }
func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) } func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) }
func testSendTransactions(t *testing.T, protocol uint) { func testSendTransactions(t *testing.T, protocol uint) {
...@@ -306,7 +303,7 @@ func testSendTransactions(t *testing.T, protocol uint) { ...@@ -306,7 +303,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
insert := make([]*types.Transaction, 100) insert := make([]*types.Transaction, 100)
for nonce := range insert { for nonce := range insert {
tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, txsyncPackSize/10)) tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, 10240))
tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey) tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)
insert[nonce] = tx insert[nonce] = tx
...@@ -380,7 +377,6 @@ func testSendTransactions(t *testing.T, protocol uint) { ...@@ -380,7 +377,6 @@ func testSendTransactions(t *testing.T, protocol uint) {
// Tests that transactions get propagated to all attached peers, either via direct // Tests that transactions get propagated to all attached peers, either via direct
// broadcasts or via announcements/retrievals. // broadcasts or via announcements/retrievals.
func TestTransactionPropagation65(t *testing.T) { testTransactionPropagation(t, eth.ETH65) }
func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) } func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) }
func testTransactionPropagation(t *testing.T, protocol uint) { func testTransactionPropagation(t *testing.T, protocol uint) {
...@@ -521,8 +517,8 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo ...@@ -521,8 +517,8 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
defer p2pLocal.Close() defer p2pLocal.Close()
defer p2pRemote.Close() defer p2pRemote.Close()
local := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pLocal), p2pLocal, handler.txpool) local := eth.NewPeer(eth.ETH66, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pLocal), p2pLocal, handler.txpool)
remote := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pRemote), p2pRemote, handler.txpool) remote := eth.NewPeer(eth.ETH66, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pRemote), p2pRemote, handler.txpool)
defer local.Close() defer local.Close()
defer remote.Close() defer remote.Close()
...@@ -543,30 +539,39 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo ...@@ -543,30 +539,39 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil { if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil {
t.Fatalf("failed to run protocol handshake") t.Fatalf("failed to run protocol handshake")
} }
// Connect a new peer and check that we receive the checkpoint challenge. // Connect a new peer and check that we receive the checkpoint challenge.
if checkpoint { if checkpoint {
if err := remote.ExpectRequestHeadersByNumber(response.Number.Uint64(), 1, 0, false); err != nil { msg, err := p2pRemote.ReadMsg()
t.Fatalf("challenge mismatch: %v", err) if err != nil {
t.Fatalf("failed to read checkpoint challenge: %v", err)
}
request := new(eth.GetBlockHeadersPacket66)
if err := msg.Decode(request); err != nil {
t.Fatalf("failed to decode checkpoint challenge: %v", err)
}
query := request.GetBlockHeadersPacket
if query.Origin.Number != response.Number.Uint64() || query.Amount != 1 || query.Skip != 0 || query.Reverse {
t.Fatalf("challenge mismatch: have [%d, %d, %d, %v] want [%d, %d, %d, %v]",
query.Origin.Number, query.Amount, query.Skip, query.Reverse,
response.Number.Uint64(), 1, 0, false)
} }
// Create a block to reply to the challenge if no timeout is simulated. // Create a block to reply to the challenge if no timeout is simulated.
if !timeout { if !timeout {
if empty { if empty {
if err := remote.SendBlockHeaders([]*types.Header{}); err != nil { if err := remote.ReplyBlockHeaders(request.RequestId, []*types.Header{}); err != nil {
t.Fatalf("failed to answer challenge: %v", err) t.Fatalf("failed to answer challenge: %v", err)
} }
} else if match { } else if match {
if err := remote.SendBlockHeaders([]*types.Header{response}); err != nil { if err := remote.ReplyBlockHeaders(request.RequestId, []*types.Header{response}); err != nil {
t.Fatalf("failed to answer challenge: %v", err) t.Fatalf("failed to answer challenge: %v", err)
} }
} else { } else {
if err := remote.SendBlockHeaders([]*types.Header{{Number: response.Number}}); err != nil { if err := remote.ReplyBlockHeaders(request.RequestId, []*types.Header{{Number: response.Number}}); err != nil {
t.Fatalf("failed to answer challenge: %v", err) t.Fatalf("failed to answer challenge: %v", err)
} }
} }
} }
} }
// Wait until the test timeout passes to ensure proper cleanup // Wait until the test timeout passes to ensure proper cleanup
time.Sleep(syncChallengeTimeout + 300*time.Millisecond) time.Sleep(syncChallengeTimeout + 300*time.Millisecond)
...@@ -619,8 +624,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { ...@@ -619,8 +624,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
defer sourcePipe.Close() defer sourcePipe.Close()
defer sinkPipe.Close() defer sinkPipe.Close()
sourcePeer := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{byte(i)}, "", nil, sourcePipe), sourcePipe, nil) sourcePeer := eth.NewPeer(eth.ETH66, p2p.NewPeerPipe(enode.ID{byte(i)}, "", nil, sourcePipe), sourcePipe, nil)
sinkPeer := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, nil) sinkPeer := eth.NewPeer(eth.ETH66, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, nil)
defer sourcePeer.Close() defer sourcePeer.Close()
defer sinkPeer.Close() defer sinkPeer.Close()
...@@ -671,7 +676,6 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { ...@@ -671,7 +676,6 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
// Tests that a propagated malformed block (uncles or transactions don't match // Tests that a propagated malformed block (uncles or transactions don't match
// with the hashes in the header) gets discarded and not broadcast forward. // with the hashes in the header) gets discarded and not broadcast forward.
func TestBroadcastMalformedBlock65(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH65) }
func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) } func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) }
func testBroadcastMalformedBlock(t *testing.T, protocol uint) { func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
......
...@@ -171,29 +171,11 @@ type Decoder interface { ...@@ -171,29 +171,11 @@ type Decoder interface {
Time() time.Time Time() time.Time
} }
var eth65 = map[uint64]msgHandler{
GetBlockHeadersMsg: handleGetBlockHeaders,
BlockHeadersMsg: handleBlockHeaders,
GetBlockBodiesMsg: handleGetBlockBodies,
BlockBodiesMsg: handleBlockBodies,
GetNodeDataMsg: handleGetNodeData,
NodeDataMsg: handleNodeData,
GetReceiptsMsg: handleGetReceipts,
ReceiptsMsg: handleReceipts,
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions,
}
var eth66 = map[uint64]msgHandler{ var eth66 = map[uint64]msgHandler{
NewBlockHashesMsg: handleNewBlockhashes, NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock, NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions, TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
// eth66 messages with request-id
GetBlockHeadersMsg: handleGetBlockHeaders66, GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66, BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66, GetBlockBodiesMsg: handleGetBlockBodies66,
...@@ -219,10 +201,11 @@ func handleMessage(backend Backend, peer *Peer) error { ...@@ -219,10 +201,11 @@ func handleMessage(backend Backend, peer *Peer) error {
} }
defer msg.Discard() defer msg.Discard()
var handlers = eth65 var handlers = eth66
if peer.Version() >= ETH66 { //if peer.Version() >= ETH67 { // Left in as a sample when new protocol is added
handlers = eth66 // handlers = eth67
} //}
// Track the amount of time it takes to serve the request and run the handler // Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled { if metrics.Enabled {
h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code) h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code)
......
...@@ -110,7 +110,6 @@ func (b *testBackend) Handle(*Peer, Packet) error { ...@@ -110,7 +110,6 @@ func (b *testBackend) Handle(*Peer, Packet) error {
} }
// Tests that block headers can be retrieved from a remote chain based on user queries. // Tests that block headers can be retrieved from a remote chain based on user queries.
func TestGetBlockHeaders65(t *testing.T) { testGetBlockHeaders(t, ETH65) }
func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) } func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) }
func testGetBlockHeaders(t *testing.T, protocol uint) { func testGetBlockHeaders(t *testing.T, protocol uint) {
...@@ -254,12 +253,6 @@ func testGetBlockHeaders(t *testing.T, protocol uint) { ...@@ -254,12 +253,6 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
headers = append(headers, backend.chain.GetBlockByHash(hash).Header()) headers = append(headers, backend.chain.GetBlockByHash(hash).Header())
} }
// Send the hash request and verify the response // Send the hash request and verify the response
if protocol <= ETH65 {
p2p.Send(peer.app, GetBlockHeadersMsg, tt.query)
if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, headers); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err)
}
} else {
p2p.Send(peer.app, GetBlockHeadersMsg, GetBlockHeadersPacket66{ p2p.Send(peer.app, GetBlockHeadersMsg, GetBlockHeadersPacket66{
RequestId: 123, RequestId: 123,
GetBlockHeadersPacket: tt.query, GetBlockHeadersPacket: tt.query,
...@@ -270,18 +263,11 @@ func testGetBlockHeaders(t *testing.T, protocol uint) { ...@@ -270,18 +263,11 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
}); err != nil { }); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err) t.Errorf("test %d: headers mismatch: %v", i, err)
} }
}
// If the test used number origins, repeat with hashes as the too // If the test used number origins, repeat with hashes as the too
if tt.query.Origin.Hash == (common.Hash{}) { if tt.query.Origin.Hash == (common.Hash{}) {
if origin := backend.chain.GetBlockByNumber(tt.query.Origin.Number); origin != nil { if origin := backend.chain.GetBlockByNumber(tt.query.Origin.Number); origin != nil {
tt.query.Origin.Hash, tt.query.Origin.Number = origin.Hash(), 0 tt.query.Origin.Hash, tt.query.Origin.Number = origin.Hash(), 0
if protocol <= ETH65 {
p2p.Send(peer.app, GetBlockHeadersMsg, tt.query)
if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, headers); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err)
}
} else {
p2p.Send(peer.app, GetBlockHeadersMsg, GetBlockHeadersPacket66{ p2p.Send(peer.app, GetBlockHeadersMsg, GetBlockHeadersPacket66{
RequestId: 456, RequestId: 456,
GetBlockHeadersPacket: tt.query, GetBlockHeadersPacket: tt.query,
...@@ -295,11 +281,9 @@ func testGetBlockHeaders(t *testing.T, protocol uint) { ...@@ -295,11 +281,9 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
} }
} }
} }
}
} }
// Tests that block contents can be retrieved from a remote chain based on their hashes. // Tests that block contents can be retrieved from a remote chain based on their hashes.
func TestGetBlockBodies65(t *testing.T) { testGetBlockBodies(t, ETH65) }
func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) } func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) }
func testGetBlockBodies(t *testing.T, protocol uint) { func testGetBlockBodies(t *testing.T, protocol uint) {
...@@ -369,12 +353,6 @@ func testGetBlockBodies(t *testing.T, protocol uint) { ...@@ -369,12 +353,6 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
} }
} }
// Send the hash request and verify the response // Send the hash request and verify the response
if protocol <= ETH65 {
p2p.Send(peer.app, GetBlockBodiesMsg, hashes)
if err := p2p.ExpectMsg(peer.app, BlockBodiesMsg, bodies); err != nil {
t.Errorf("test %d: bodies mismatch: %v", i, err)
}
} else {
p2p.Send(peer.app, GetBlockBodiesMsg, GetBlockBodiesPacket66{ p2p.Send(peer.app, GetBlockBodiesMsg, GetBlockBodiesPacket66{
RequestId: 123, RequestId: 123,
GetBlockBodiesPacket: hashes, GetBlockBodiesPacket: hashes,
...@@ -386,11 +364,9 @@ func testGetBlockBodies(t *testing.T, protocol uint) { ...@@ -386,11 +364,9 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
t.Errorf("test %d: bodies mismatch: %v", i, err) t.Errorf("test %d: bodies mismatch: %v", i, err)
} }
} }
}
} }
// Tests that the state trie nodes can be retrieved based on hashes. // Tests that the state trie nodes can be retrieved based on hashes.
func TestGetNodeData65(t *testing.T) { testGetNodeData(t, ETH65) }
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) } func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) }
func testGetNodeData(t *testing.T, protocol uint) { func testGetNodeData(t *testing.T, protocol uint) {
...@@ -449,14 +425,10 @@ func testGetNodeData(t *testing.T, protocol uint) { ...@@ -449,14 +425,10 @@ func testGetNodeData(t *testing.T, protocol uint) {
} }
it.Release() it.Release()
if protocol <= ETH65 {
p2p.Send(peer.app, GetNodeDataMsg, hashes)
} else {
p2p.Send(peer.app, GetNodeDataMsg, GetNodeDataPacket66{ p2p.Send(peer.app, GetNodeDataMsg, GetNodeDataPacket66{
RequestId: 123, RequestId: 123,
GetNodeDataPacket: hashes, GetNodeDataPacket: hashes,
}) })
}
msg, err := peer.app.ReadMsg() msg, err := peer.app.ReadMsg()
if err != nil { if err != nil {
t.Fatalf("failed to read node data response: %v", err) t.Fatalf("failed to read node data response: %v", err)
...@@ -464,18 +436,14 @@ func testGetNodeData(t *testing.T, protocol uint) { ...@@ -464,18 +436,14 @@ func testGetNodeData(t *testing.T, protocol uint) {
if msg.Code != NodeDataMsg { if msg.Code != NodeDataMsg {
t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, NodeDataMsg) t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, NodeDataMsg)
} }
var data [][]byte var (
if protocol <= ETH65 { data [][]byte
if err := msg.Decode(&data); err != nil { res NodeDataPacket66
t.Fatalf("failed to decode response node data: %v", err) )
}
} else {
var res NodeDataPacket66
if err := msg.Decode(&res); err != nil { if err := msg.Decode(&res); err != nil {
t.Fatalf("failed to decode response node data: %v", err) t.Fatalf("failed to decode response node data: %v", err)
} }
data = res.NodeDataPacket data = res.NodeDataPacket
}
// Verify that all hashes correspond to the requested data, and reconstruct a state tree // Verify that all hashes correspond to the requested data, and reconstruct a state tree
for i, want := range hashes { for i, want := range hashes {
if hash := crypto.Keccak256Hash(data[i]); hash != want { if hash := crypto.Keccak256Hash(data[i]); hash != want {
...@@ -506,7 +474,6 @@ func testGetNodeData(t *testing.T, protocol uint) { ...@@ -506,7 +474,6 @@ func testGetNodeData(t *testing.T, protocol uint) {
} }
// Tests that the transaction receipts can be retrieved based on hashes. // Tests that the transaction receipts can be retrieved based on hashes.
func TestGetBlockReceipts65(t *testing.T) { testGetBlockReceipts(t, ETH65) }
func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) } func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) }
func testGetBlockReceipts(t *testing.T, protocol uint) { func testGetBlockReceipts(t *testing.T, protocol uint) {
...@@ -566,12 +533,6 @@ func testGetBlockReceipts(t *testing.T, protocol uint) { ...@@ -566,12 +533,6 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
receipts = append(receipts, backend.chain.GetReceiptsByHash(block.Hash())) receipts = append(receipts, backend.chain.GetReceiptsByHash(block.Hash()))
} }
// Send the hash request and verify the response // Send the hash request and verify the response
if protocol <= ETH65 {
p2p.Send(peer.app, GetReceiptsMsg, hashes)
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, receipts); err != nil {
t.Errorf("receipts mismatch: %v", err)
}
} else {
p2p.Send(peer.app, GetReceiptsMsg, GetReceiptsPacket66{ p2p.Send(peer.app, GetReceiptsMsg, GetReceiptsPacket66{
RequestId: 123, RequestId: 123,
GetReceiptsPacket: hashes, GetReceiptsPacket: hashes,
...@@ -582,5 +543,4 @@ func testGetBlockReceipts(t *testing.T, protocol uint) { ...@@ -582,5 +543,4 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
}); err != nil { }); err != nil {
t.Errorf("receipts mismatch: %v", err) t.Errorf("receipts mismatch: %v", err)
} }
}
} }
...@@ -27,17 +27,6 @@ import ( ...@@ -27,17 +27,6 @@ import (
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
) )
// handleGetBlockHeaders handles Block header query, collect the requested headers and reply
func handleGetBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
// Decode the complex header query
var query GetBlockHeadersPacket
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
response := answerGetBlockHeadersQuery(backend, &query, peer)
return peer.SendBlockHeaders(response)
}
// handleGetBlockHeaders66 is the eth/66 version of handleGetBlockHeaders // handleGetBlockHeaders66 is the eth/66 version of handleGetBlockHeaders
func handleGetBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error { func handleGetBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the complex header query // Decode the complex header query
...@@ -135,16 +124,6 @@ func answerGetBlockHeadersQuery(backend Backend, query *GetBlockHeadersPacket, p ...@@ -135,16 +124,6 @@ func answerGetBlockHeadersQuery(backend Backend, query *GetBlockHeadersPacket, p
return headers return headers
} }
func handleGetBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block body retrieval message
var query GetBlockBodiesPacket
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
response := answerGetBlockBodiesQuery(backend, query, peer)
return peer.SendBlockBodiesRLP(response)
}
func handleGetBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { func handleGetBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block body retrieval message // Decode the block body retrieval message
var query GetBlockBodiesPacket66 var query GetBlockBodiesPacket66
...@@ -174,16 +153,6 @@ func answerGetBlockBodiesQuery(backend Backend, query GetBlockBodiesPacket, peer ...@@ -174,16 +153,6 @@ func answerGetBlockBodiesQuery(backend Backend, query GetBlockBodiesPacket, peer
return bodies return bodies
} }
func handleGetNodeData(backend Backend, msg Decoder, peer *Peer) error {
// Decode the trie node data retrieval message
var query GetNodeDataPacket
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
response := answerGetNodeDataQuery(backend, query, peer)
return peer.SendNodeData(response)
}
func handleGetNodeData66(backend Backend, msg Decoder, peer *Peer) error { func handleGetNodeData66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the trie node data retrieval message // Decode the trie node data retrieval message
var query GetNodeDataPacket66 var query GetNodeDataPacket66
...@@ -223,16 +192,6 @@ func answerGetNodeDataQuery(backend Backend, query GetNodeDataPacket, peer *Peer ...@@ -223,16 +192,6 @@ func answerGetNodeDataQuery(backend Backend, query GetNodeDataPacket, peer *Peer
return nodes return nodes
} }
func handleGetReceipts(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block receipts retrieval message
var query GetReceiptsPacket
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
response := answerGetReceiptsQuery(backend, query, peer)
return peer.SendReceiptsRLP(response)
}
func handleGetReceipts66(backend Backend, msg Decoder, peer *Peer) error { func handleGetReceipts66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block receipts retrieval message // Decode the block receipts retrieval message
var query GetReceiptsPacket66 var query GetReceiptsPacket66
...@@ -312,15 +271,6 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error { ...@@ -312,15 +271,6 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, ann) return backend.Handle(peer, ann)
} }
func handleBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
// A batch of headers arrived to one of our previous requests
res := new(BlockHeadersPacket)
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
return backend.Handle(peer, res)
}
func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error { func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
// A batch of headers arrived to one of our previous requests // A batch of headers arrived to one of our previous requests
res := new(BlockHeadersPacket66) res := new(BlockHeadersPacket66)
...@@ -332,15 +282,6 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error { ...@@ -332,15 +282,6 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, &res.BlockHeadersPacket) return backend.Handle(peer, &res.BlockHeadersPacket)
} }
func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
// A batch of block bodies arrived to one of our previous requests
res := new(BlockBodiesPacket)
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
return backend.Handle(peer, res)
}
func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
// A batch of block bodies arrived to one of our previous requests // A batch of block bodies arrived to one of our previous requests
res := new(BlockBodiesPacket66) res := new(BlockBodiesPacket66)
...@@ -352,15 +293,6 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { ...@@ -352,15 +293,6 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, &res.BlockBodiesPacket) return backend.Handle(peer, &res.BlockBodiesPacket)
} }
func handleNodeData(backend Backend, msg Decoder, peer *Peer) error {
// A batch of node state data arrived to one of our previous requests
res := new(NodeDataPacket)
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
return backend.Handle(peer, res)
}
func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error { func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error {
// A batch of node state data arrived to one of our previous requests // A batch of node state data arrived to one of our previous requests
res := new(NodeDataPacket66) res := new(NodeDataPacket66)
...@@ -372,15 +304,6 @@ func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error { ...@@ -372,15 +304,6 @@ func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, &res.NodeDataPacket) return backend.Handle(peer, &res.NodeDataPacket)
} }
func handleReceipts(backend Backend, msg Decoder, peer *Peer) error {
// A batch of receipts arrived to one of our previous requests
res := new(ReceiptsPacket)
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
return backend.Handle(peer, res)
}
func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error { func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error {
// A batch of receipts arrived to one of our previous requests // A batch of receipts arrived to one of our previous requests
res := new(ReceiptsPacket66) res := new(ReceiptsPacket66)
...@@ -409,16 +332,6 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) ...@@ -409,16 +332,6 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
return backend.Handle(peer, ann) return backend.Handle(peer, ann)
} }
func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Decode the pooled transactions retrieval message
var query GetPooledTransactionsPacket
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
hashes, txs := answerGetPooledTransactions(backend, query, peer)
return peer.SendPooledTransactionsRLP(hashes, txs)
}
func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error { func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the pooled transactions retrieval message // Decode the pooled transactions retrieval message
var query GetPooledTransactionsPacket66 var query GetPooledTransactionsPacket66
...@@ -477,26 +390,6 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error { ...@@ -477,26 +390,6 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, &txs) return backend.Handle(peer, &txs)
} }
func handlePooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if !backend.AcceptTxs() {
return nil
}
// Transactions can be processed, parse all of them and deliver to the pool
var txs PooledTransactionsPacket
if err := msg.Decode(&txs); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
return fmt.Errorf("%w: transaction %d is nil", errDecode, i)
}
peer.markTransaction(tx.Hash())
}
return backend.Handle(peer, &txs)
}
func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error { func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error {
// Transactions arrived, make sure we have a valid and fresh chain to handle them // Transactions arrived, make sure we have a valid and fresh chain to handle them
if !backend.AcceptTxs() { if !backend.AcceptTxs() {
......
...@@ -27,7 +27,6 @@ import ( ...@@ -27,7 +27,6 @@ import (
) )
// Tests that handshake failures are detected and reported correctly. // Tests that handshake failures are detected and reported correctly.
func TestHandshake65(t *testing.T) { testHandshake(t, ETH65) }
func TestHandshake66(t *testing.T) { testHandshake(t, ETH66) } func TestHandshake66(t *testing.T) { testHandshake(t, ETH66) }
func testHandshake(t *testing.T, protocol uint) { func testHandshake(t *testing.T, protocol uint) {
......
...@@ -108,9 +108,8 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe ...@@ -108,9 +108,8 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
// Start up all the broadcasters // Start up all the broadcasters
go peer.broadcastBlocks() go peer.broadcastBlocks()
go peer.broadcastTransactions() go peer.broadcastTransactions()
if version >= ETH65 {
go peer.announceTransactions() go peer.announceTransactions()
}
return peer return peer
} }
...@@ -252,22 +251,6 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) { ...@@ -252,22 +251,6 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
} }
} }
// SendPooledTransactionsRLP sends requested transactions to the peer and adds the
// hashes in its transaction hash set for future reference.
//
// Note, the method assumes the hashes are correct and correspond to the list of
// transactions being sent.
func (p *Peer) SendPooledTransactionsRLP(hashes []common.Hash, txs []rlp.RawValue) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
p.knownTxs.Pop()
}
for _, hash := range hashes {
p.knownTxs.Add(hash)
}
return p2p.Send(p.rw, PooledTransactionsMsg, txs) // Not packed into PooledTransactionsPacket to avoid RLP decoding
}
// ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP. // ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP.
func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs []rlp.RawValue) error { func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs []rlp.RawValue) error {
// Mark all the transactions as known, but ensure we don't overflow our limits // Mark all the transactions as known, but ensure we don't overflow our limits
...@@ -346,11 +329,6 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { ...@@ -346,11 +329,6 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
} }
} }
// SendBlockHeaders sends a batch of block headers to the remote peer.
func (p *Peer) SendBlockHeaders(headers []*types.Header) error {
return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket(headers))
}
// ReplyBlockHeaders is the eth/66 version of SendBlockHeaders. // ReplyBlockHeaders is the eth/66 version of SendBlockHeaders.
func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error { func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error {
return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket66{ return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket66{
...@@ -359,12 +337,6 @@ func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error { ...@@ -359,12 +337,6 @@ func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error {
}) })
} }
// SendBlockBodiesRLP sends a batch of block contents to the remote peer from
// an already RLP encoded format.
func (p *Peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error {
return p2p.Send(p.rw, BlockBodiesMsg, bodies) // Not packed into BlockBodiesPacket to avoid RLP decoding
}
// ReplyBlockBodiesRLP is the eth/66 version of SendBlockBodiesRLP. // ReplyBlockBodiesRLP is the eth/66 version of SendBlockBodiesRLP.
func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error { func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
// Not packed into BlockBodiesPacket to avoid RLP decoding // Not packed into BlockBodiesPacket to avoid RLP decoding
...@@ -374,12 +346,6 @@ func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error { ...@@ -374,12 +346,6 @@ func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
}) })
} }
// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the
// hashes requested.
func (p *Peer) SendNodeData(data [][]byte) error {
return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket(data))
}
// ReplyNodeData is the eth/66 response to GetNodeData. // ReplyNodeData is the eth/66 response to GetNodeData.
func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error { func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error {
return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket66{ return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket66{
...@@ -388,12 +354,6 @@ func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error { ...@@ -388,12 +354,6 @@ func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error {
}) })
} }
// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
// ones requested from an already RLP encoded format.
func (p *Peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
return p2p.Send(p.rw, ReceiptsMsg, receipts) // Not packed into ReceiptsPacket to avoid RLP decoding
}
// ReplyReceiptsRLP is the eth/66 response to GetReceipts. // ReplyReceiptsRLP is the eth/66 response to GetReceipts.
func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error { func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error {
return p2p.Send(p.rw, ReceiptsMsg, ReceiptsRLPPacket66{ return p2p.Send(p.rw, ReceiptsMsg, ReceiptsRLPPacket66{
...@@ -406,85 +366,60 @@ func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error { ...@@ -406,85 +366,60 @@ func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error {
// single header. It is used solely by the fetcher. // single header. It is used solely by the fetcher.
func (p *Peer) RequestOneHeader(hash common.Hash) error { func (p *Peer) RequestOneHeader(hash common.Hash) error {
p.Log().Debug("Fetching single header", "hash", hash) p.Log().Debug("Fetching single header", "hash", hash)
query := GetBlockHeadersPacket{
Origin: HashOrNumber{Hash: hash},
Amount: uint64(1),
Skip: uint64(0),
Reverse: false,
}
if p.Version() >= ETH66 {
id := rand.Uint64() id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id) requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: id, RequestId: id,
GetBlockHeadersPacket: &query, GetBlockHeadersPacket: &GetBlockHeadersPacket{
Origin: HashOrNumber{Hash: hash},
Amount: uint64(1),
Skip: uint64(0),
Reverse: false,
},
}) })
}
return p2p.Send(p.rw, GetBlockHeadersMsg, &query)
} }
// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
// specified header query, based on the hash of an origin block. // specified header query, based on the hash of an origin block.
func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
query := GetBlockHeadersPacket{
Origin: HashOrNumber{Hash: origin},
Amount: uint64(amount),
Skip: uint64(skip),
Reverse: reverse,
}
if p.Version() >= ETH66 {
id := rand.Uint64() id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id) requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: id, RequestId: id,
GetBlockHeadersPacket: &query, GetBlockHeadersPacket: &GetBlockHeadersPacket{
Origin: HashOrNumber{Hash: origin},
Amount: uint64(amount),
Skip: uint64(skip),
Reverse: reverse,
},
}) })
}
return p2p.Send(p.rw, GetBlockHeadersMsg, &query)
} }
// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
// specified header query, based on the number of an origin block. // specified header query, based on the number of an origin block.
func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
query := GetBlockHeadersPacket{
Origin: HashOrNumber{Number: origin},
Amount: uint64(amount),
Skip: uint64(skip),
Reverse: reverse,
}
if p.Version() >= ETH66 {
id := rand.Uint64() id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id) requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: id, RequestId: id,
GetBlockHeadersPacket: &query, GetBlockHeadersPacket: &GetBlockHeadersPacket{
})
}
return p2p.Send(p.rw, GetBlockHeadersMsg, &query)
}
// ExpectRequestHeadersByNumber is a testing method to mirror the recipient side
// of the RequestHeadersByNumber operation.
func (p *Peer) ExpectRequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
req := &GetBlockHeadersPacket{
Origin: HashOrNumber{Number: origin}, Origin: HashOrNumber{Number: origin},
Amount: uint64(amount), Amount: uint64(amount),
Skip: uint64(skip), Skip: uint64(skip),
Reverse: reverse, Reverse: reverse,
} },
return p2p.ExpectMsg(p.rw, GetBlockHeadersMsg, req) })
} }
// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
// specified. // specified.
func (p *Peer) RequestBodies(hashes []common.Hash) error { func (p *Peer) RequestBodies(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
if p.Version() >= ETH66 {
id := rand.Uint64() id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetBlockBodiesMsg, BlockBodiesMsg, id) requestTracker.Track(p.id, p.version, GetBlockBodiesMsg, BlockBodiesMsg, id)
...@@ -492,15 +427,12 @@ func (p *Peer) RequestBodies(hashes []common.Hash) error { ...@@ -492,15 +427,12 @@ func (p *Peer) RequestBodies(hashes []common.Hash) error {
RequestId: id, RequestId: id,
GetBlockBodiesPacket: hashes, GetBlockBodiesPacket: hashes,
}) })
}
return p2p.Send(p.rw, GetBlockBodiesMsg, GetBlockBodiesPacket(hashes))
} }
// RequestNodeData fetches a batch of arbitrary data from a node's known state // RequestNodeData fetches a batch of arbitrary data from a node's known state
// data, corresponding to the specified hashes. // data, corresponding to the specified hashes.
func (p *Peer) RequestNodeData(hashes []common.Hash) error { func (p *Peer) RequestNodeData(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of state data", "count", len(hashes)) p.Log().Debug("Fetching batch of state data", "count", len(hashes))
if p.Version() >= ETH66 {
id := rand.Uint64() id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetNodeDataMsg, NodeDataMsg, id) requestTracker.Track(p.id, p.version, GetNodeDataMsg, NodeDataMsg, id)
...@@ -508,14 +440,11 @@ func (p *Peer) RequestNodeData(hashes []common.Hash) error { ...@@ -508,14 +440,11 @@ func (p *Peer) RequestNodeData(hashes []common.Hash) error {
RequestId: id, RequestId: id,
GetNodeDataPacket: hashes, GetNodeDataPacket: hashes,
}) })
}
return p2p.Send(p.rw, GetNodeDataMsg, GetNodeDataPacket(hashes))
} }
// RequestReceipts fetches a batch of transaction receipts from a remote node. // RequestReceipts fetches a batch of transaction receipts from a remote node.
func (p *Peer) RequestReceipts(hashes []common.Hash) error { func (p *Peer) RequestReceipts(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
if p.Version() >= ETH66 {
id := rand.Uint64() id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetReceiptsMsg, ReceiptsMsg, id) requestTracker.Track(p.id, p.version, GetReceiptsMsg, ReceiptsMsg, id)
...@@ -523,14 +452,11 @@ func (p *Peer) RequestReceipts(hashes []common.Hash) error { ...@@ -523,14 +452,11 @@ func (p *Peer) RequestReceipts(hashes []common.Hash) error {
RequestId: id, RequestId: id,
GetReceiptsPacket: hashes, GetReceiptsPacket: hashes,
}) })
}
return p2p.Send(p.rw, GetReceiptsMsg, GetReceiptsPacket(hashes))
} }
// RequestTxs fetches a batch of transactions from a remote node. // RequestTxs fetches a batch of transactions from a remote node.
func (p *Peer) RequestTxs(hashes []common.Hash) error { func (p *Peer) RequestTxs(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of transactions", "count", len(hashes)) p.Log().Debug("Fetching batch of transactions", "count", len(hashes))
if p.Version() >= ETH66 {
id := rand.Uint64() id := rand.Uint64()
requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id) requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)
...@@ -538,6 +464,4 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error { ...@@ -538,6 +464,4 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error {
RequestId: id, RequestId: id,
GetPooledTransactionsPacket: hashes, GetPooledTransactionsPacket: hashes,
}) })
}
return p2p.Send(p.rw, GetPooledTransactionsMsg, GetPooledTransactionsPacket(hashes))
} }
...@@ -30,7 +30,6 @@ import ( ...@@ -30,7 +30,6 @@ import (
// Constants to match up protocol versions and messages // Constants to match up protocol versions and messages
const ( const (
ETH65 = 65
ETH66 = 66 ETH66 = 66
) )
...@@ -40,17 +39,16 @@ const ProtocolName = "eth" ...@@ -40,17 +39,16 @@ const ProtocolName = "eth"
// ProtocolVersions are the supported versions of the `eth` protocol (first // ProtocolVersions are the supported versions of the `eth` protocol (first
// is primary). // is primary).
var ProtocolVersions = []uint{ETH66, ETH65} var ProtocolVersions = []uint{ETH66}
// protocolLengths are the number of implemented message corresponding to // protocolLengths are the number of implemented message corresponding to
// different protocol versions. // different protocol versions.
var protocolLengths = map[uint]uint64{ETH66: 17, ETH65: 17} var protocolLengths = map[uint]uint64{ETH66: 17}
// maxMessageSize is the maximum cap on the size of a protocol message. // maxMessageSize is the maximum cap on the size of a protocol message.
const maxMessageSize = 10 * 1024 * 1024 const maxMessageSize = 10 * 1024 * 1024
const ( const (
// Protocol messages in eth/64
StatusMsg = 0x00 StatusMsg = 0x00
NewBlockHashesMsg = 0x01 NewBlockHashesMsg = 0x01
TransactionsMsg = 0x02 TransactionsMsg = 0x02
...@@ -63,8 +61,6 @@ const ( ...@@ -63,8 +61,6 @@ const (
NodeDataMsg = 0x0e NodeDataMsg = 0x0e
GetReceiptsMsg = 0x0f GetReceiptsMsg = 0x0f
ReceiptsMsg = 0x10 ReceiptsMsg = 0x10
// Protocol messages overloaded in eth/65
NewPooledTransactionHashesMsg = 0x08 NewPooledTransactionHashesMsg = 0x08
GetPooledTransactionsMsg = 0x09 GetPooledTransactionsMsg = 0x09
PooledTransactionsMsg = 0x0a PooledTransactionsMsg = 0x0a
...@@ -128,7 +124,7 @@ type GetBlockHeadersPacket struct { ...@@ -128,7 +124,7 @@ type GetBlockHeadersPacket struct {
Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis) Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis)
} }
// GetBlockHeadersPacket represents a block header query over eth/66 // GetBlockHeadersPacket66 represents a block header query over eth/66
type GetBlockHeadersPacket66 struct { type GetBlockHeadersPacket66 struct {
RequestId uint64 RequestId uint64
*GetBlockHeadersPacket *GetBlockHeadersPacket
......
...@@ -18,7 +18,6 @@ package eth ...@@ -18,7 +18,6 @@ package eth
import ( import (
"math/big" "math/big"
"math/rand"
"sync/atomic" "sync/atomic"
"time" "time"
...@@ -28,23 +27,13 @@ import ( ...@@ -28,23 +27,13 @@ import (
"github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
) )
const ( const (
forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
defaultMinSyncPeers = 5 // Amount of peers desired to start syncing defaultMinSyncPeers = 5 // Amount of peers desired to start syncing
// This is the target size for the packs of transactions sent by txsyncLoop64.
// A pack can get larger than this if a single transactions exceeds this size.
txsyncPackSize = 100 * 1024
) )
type txsync struct {
p *eth.Peer
txs []*types.Transaction
}
// syncTransactions starts sending all currently pending transactions to the given peer. // syncTransactions starts sending all currently pending transactions to the given peer.
func (h *handler) syncTransactions(p *eth.Peer) { func (h *handler) syncTransactions(p *eth.Peer) {
// Assemble the set of transaction to broadcast or announce to the remote // Assemble the set of transaction to broadcast or announce to the remote
...@@ -64,94 +53,11 @@ func (h *handler) syncTransactions(p *eth.Peer) { ...@@ -64,94 +53,11 @@ func (h *handler) syncTransactions(p *eth.Peer) {
// The eth/65 protocol introduces proper transaction announcements, so instead // The eth/65 protocol introduces proper transaction announcements, so instead
// of dripping transactions across multiple peers, just send the entire list as // of dripping transactions across multiple peers, just send the entire list as
// an announcement and let the remote side decide what they need (likely nothing). // an announcement and let the remote side decide what they need (likely nothing).
if p.Version() >= eth.ETH65 {
hashes := make([]common.Hash, len(txs)) hashes := make([]common.Hash, len(txs))
for i, tx := range txs { for i, tx := range txs {
hashes[i] = tx.Hash() hashes[i] = tx.Hash()
} }
p.AsyncSendPooledTransactionHashes(hashes) p.AsyncSendPooledTransactionHashes(hashes)
return
}
// Out of luck, peer is running legacy protocols, drop the txs over
select {
case h.txsyncCh <- &txsync{p: p, txs: txs}:
case <-h.quitSync:
}
}
// txsyncLoop64 takes care of the initial transaction sync for each new
// connection. When a new peer appears, we relay all currently pending
// transactions. In order to minimise egress bandwidth usage, we send
// the transactions in small packs to one peer at a time.
func (h *handler) txsyncLoop64() {
defer h.wg.Done()
var (
pending = make(map[enode.ID]*txsync)
sending = false // whether a send is active
pack = new(txsync) // the pack that is being sent
done = make(chan error, 1) // result of the send
)
// send starts a sending a pack of transactions from the sync.
send := func(s *txsync) {
if s.p.Version() >= eth.ETH65 {
panic("initial transaction syncer running on eth/65+")
}
// Fill pack with transactions up to the target size.
size := common.StorageSize(0)
pack.p = s.p
pack.txs = pack.txs[:0]
for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
pack.txs = append(pack.txs, s.txs[i])
size += s.txs[i].Size()
}
// Remove the transactions that will be sent.
s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
if len(s.txs) == 0 {
delete(pending, s.p.Peer.ID())
}
// Send the pack in the background.
s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
sending = true
go func() { done <- pack.p.SendTransactions(pack.txs) }()
}
// pick chooses the next pending sync.
pick := func() *txsync {
if len(pending) == 0 {
return nil
}
n := rand.Intn(len(pending)) + 1
for _, s := range pending {
if n--; n == 0 {
return s
}
}
return nil
}
for {
select {
case s := <-h.txsyncCh:
pending[s.p.Peer.ID()] = s
if !sending {
send(s)
}
case err := <-done:
sending = false
// Stop tracking peers that cause send failures.
if err != nil {
pack.p.Log().Debug("Transaction send failed", "err", err)
delete(pending, pack.p.Peer.ID())
}
// Schedule the next send.
if s := pick(); s != nil {
send(s)
}
case <-h.quitSync:
return
}
}
} }
// chainSyncer coordinates blockchain sync components. // chainSyncer coordinates blockchain sync components.
......
...@@ -28,7 +28,6 @@ import ( ...@@ -28,7 +28,6 @@ import (
) )
// Tests that fast sync is disabled after a successful sync cycle. // Tests that fast sync is disabled after a successful sync cycle.
func TestFastSyncDisabling65(t *testing.T) { testFastSyncDisabling(t, eth.ETH65) }
func TestFastSyncDisabling66(t *testing.T) { testFastSyncDisabling(t, eth.ETH66) } func TestFastSyncDisabling66(t *testing.T) { testFastSyncDisabling(t, eth.ETH66) }
// Tests that fast sync gets disabled as soon as a real block is successfully // Tests that fast sync gets disabled as soon as a real block is successfully
......
...@@ -472,7 +472,7 @@ func (d *downloaderPeerNotify) registerPeer(p *serverPeer) { ...@@ -472,7 +472,7 @@ func (d *downloaderPeerNotify) registerPeer(p *serverPeer) {
handler: h, handler: h,
peer: p, peer: p,
} }
h.downloader.RegisterLightPeer(p.id, eth.ETH65, pc) h.downloader.RegisterLightPeer(p.id, eth.ETH66, pc)
} }
func (d *downloaderPeerNotify) unregisterPeer(p *serverPeer) { func (d *downloaderPeerNotify) unregisterPeer(p *serverPeer) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment