Unverified Commit 049e1711 authored by rjl493456442's avatar rjl493456442 Committed by Péter Szilágyi

core, eth: implement eth/65 transaction fetcher

parent dcffb777
......@@ -864,6 +864,12 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
return pool.all.Get(hash)
}
// Has returns an indicator whether txpool has a transaction cached with the
// given hash.
func (pool *TxPool) Has(hash common.Hash) bool {
return pool.all.Get(hash) != nil
}
// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
......
......@@ -470,7 +470,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.headerThroughput
}
return ps.idlePeers(62, 64, idle, throughput)
return ps.idlePeers(62, 65, idle, throughput)
}
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
......@@ -484,7 +484,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.blockThroughput
}
return ps.idlePeers(62, 64, idle, throughput)
return ps.idlePeers(62, 65, idle, throughput)
}
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
......@@ -498,7 +498,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.receiptThroughput
}
return ps.idlePeers(63, 64, idle, throughput)
return ps.idlePeers(63, 65, idle, throughput)
}
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
......@@ -512,7 +512,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.stateThroughput
}
return ps.idlePeers(63, 64, idle, throughput)
return ps.idlePeers(63, 65, idle, throughput)
}
// idlePeers retrieves a flat list of all currently idle peers satisfying the
......
......@@ -76,7 +76,7 @@ func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common
// fetcherTester is a test simulator for mocking out local block chain.
type fetcherTester struct {
fetcher *Fetcher
fetcher *BlockFetcher
hashes []common.Hash // Hash chain belonging to the tester
blocks map[common.Hash]*types.Block // Blocks belonging to the tester
......@@ -92,7 +92,7 @@ func newTester() *fetcherTester {
blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
drops: make(map[string]bool),
}
tester.fetcher = New(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer)
tester.fetcher = NewBlockFetcher(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer)
tester.fetcher.Start()
return tester
......
......@@ -23,15 +23,15 @@ import (
)
var (
propAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/announces/in", nil)
propAnnounceOutTimer = metrics.NewRegisteredTimer("eth/fetcher/prop/announces/out", nil)
propAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/announces/drop", nil)
propAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/announces/dos", nil)
blockAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/announces/in", nil)
blockAnnounceOutTimer = metrics.NewRegisteredTimer("eth/fetcher/prop/block/announces/out", nil)
blockAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/announces/drop", nil)
blockAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/announces/dos", nil)
propBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/broadcasts/in", nil)
propBroadcastOutTimer = metrics.NewRegisteredTimer("eth/fetcher/prop/broadcasts/out", nil)
propBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/broadcasts/drop", nil)
propBroadcastDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/broadcasts/dos", nil)
blockBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/broadcasts/in", nil)
blockBroadcastOutTimer = metrics.NewRegisteredTimer("eth/fetcher/prop/block/broadcasts/out", nil)
blockBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/broadcasts/drop", nil)
blockBroadcastDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/broadcasts/dos", nil)
headerFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/headers", nil)
bodyFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/bodies", nil)
......@@ -40,4 +40,15 @@ var (
headerFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/filter/headers/out", nil)
bodyFilterInMeter = metrics.NewRegisteredMeter("eth/fetcher/filter/bodies/in", nil)
bodyFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/filter/bodies/out", nil)
txAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/announces/in", nil)
txAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/announces/dos", nil)
txAnnounceSkipMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/announces/skip", nil)
txAnnounceUnderpriceMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/announces/underprice", nil)
txBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/broadcasts/in", nil)
txFetchOutMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/transaction/out", nil)
txFetchSuccessMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/transaction/success", nil)
txFetchTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/transaction/timeout", nil)
txFetchInvalidMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/transaction/invalid", nil)
txFetchDurationTimer = metrics.NewRegisteredTimer("eth/fetcher/fetch/transaction/duration", nil)
)
This diff is collapsed.
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package fetcher
import (
"crypto/ecdsa"
"math/big"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
)
func init() {
rand.Seed(int64(time.Now().Nanosecond()))
txAnnounceLimit = 64
MaxTransactionFetch = 16
}
func makeTransactions(key *ecdsa.PrivateKey, target int) []*types.Transaction {
var txs []*types.Transaction
for i := 0; i < target; i++ {
random := rand.Uint32()
tx := types.NewTransaction(uint64(random), common.Address{0x1, 0x2, 0x3}, big.NewInt(int64(random)), 100, big.NewInt(int64(random)), nil)
tx, _ = types.SignTx(tx, types.NewEIP155Signer(big.NewInt(1)), key)
txs = append(txs, tx)
}
return txs
}
func makeUnsignedTransactions(key *ecdsa.PrivateKey, target int) []*types.Transaction {
var txs []*types.Transaction
for i := 0; i < target; i++ {
random := rand.Uint32()
tx := types.NewTransaction(uint64(random), common.Address{0x1, 0x2, 0x3}, big.NewInt(int64(random)), 100, big.NewInt(int64(random)), nil)
txs = append(txs, tx)
}
return txs
}
type txfetcherTester struct {
fetcher *TxFetcher
priceLimit *big.Int
sender *ecdsa.PrivateKey
senderAddr common.Address
signer types.Signer
txs map[common.Hash]*types.Transaction
dropped map[string]struct{}
lock sync.RWMutex
}
func newTxFetcherTester() *txfetcherTester {
key, _ := crypto.GenerateKey()
addr := crypto.PubkeyToAddress(key.PublicKey)
t := &txfetcherTester{
sender: key,
senderAddr: addr,
signer: types.NewEIP155Signer(big.NewInt(1)),
txs: make(map[common.Hash]*types.Transaction),
dropped: make(map[string]struct{}),
}
t.fetcher = NewTxFetcher(t.hasTx, t.addTxs, t.dropPeer)
t.fetcher.Start()
return t
}
func (t *txfetcherTester) hasTx(hash common.Hash) bool {
t.lock.RLock()
defer t.lock.RUnlock()
return t.txs[hash] != nil
}
func (t *txfetcherTester) addTxs(txs []*types.Transaction) []error {
t.lock.Lock()
defer t.lock.Unlock()
var errors []error
for _, tx := range txs {
// Make sure the transaction is signed properly
_, err := types.Sender(t.signer, tx)
if err != nil {
errors = append(errors, core.ErrInvalidSender)
continue
}
// Make sure the price is high enough to accpet
if t.priceLimit != nil && tx.GasPrice().Cmp(t.priceLimit) < 0 {
errors = append(errors, core.ErrUnderpriced)
continue
}
t.txs[tx.Hash()] = tx
errors = append(errors, nil)
}
return errors
}
func (t *txfetcherTester) dropPeer(id string) {
t.lock.Lock()
defer t.lock.Unlock()
t.dropped[id] = struct{}{}
}
// makeTxFetcher retrieves a batch of transaction associated with a simulated peer.
func (t *txfetcherTester) makeTxFetcher(peer string, txs []*types.Transaction) func(hashes []common.Hash) {
closure := make(map[common.Hash]*types.Transaction)
for _, tx := range txs {
closure[tx.Hash()] = tx
}
return func(hashes []common.Hash) {
var txs []*types.Transaction
for _, hash := range hashes {
tx := closure[hash]
if tx == nil {
continue
}
txs = append(txs, tx)
}
// Return on a new thread
go t.fetcher.EnqueueTxs(peer, txs)
}
}
func TestSequentialTxAnnouncements(t *testing.T) {
tester := newTxFetcherTester()
txs := makeTransactions(tester.sender, txAnnounceLimit)
retrieveTxs := tester.makeTxFetcher("peer", txs)
newTxsCh := make(chan struct{})
tester.fetcher.importTxsHook = func(transactions []*types.Transaction) {
newTxsCh <- struct{}{}
}
for _, tx := range txs {
tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout), retrieveTxs)
select {
case <-newTxsCh:
case <-time.NewTimer(time.Second).C:
t.Fatalf("timeout")
}
}
if len(tester.txs) != len(txs) {
t.Fatalf("Imported transaction number mismatch, want %d, got %d", len(txs), len(tester.txs))
}
}
func TestConcurrentAnnouncements(t *testing.T) {
tester := newTxFetcherTester()
txs := makeTransactions(tester.sender, txAnnounceLimit)
txFetcherFn1 := tester.makeTxFetcher("peer1", txs)
txFetcherFn2 := tester.makeTxFetcher("peer2", txs)
var (
count uint32
done = make(chan struct{})
)
tester.fetcher.importTxsHook = func(transactions []*types.Transaction) {
atomic.AddUint32(&count, uint32(len(transactions)))
if atomic.LoadUint32(&count) >= uint32(txAnnounceLimit) {
done <- struct{}{}
}
}
for _, tx := range txs {
tester.fetcher.Notify("peer1", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout), txFetcherFn1)
tester.fetcher.Notify("peer2", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout+time.Millisecond), txFetcherFn2)
tester.fetcher.Notify("peer2", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout-time.Millisecond), txFetcherFn2)
}
select {
case <-done:
case <-time.NewTimer(time.Second).C:
t.Fatalf("timeout")
}
}
func TestBatchAnnouncements(t *testing.T) {
tester := newTxFetcherTester()
txs := makeTransactions(tester.sender, txAnnounceLimit)
retrieveTxs := tester.makeTxFetcher("peer", txs)
var count uint32
var done = make(chan struct{})
tester.fetcher.importTxsHook = func(txs []*types.Transaction) {
atomic.AddUint32(&count, uint32(len(txs)))
if atomic.LoadUint32(&count) >= uint32(txAnnounceLimit) {
done <- struct{}{}
}
}
// Send all announces which exceeds the limit.
var hashes []common.Hash
for _, tx := range txs {
hashes = append(hashes, tx.Hash())
}
tester.fetcher.Notify("peer", hashes, time.Now(), retrieveTxs)
select {
case <-done:
case <-time.NewTimer(time.Second).C:
t.Fatalf("timeout")
}
}
func TestPropagationAfterAnnounce(t *testing.T) {
tester := newTxFetcherTester()
txs := makeTransactions(tester.sender, txAnnounceLimit)
var cleaned = make(chan struct{})
tester.fetcher.cleanupHook = func(hashes []common.Hash) {
cleaned <- struct{}{}
}
retrieveTxs := tester.makeTxFetcher("peer", txs)
for _, tx := range txs {
tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now(), retrieveTxs)
tester.fetcher.EnqueueTxs("peer", []*types.Transaction{tx})
// It's ok to read the map directly since no write
// will happen in the same time.
<-cleaned
if len(tester.fetcher.announced) != 0 {
t.Fatalf("Announcement should be cleaned, got %d", len(tester.fetcher.announced))
}
}
}
func TestEnqueueTransactions(t *testing.T) {
tester := newTxFetcherTester()
txs := makeTransactions(tester.sender, txAnnounceLimit)
done := make(chan struct{})
tester.fetcher.importTxsHook = func(transactions []*types.Transaction) {
if len(transactions) == txAnnounceLimit {
done <- struct{}{}
}
}
go tester.fetcher.EnqueueTxs("peer", txs)
select {
case <-done:
case <-time.NewTimer(time.Second).C:
t.Fatalf("timeout")
}
}
func TestInvalidTxAnnounces(t *testing.T) {
tester := newTxFetcherTester()
var txs []*types.Transaction
txs = append(txs, makeUnsignedTransactions(tester.sender, 1)...)
txs = append(txs, makeTransactions(tester.sender, 1)...)
txFetcherFn := tester.makeTxFetcher("peer", txs)
dropped := make(chan string, 1)
tester.fetcher.dropHook = func(s string) { dropped <- s }
for _, tx := range txs {
tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now(), txFetcherFn)
}
select {
case s := <-dropped:
if s != "peer" {
t.Fatalf("invalid dropped peer")
}
case <-time.NewTimer(time.Second).C:
t.Fatalf("timeout")
}
}
func TestRejectUnderpriced(t *testing.T) {
tester := newTxFetcherTester()
tester.priceLimit = big.NewInt(10000)
done := make(chan struct{})
tester.fetcher.importTxsHook = func([]*types.Transaction) { done <- struct{}{} }
reject := make(chan struct{})
tester.fetcher.rejectUnderprice = func(common.Hash) { reject <- struct{}{} }
tx := types.NewTransaction(0, common.Address{0x1, 0x2, 0x3}, big.NewInt(int64(100)), 100, big.NewInt(int64(100)), nil)
tx, _ = types.SignTx(tx, types.NewEIP155Signer(big.NewInt(1)), tester.sender)
txFetcherFn := tester.makeTxFetcher("peer", []*types.Transaction{tx})
// Send the announcement first time
tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout), txFetcherFn)
<-done
// Resend the announcement, shouldn't schedule fetching this time
tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout), txFetcherFn)
select {
case <-reject:
case <-time.NewTimer(time.Second).C:
t.Fatalf("timeout")
}
}
......@@ -77,9 +77,10 @@ type ProtocolManager struct {
blockchain *core.BlockChain
maxPeers int
downloader *downloader.Downloader
fetcher *fetcher.Fetcher
peers *peerSet
downloader *downloader.Downloader
blockFetcher *fetcher.BlockFetcher
txFetcher *fetcher.TxFetcher
peers *peerSet
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
......@@ -97,6 +98,9 @@ type ProtocolManager struct {
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
// Test fields or hooks
broadcastTxAnnouncesOnly bool // Testing field, disable transaction propagation
}
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
......@@ -187,7 +191,8 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh
}
return n, err
}
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
manager.blockFetcher = fetcher.NewBlockFetcher(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, manager.removePeer)
return manager, nil
}
......@@ -203,7 +208,7 @@ func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol {
Version: version,
Length: length,
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := pm.newPeer(int(version), p, rw)
peer := pm.newPeer(int(version), p, rw, pm.txpool.Get)
select {
case pm.newPeerCh <- peer:
pm.wg.Add(1)
......@@ -286,8 +291,8 @@ func (pm *ProtocolManager) Stop() {
log.Info("Ethereum protocol stopped")
}
func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return newPeer(pv, p, newMeteredMsgWriter(rw))
func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter, getPooledTx func(hash common.Hash) *types.Transaction) *peer {
return newPeer(pv, p, newMeteredMsgWriter(rw), getPooledTx)
}
// handle is the callback invoked to manage the life cycle of an eth peer. When
......@@ -514,7 +519,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.Log().Debug("Whitelist block verified", "number", headers[0].Number.Uint64(), "hash", want)
}
// Irrelevant of the fork checks, send the header to the fetcher just in case
headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
headers = pm.blockFetcher.FilterHeaders(p.id, headers, time.Now())
}
if len(headers) > 0 || !filter {
err := pm.downloader.DeliverHeaders(p.id, headers)
......@@ -567,7 +572,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Filter out any explicitly requested bodies, deliver the rest to the downloader
filter := len(transactions) > 0 || len(uncles) > 0
if filter {
transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
transactions, uncles = pm.blockFetcher.FilterBodies(p.id, transactions, uncles, time.Now())
}
if len(transactions) > 0 || len(uncles) > 0 || !filter {
err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
......@@ -678,7 +683,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
for _, block := range unknown {
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
pm.blockFetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
}
case msg.Code == NewBlockMsg:
......@@ -703,7 +708,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Mark the peer as owning the block and schedule it for import
p.MarkBlock(request.Block.Hash())
pm.fetcher.Enqueue(p.id, request.Block)
pm.blockFetcher.Enqueue(p.id, request.Block)
// Assuming the block is importable by the peer, but possibly not yet done so,
// calculate the head hash and TD that the peer truly must have.
......@@ -724,6 +729,66 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
case msg.Code == NewPooledTransactionHashesMsg && p.version >= eth65:
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them
if atomic.LoadUint32(&pm.acceptTxs) == 0 {
break
}
var hashes []common.Hash
if err := msg.Decode(&hashes); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Schedule all the unknown hashes for retrieval
var unknown []common.Hash
for _, hash := range hashes {
// Mark the hashes as present at the remote node
p.MarkTransaction(hash)
// Filter duplicated transaction announcement.
// Notably we only dedupliate announcement in txpool, check the rationale
// behind in EIP https://github.com/ethereum/EIPs/pull/2464.
if pm.txpool.Has(hash) {
continue
}
unknown = append(unknown, hash)
}
pm.txFetcher.Notify(p.id, unknown, time.Now(), p.AsyncRequestTxs)
case msg.Code == GetPooledTransactionsMsg && p.version >= eth65:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
return err
}
// Gather transactions until the fetch or network limits is reached
var (
hash common.Hash
bytes int
txs []rlp.RawValue
)
for bytes < softResponseLimit {
// Retrieve the hash of the next block
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Retrieve the requested transaction, skipping if unknown to us
tx := pm.txpool.Get(hash)
if tx == nil {
continue
}
// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(tx); err != nil {
log.Error("Failed to encode transaction", "err", err)
} else {
txs = append(txs, encoded)
bytes += len(encoded)
}
}
return p.SendTransactionRLP(txs)
case msg.Code == TxMsg:
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if atomic.LoadUint32(&pm.acceptTxs) == 0 {
......@@ -741,7 +806,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
p.MarkTransaction(tx.Hash())
}
pm.txpool.AddRemotes(txs)
pm.txFetcher.EnqueueTxs(p.id, txs)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
......@@ -791,20 +856,48 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
// BroadcastTxs will propagate a batch of transactions to all peers which are not known to
// already have the given transaction.
func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
var txset = make(map[*peer]types.Transactions)
func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions, propagate bool) {
var (
txset = make(map[*peer][]common.Hash)
annos = make(map[*peer][]common.Hash)
)
// Broadcast transactions to a batch of peers not knowing about it
if propagate {
for _, tx := range txs {
peers := pm.peers.PeersWithoutTx(tx.Hash())
// Send the block to a subset of our peers
transferLen := int(math.Sqrt(float64(len(peers))))
if transferLen < minBroadcastPeers {
transferLen = minBroadcastPeers
}
if transferLen > len(peers) {
transferLen = len(peers)
}
transfer := peers[:transferLen]
for _, peer := range transfer {
txset[peer] = append(txset[peer], tx.Hash())
}
log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
}
for peer, hashes := range txset {
peer.AsyncSendTransactions(hashes)
}
return
}
// Otherwise only broadcast the announcement to peers
for _, tx := range txs {
peers := pm.peers.PeersWithoutTx(tx.Hash())
for _, peer := range peers {
txset[peer] = append(txset[peer], tx)
annos[peer] = append(annos[peer], tx.Hash())
}
log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
}
// FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for peer, txs := range txset {
peer.AsyncSendTransactions(txs)
for peer, hashes := range annos {
if peer.version >= eth65 {
peer.AsyncSendTransactionHashes(hashes)
} else {
peer.AsyncSendTransactions(hashes)
}
}
}
......@@ -823,7 +916,13 @@ func (pm *ProtocolManager) txBroadcastLoop() {
for {
select {
case event := <-pm.txsCh:
pm.BroadcastTxs(event.Txs)
// For testing purpose only, disable propagation
if pm.broadcastTxAnnouncesOnly {
pm.BroadcastTxs(event.Txs, false)
continue
}
pm.BroadcastTxs(event.Txs, true) // First propagate transactions to peers
pm.BroadcastTxs(event.Txs, false) // Only then announce to the rest
// Err() channel will be closed when unsubscribing.
case <-pm.txsSub.Err():
......
......@@ -495,7 +495,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
pm, err := NewProtocolManager(config, cht, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), ethash.NewFaker(), blockchain, db, 1, nil)
pm, err := NewProtocolManager(config, cht, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, ethash.NewFaker(), blockchain, db, 1, nil)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
......@@ -582,7 +582,7 @@ func testBroadcastBlock(t *testing.T, totalPeers, broadcastExpected int) {
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, 1, nil)
pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, pow, blockchain, db, 1, nil)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
......
......@@ -68,7 +68,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
if _, err := blockchain.InsertChain(chain); err != nil {
panic(err)
}
pm, err := NewProtocolManager(gspec.Config, nil, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, 1, nil)
pm, err := NewProtocolManager(gspec.Config, nil, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx, pool: make(map[common.Hash]*types.Transaction)}, engine, blockchain, db, 1, nil)
if err != nil {
return nil, nil, err
}
......@@ -91,22 +91,43 @@ func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks i
// testTxPool is a fake, helper transaction pool for testing purposes
type testTxPool struct {
txFeed event.Feed
pool []*types.Transaction // Collection of all transactions
added chan<- []*types.Transaction // Notification channel for new transactions
pool map[common.Hash]*types.Transaction // Hash map of collected transactions
added chan<- []*types.Transaction // Notification channel for new transactions
lock sync.RWMutex // Protects the transaction pool
}
// Has returns an indicator whether txpool has a transaction
// cached with the given hash.
func (p *testTxPool) Has(hash common.Hash) bool {
p.lock.Lock()
defer p.lock.Unlock()
return p.pool[hash] != nil
}
// Get retrieves the transaction from local txpool with given
// tx hash.
func (p *testTxPool) Get(hash common.Hash) *types.Transaction {
p.lock.Lock()
defer p.lock.Unlock()
return p.pool[hash]
}
// AddRemotes appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil
func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error {
p.lock.Lock()
defer p.lock.Unlock()
p.pool = append(p.pool, txs...)
for _, tx := range txs {
p.pool[tx.Hash()] = tx
}
if p.added != nil {
p.added <- txs
}
p.txFeed.Send(core.NewTxsEvent{Txs: txs})
return make([]error, len(txs))
}
......@@ -153,7 +174,7 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te
var id enode.ID
rand.Read(id[:])
peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net)
peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net, pm.txpool.Get)
// Start the peer on a new thread
errc := make(chan error, 1)
......@@ -191,7 +212,7 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, genesi
CurrentBlock: head,
GenesisBlock: genesis,
}
case p.version == eth64:
case p.version >= eth64:
msg = &statusData{
ProtocolVersion: uint32(p.version),
NetworkID: DefaultConfig.NetworkId,
......
This diff is collapsed.
......@@ -33,16 +33,17 @@ import (
const (
eth63 = 63
eth64 = 64
eth65 = 65
)
// protocolName is the official short name of the protocol used during capability negotiation.
const protocolName = "eth"
// ProtocolVersions are the supported versions of the eth protocol (first is primary).
var ProtocolVersions = []uint{eth64, eth63}
var ProtocolVersions = []uint{eth65, eth64, eth63}
// protocolLengths are the number of implemented message corresponding to different protocol versions.
var protocolLengths = map[uint]uint64{eth64: 17, eth63: 17}
var protocolLengths = map[uint]uint64{eth65: 17, eth64: 17, eth63: 17}
const protocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message
......@@ -60,6 +61,13 @@ const (
NodeDataMsg = 0x0e
GetReceiptsMsg = 0x0f
ReceiptsMsg = 0x10
// New protocol message codes introduced in eth65
//
// Previously these message ids(0x08, 0x09) were used by some
// legacy and unsupported eth protocols, reown them here.
NewPooledTransactionHashesMsg = 0x08
GetPooledTransactionsMsg = 0x09
)
type errCode int
......@@ -94,6 +102,14 @@ var errorToString = map[int]string{
}
type txPool interface {
// Has returns an indicator whether txpool has a transaction
// cached with the given hash.
Has(hash common.Hash) bool
// Get retrieves the transaction from local txpool with given
// tx hash.
Get(hash common.Hash) *types.Transaction
// AddRemotes should add the given transactions to the pool.
AddRemotes([]*types.Transaction) []error
......
......@@ -20,6 +20,7 @@ import (
"fmt"
"math/big"
"sync"
"sync/atomic"
"testing"
"time"
......@@ -180,16 +181,16 @@ func TestForkIDSplit(t *testing.T) {
blocksNoFork, _ = core.GenerateChain(configNoFork, genesisNoFork, engine, dbNoFork, 2, nil)
blocksProFork, _ = core.GenerateChain(configProFork, genesisProFork, engine, dbProFork, 2, nil)
ethNoFork, _ = NewProtocolManager(configNoFork, nil, downloader.FullSync, 1, new(event.TypeMux), new(testTxPool), engine, chainNoFork, dbNoFork, 1, nil)
ethProFork, _ = NewProtocolManager(configProFork, nil, downloader.FullSync, 1, new(event.TypeMux), new(testTxPool), engine, chainProFork, dbProFork, 1, nil)
ethNoFork, _ = NewProtocolManager(configNoFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainNoFork, dbNoFork, 1, nil)
ethProFork, _ = NewProtocolManager(configProFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainProFork, dbProFork, 1, nil)
)
ethNoFork.Start(1000)
ethProFork.Start(1000)
// Both nodes should allow the other to connect (same genesis, next fork is the same)
p2pNoFork, p2pProFork := p2p.MsgPipe()
peerNoFork := newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork)
peerProFork := newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork)
peerNoFork := newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil)
peerProFork := newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil)
errc := make(chan error, 2)
go func() { errc <- ethNoFork.handle(peerProFork) }()
......@@ -207,8 +208,8 @@ func TestForkIDSplit(t *testing.T) {
chainProFork.InsertChain(blocksProFork[:1])
p2pNoFork, p2pProFork = p2p.MsgPipe()
peerNoFork = newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork)
peerProFork = newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork)
peerNoFork = newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil)
peerProFork = newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil)
errc = make(chan error, 2)
go func() { errc <- ethNoFork.handle(peerProFork) }()
......@@ -226,8 +227,8 @@ func TestForkIDSplit(t *testing.T) {
chainProFork.InsertChain(blocksProFork[1:2])
p2pNoFork, p2pProFork = p2p.MsgPipe()
peerNoFork = newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork)
peerProFork = newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork)
peerNoFork = newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil)
peerProFork = newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil)
errc = make(chan error, 2)
go func() { errc <- ethNoFork.handle(peerProFork) }()
......@@ -246,6 +247,7 @@ func TestForkIDSplit(t *testing.T) {
// This test checks that received transactions are added to the local pool.
func TestRecvTransactions63(t *testing.T) { testRecvTransactions(t, 63) }
func TestRecvTransactions64(t *testing.T) { testRecvTransactions(t, 64) }
func TestRecvTransactions65(t *testing.T) { testRecvTransactions(t, 65) }
func testRecvTransactions(t *testing.T, protocol int) {
txAdded := make(chan []*types.Transaction)
......@@ -274,6 +276,7 @@ func testRecvTransactions(t *testing.T, protocol int) {
// This test checks that pending transactions are sent.
func TestSendTransactions63(t *testing.T) { testSendTransactions(t, 63) }
func TestSendTransactions64(t *testing.T) { testSendTransactions(t, 64) }
func TestSendTransactions65(t *testing.T) { testSendTransactions(t, 65) }
func testSendTransactions(t *testing.T, protocol int) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
......@@ -298,17 +301,43 @@ func testSendTransactions(t *testing.T, protocol int) {
}
for n := 0; n < len(alltxs) && !t.Failed(); {
var txs []*types.Transaction
msg, err := p.app.ReadMsg()
if err != nil {
t.Errorf("%v: read error: %v", p.Peer, err)
} else if msg.Code != TxMsg {
t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code)
}
if err := msg.Decode(&txs); err != nil {
t.Errorf("%v: %v", p.Peer, err)
var hashes []common.Hash
var forAllHashes func(callback func(hash common.Hash))
switch protocol {
case 63:
fallthrough
case 64:
msg, err := p.app.ReadMsg()
if err != nil {
t.Errorf("%v: read error: %v", p.Peer, err)
} else if msg.Code != TxMsg {
t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code)
}
if err := msg.Decode(&txs); err != nil {
t.Errorf("%v: %v", p.Peer, err)
}
forAllHashes = func(callback func(hash common.Hash)) {
for _, tx := range txs {
callback(tx.Hash())
}
}
case 65:
msg, err := p.app.ReadMsg()
if err != nil {
t.Errorf("%v: read error: %v", p.Peer, err)
} else if msg.Code != NewPooledTransactionHashesMsg {
t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code)
}
if err := msg.Decode(&hashes); err != nil {
t.Errorf("%v: %v", p.Peer, err)
}
forAllHashes = func(callback func(hash common.Hash)) {
for _, h := range hashes {
callback(h)
}
}
}
for _, tx := range txs {
hash := tx.Hash()
forAllHashes(func(hash common.Hash) {
seentx, want := seen[hash]
if seentx {
t.Errorf("%v: got tx more than once: %x", p.Peer, hash)
......@@ -318,7 +347,7 @@ func testSendTransactions(t *testing.T, protocol int) {
}
seen[hash] = true
n++
}
})
}
}
for i := 0; i < 3; i++ {
......@@ -329,6 +358,53 @@ func testSendTransactions(t *testing.T, protocol int) {
wg.Wait()
}
func TestTransactionPropagation(t *testing.T) { testSyncTransaction(t, true) }
func TestTransactionAnnouncement(t *testing.T) { testSyncTransaction(t, false) }
func testSyncTransaction(t *testing.T, propagtion bool) {
// Create a protocol manager for transaction fetcher and sender
pmFetcher, _ := newTestProtocolManagerMust(t, downloader.FastSync, 0, nil, nil)
defer pmFetcher.Stop()
pmSender, _ := newTestProtocolManagerMust(t, downloader.FastSync, 1024, nil, nil)
pmSender.broadcastTxAnnouncesOnly = !propagtion
defer pmSender.Stop()
// Sync up the two peers
io1, io2 := p2p.MsgPipe()
go pmSender.handle(pmSender.newPeer(65, p2p.NewPeer(enode.ID{}, "sender", nil), io2, pmSender.txpool.Get))
go pmFetcher.handle(pmFetcher.newPeer(65, p2p.NewPeer(enode.ID{}, "fetcher", nil), io1, pmFetcher.txpool.Get))
time.Sleep(250 * time.Millisecond)
pmFetcher.synchronise(pmFetcher.peers.BestPeer())
atomic.StoreUint32(&pmFetcher.acceptTxs, 1)
newTxs := make(chan core.NewTxsEvent, 1024)
sub := pmFetcher.txpool.SubscribeNewTxsEvent(newTxs)
defer sub.Unsubscribe()
// Fill the pool with new transactions
alltxs := make([]*types.Transaction, 1024)
for nonce := range alltxs {
alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), 0)
}
pmSender.txpool.AddRemotes(alltxs)
var got int
loop:
for {
select {
case ev := <-newTxs:
got += len(ev.Txs)
if got == 1024 {
break loop
}
case <-time.NewTimer(time.Second).C:
t.Fatal("Failed to retrieve all transaction")
}
}
}
// Tests that the custom union field encoder and decoder works correctly.
func TestGetBlockHeadersDataEncodeDecode(t *testing.T) {
// Create a "random" hash for testing
......
......@@ -38,8 +38,9 @@ const (
)
type txsync struct {
p *peer
txs []*types.Transaction
p *peer
hashes []common.Hash
txs []*types.Transaction
}
// syncTransactions starts sending all currently pending transactions to the given peer.
......@@ -53,7 +54,7 @@ func (pm *ProtocolManager) syncTransactions(p *peer) {
return
}
select {
case pm.txsyncCh <- &txsync{p, txs}:
case pm.txsyncCh <- &txsync{p: p, txs: txs}:
case <-pm.quitSync:
}
}
......@@ -69,26 +70,46 @@ func (pm *ProtocolManager) txsyncLoop() {
pack = new(txsync) // the pack that is being sent
done = make(chan error, 1) // result of the send
)
// send starts a sending a pack of transactions from the sync.
send := func(s *txsync) {
// Fill pack with transactions up to the target size.
size := common.StorageSize(0)
pack.p = s.p
pack.hashes = pack.hashes[:0]
pack.txs = pack.txs[:0]
for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
pack.txs = append(pack.txs, s.txs[i])
size += s.txs[i].Size()
}
// Remove the transactions that will be sent.
s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
if len(s.txs) == 0 {
delete(pending, s.p.ID())
if s.p.version >= eth65 {
// Eth65 introduces transaction announcement https://github.com/ethereum/EIPs/pull/2464,
// only txhashes are transferred here.
for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
pack.hashes = append(pack.hashes, s.txs[i].Hash())
size += common.HashLength
}
// Remove the transactions that will be sent.
s.txs = s.txs[:copy(s.txs, s.txs[len(pack.hashes):])]
if len(s.txs) == 0 {
delete(pending, s.p.ID())
}
// Send the pack in the background.
s.p.Log().Trace("Sending batch of transaction announcements", "count", len(pack.hashes), "bytes", size)
sending = true
go func() { done <- pack.p.SendNewTransactionHashes(pack.hashes) }()
} else {
// Legacy eth protocol doesn't have transaction announcement protocol
// message, transfer the whole pending transaction slice.
for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
pack.txs = append(pack.txs, s.txs[i])
size += s.txs[i].Size()
}
// Remove the transactions that will be sent.
s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
if len(s.txs) == 0 {
delete(pending, s.p.ID())
}
// Send the pack in the background.
s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
sending = true
go func() { done <- pack.p.SendNewTransactions(pack.txs) }()
}
// Send the pack in the background.
s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
sending = true
go func() { done <- pack.p.SendTransactions(pack.txs) }()
}
// pick chooses the next pending sync.
......@@ -133,8 +154,10 @@ func (pm *ProtocolManager) txsyncLoop() {
// downloading hashes and blocks as well as handling the announcement handler.
func (pm *ProtocolManager) syncer() {
// Start and ensure cleanup of sync mechanisms
pm.fetcher.Start()
defer pm.fetcher.Stop()
pm.blockFetcher.Start()
pm.txFetcher.Start()
defer pm.blockFetcher.Stop()
defer pm.txFetcher.Stop()
defer pm.downloader.Terminate()
// Wait for different events to fire synchronisation operations
......
......@@ -26,9 +26,13 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
)
func TestFastSyncDisabling63(t *testing.T) { testFastSyncDisabling(t, 63) }
func TestFastSyncDisabling64(t *testing.T) { testFastSyncDisabling(t, 64) }
func TestFastSyncDisabling65(t *testing.T) { testFastSyncDisabling(t, 65) }
// Tests that fast sync gets disabled as soon as a real block is successfully
// imported into the blockchain.
func TestFastSyncDisabling(t *testing.T) {
func testFastSyncDisabling(t *testing.T, protocol int) {
// Create a pristine protocol manager, check that fast sync is left enabled
pmEmpty, _ := newTestProtocolManagerMust(t, downloader.FastSync, 0, nil, nil)
if atomic.LoadUint32(&pmEmpty.fastSync) == 0 {
......@@ -42,8 +46,8 @@ func TestFastSyncDisabling(t *testing.T) {
// Sync up the two peers
io1, io2 := p2p.MsgPipe()
go pmFull.handle(pmFull.newPeer(63, p2p.NewPeer(enode.ID{}, "empty", nil), io2))
go pmEmpty.handle(pmEmpty.newPeer(63, p2p.NewPeer(enode.ID{}, "full", nil), io1))
go pmFull.handle(pmFull.newPeer(protocol, p2p.NewPeer(enode.ID{}, "empty", nil), io2, pmFull.txpool.Get))
go pmEmpty.handle(pmEmpty.newPeer(protocol, p2p.NewPeer(enode.ID{}, "full", nil), io1, pmEmpty.txpool.Get))
time.Sleep(250 * time.Millisecond)
pmEmpty.synchronise(pmEmpty.peers.BestPeer())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment