core, eth, les: polish txpool API around local/remote txs

parent a633a2d7
...@@ -420,7 +420,7 @@ func (l *txPricedList) Removed() { ...@@ -420,7 +420,7 @@ func (l *txPricedList) Removed() {
heap.Init(l.items) heap.Init(l.items)
} }
// Discard finds all the transactions below the given price threshold, drops them // Cap finds all the transactions below the given price threshold, drops them
// from the priced list and returs them for further removal from the entire pool. // from the priced list and returs them for further removal from the entire pool.
func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transactions { func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transactions {
drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop
...@@ -429,9 +429,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact ...@@ -429,9 +429,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact
for len(*l.items) > 0 { for len(*l.items) > 0 {
// Discard stale transactions if found during cleanup // Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction) tx := heap.Pop(l.items).(*types.Transaction)
if _, ok := (*l.all)[tx.Hash()]; !ok {
hash := tx.Hash()
if _, ok := (*l.all)[hash]; !ok {
l.stales-- l.stales--
continue continue
} }
...@@ -440,7 +438,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact ...@@ -440,7 +438,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact
break break
} }
// Non stale transaction found, discard unless local // Non stale transaction found, discard unless local
if local.contains(tx) { if local.containsTx(tx) {
save = append(save, tx) save = append(save, tx)
} else { } else {
drop = append(drop, tx) drop = append(drop, tx)
...@@ -456,7 +454,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact ...@@ -456,7 +454,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact
// lowest priced transaction currently being tracked. // lowest priced transaction currently being tracked.
func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) bool { func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) bool {
// Local transactions cannot be underpriced // Local transactions cannot be underpriced
if local.contains(tx) { if local.containsTx(tx) {
return false return false
} }
// Discard stale price points if found at the heap start // Discard stale price points if found at the heap start
...@@ -487,14 +485,12 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions ...@@ -487,14 +485,12 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions
for len(*l.items) > 0 && count > 0 { for len(*l.items) > 0 && count > 0 {
// Discard stale transactions if found during cleanup // Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction) tx := heap.Pop(l.items).(*types.Transaction)
if _, ok := (*l.all)[tx.Hash()]; !ok {
hash := tx.Hash()
if _, ok := (*l.all)[hash]; !ok {
l.stales-- l.stales--
continue continue
} }
// Non stale transaction found, discard unless local // Non stale transaction found, discard unless local
if local.contains(tx) { if local.containsTx(tx) {
save = append(save, tx) save = append(save, tx)
} else { } else {
drop = append(drop, tx) drop = append(drop, tx)
......
This diff is collapsed.
This diff is collapsed.
...@@ -119,8 +119,7 @@ func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) ...@@ -119,8 +119,7 @@ func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
b.eth.txMu.Lock() b.eth.txMu.Lock()
defer b.eth.txMu.Unlock() defer b.eth.txMu.Unlock()
b.eth.txPool.SetLocal(signedTx) return b.eth.txPool.AddLocal(signedTx)
return b.eth.txPool.Add(signedTx)
} }
func (b *EthApiBackend) RemoveTx(txHash common.Hash) { func (b *EthApiBackend) RemoveTx(txHash common.Hash) {
......
...@@ -661,7 +661,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ...@@ -661,7 +661,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
} }
p.MarkTransaction(tx.Hash()) p.MarkTransaction(tx.Hash())
} }
pm.txpool.AddBatch(txs) pm.txpool.AddRemotes(txs)
default: default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code) return errResp(ErrInvalidMsgCode, "%v", msg.Code)
......
...@@ -94,9 +94,9 @@ type testTxPool struct { ...@@ -94,9 +94,9 @@ type testTxPool struct {
lock sync.RWMutex // Protects the transaction pool lock sync.RWMutex // Protects the transaction pool
} }
// AddBatch appends a batch of transactions to the pool, and notifies any // AddRemotes appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil // listeners if the addition channel is non nil
func (p *testTxPool) AddBatch(txs []*types.Transaction) error { func (p *testTxPool) AddRemotes(txs []*types.Transaction) error {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
......
...@@ -94,8 +94,8 @@ var errorToString = map[int]string{ ...@@ -94,8 +94,8 @@ var errorToString = map[int]string{
} }
type txPool interface { type txPool interface {
// AddBatch should add the given transactions to the pool. // AddRemotes should add the given transactions to the pool.
AddBatch([]*types.Transaction) error AddRemotes([]*types.Transaction) error
// Pending should return pending transactions. // Pending should return pending transactions.
// The slice should be modifiable by the caller. // The slice should be modifiable by the caller.
......
...@@ -130,7 +130,7 @@ func testSendTransactions(t *testing.T, protocol int) { ...@@ -130,7 +130,7 @@ func testSendTransactions(t *testing.T, protocol int) {
for nonce := range alltxs { for nonce := range alltxs {
alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), txsize) alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), txsize)
} }
pm.txpool.AddBatch(alltxs) pm.txpool.AddRemotes(alltxs)
// Connect several peers. They should all receive the pending transactions. // Connect several peers. They should all receive the pending transactions.
var wg sync.WaitGroup var wg sync.WaitGroup
......
...@@ -87,8 +87,8 @@ type BlockChain interface { ...@@ -87,8 +87,8 @@ type BlockChain interface {
} }
type txPool interface { type txPool interface {
// AddTransactions should add the given transactions to the pool. // AddRemotes should add the given transactions to the pool.
AddBatch([]*types.Transaction) error AddRemotes([]*types.Transaction) error
} }
type ProtocolManager struct { type ProtocolManager struct {
...@@ -803,7 +803,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ...@@ -803,7 +803,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrRequestRejected, "") return errResp(ErrRequestRejected, "")
} }
if err := pm.txpool.AddBatch(txs); err != nil { if err := pm.txpool.AddRemotes(txs); err != nil {
return errResp(ErrUnexpectedResponse, "msg: %v", err) return errResp(ErrUnexpectedResponse, "msg: %v", err)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment