Unverified Commit 88c69624 authored by gary rong's avatar gary rong Committed by GitHub

core/txpool: remove "local" notion from the txpool price heap (#21478)

* core: separate the local notion from the pricedHeap

* core: add benchmarks

* core: improve tests

* core: address comments

* core: degrade the panic to error message

* core: fix typo

* core: address comments

* core: address comment

* core: use PEAK instead of POP

* core: address comments
parent b47f4ca5
...@@ -24,7 +24,6 @@ import ( ...@@ -24,7 +24,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
) )
// nonceHeap is a heap.Interface implementation over 64bit unsigned integers for // nonceHeap is a heap.Interface implementation over 64bit unsigned integers for
...@@ -439,24 +438,29 @@ func (h *priceHeap) Pop() interface{} { ...@@ -439,24 +438,29 @@ func (h *priceHeap) Pop() interface{} {
} }
// txPricedList is a price-sorted heap to allow operating on transactions pool // txPricedList is a price-sorted heap to allow operating on transactions pool
// contents in a price-incrementing way. // contents in a price-incrementing way. It's built opon the all transactions
// in txpool but only interested in the remote part. It means only remote transactions
// will be considered for tracking, sorting, eviction, etc.
type txPricedList struct { type txPricedList struct {
all *txLookup // Pointer to the map of all transactions all *txLookup // Pointer to the map of all transactions
items *priceHeap // Heap of prices of all the stored transactions remotes *priceHeap // Heap of prices of all the stored **remote** transactions
stales int // Number of stale price points to (re-heap trigger) stales int // Number of stale price points to (re-heap trigger)
} }
// newTxPricedList creates a new price-sorted transaction heap. // newTxPricedList creates a new price-sorted transaction heap.
func newTxPricedList(all *txLookup) *txPricedList { func newTxPricedList(all *txLookup) *txPricedList {
return &txPricedList{ return &txPricedList{
all: all, all: all,
items: new(priceHeap), remotes: new(priceHeap),
} }
} }
// Put inserts a new transaction into the heap. // Put inserts a new transaction into the heap.
func (l *txPricedList) Put(tx *types.Transaction) { func (l *txPricedList) Put(tx *types.Transaction, local bool) {
heap.Push(l.items, tx) if local {
return
}
heap.Push(l.remotes, tx)
} }
// Removed notifies the prices transaction list that an old transaction dropped // Removed notifies the prices transaction list that an old transaction dropped
...@@ -465,121 +469,95 @@ func (l *txPricedList) Put(tx *types.Transaction) { ...@@ -465,121 +469,95 @@ func (l *txPricedList) Put(tx *types.Transaction) {
func (l *txPricedList) Removed(count int) { func (l *txPricedList) Removed(count int) {
// Bump the stale counter, but exit if still too low (< 25%) // Bump the stale counter, but exit if still too low (< 25%)
l.stales += count l.stales += count
if l.stales <= len(*l.items)/4 { if l.stales <= len(*l.remotes)/4 {
return return
} }
// Seems we've reached a critical number of stale transactions, reheap // Seems we've reached a critical number of stale transactions, reheap
reheap := make(priceHeap, 0, l.all.Count()) l.Reheap()
l.stales, l.items = 0, &reheap
l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
*l.items = append(*l.items, tx)
return true
})
heap.Init(l.items)
} }
// Cap finds all the transactions below the given price threshold, drops them // Cap finds all the transactions below the given price threshold, drops them
// from the priced list and returns them for further removal from the entire pool. // from the priced list and returns them for further removal from the entire pool.
func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transactions { //
// Note: only remote transactions will be considered for eviction.
func (l *txPricedList) Cap(threshold *big.Int) types.Transactions {
drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop
save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep for len(*l.remotes) > 0 {
for len(*l.items) > 0 {
// Discard stale transactions if found during cleanup // Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction) cheapest := (*l.remotes)[0]
if l.all.Get(tx.Hash()) == nil { if l.all.GetRemote(cheapest.Hash()) == nil { // Removed or migrated
heap.Pop(l.remotes)
l.stales-- l.stales--
continue continue
} }
// Stop the discards if we've reached the threshold // Stop the discards if we've reached the threshold
if tx.GasPriceIntCmp(threshold) >= 0 { if cheapest.GasPriceIntCmp(threshold) >= 0 {
save = append(save, tx)
break break
} }
// Non stale transaction found, discard unless local heap.Pop(l.remotes)
if local.containsTx(tx) { drop = append(drop, cheapest)
save = append(save, tx)
} else {
drop = append(drop, tx)
}
}
for _, tx := range save {
heap.Push(l.items, tx)
} }
return drop return drop
} }
// Underpriced checks whether a transaction is cheaper than (or as cheap as) the // Underpriced checks whether a transaction is cheaper than (or as cheap as) the
// lowest priced transaction currently being tracked. // lowest priced (remote) transaction currently being tracked.
func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) bool { func (l *txPricedList) Underpriced(tx *types.Transaction) bool {
// Local transactions cannot be underpriced
if local.containsTx(tx) {
return false
}
// Discard stale price points if found at the heap start // Discard stale price points if found at the heap start
for len(*l.items) > 0 { for len(*l.remotes) > 0 {
head := []*types.Transaction(*l.items)[0] head := []*types.Transaction(*l.remotes)[0]
if l.all.Get(head.Hash()) == nil { if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
l.stales-- l.stales--
heap.Pop(l.items) heap.Pop(l.remotes)
continue continue
} }
break break
} }
// Check if the transaction is underpriced or not // Check if the transaction is underpriced or not
if len(*l.items) == 0 { if len(*l.remotes) == 0 {
log.Error("Pricing query for empty pool") // This cannot happen, print to catch programming errors return false // There is no remote transaction at all.
return false
} }
cheapest := []*types.Transaction(*l.items)[0] // If the remote transaction is even cheaper than the
// cheapest one tracked locally, reject it.
cheapest := []*types.Transaction(*l.remotes)[0]
return cheapest.GasPriceCmp(tx) >= 0 return cheapest.GasPriceCmp(tx) >= 0
} }
// Discard finds a number of most underpriced transactions, removes them from the // Discard finds a number of most underpriced transactions, removes them from the
// priced list and returns them for further removal from the entire pool. // priced list and returns them for further removal from the entire pool.
func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions { //
// If we have some local accountset, those will not be discarded // Note local transaction won't be considered for eviction.
if !local.empty() { func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) {
// In case the list is filled to the brim with 'local' txs, we do this drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
// little check to avoid unpacking / repacking the heap later on, which for len(*l.remotes) > 0 && slots > 0 {
// is very expensive
discardable := 0
for _, tx := range *l.items {
if !local.containsTx(tx) {
discardable++
}
if discardable >= slots {
break
}
}
if slots > discardable {
slots = discardable
}
}
if slots == 0 {
return nil
}
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
save := make(types.Transactions, 0, len(*l.items)-slots) // Local underpriced transactions to keep
for len(*l.items) > 0 && slots > 0 {
// Discard stale transactions if found during cleanup // Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction) tx := heap.Pop(l.remotes).(*types.Transaction)
if l.all.Get(tx.Hash()) == nil { if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
l.stales-- l.stales--
continue continue
} }
// Non stale transaction found, discard unless local // Non stale transaction found, discard it
if local.containsTx(tx) { drop = append(drop, tx)
save = append(save, tx) slots -= numSlots(tx)
} else { }
drop = append(drop, tx) // If we still can't make enough room for the new transaction
slots -= numSlots(tx) if slots > 0 && !force {
for _, tx := range drop {
heap.Push(l.remotes, tx)
} }
return nil, false
} }
for _, tx := range save { return drop, true
heap.Push(l.items, tx) }
}
return drop // Reheap forcibly rebuilds the heap based on the current remote transaction set.
func (l *txPricedList) Reheap() {
reheap := make(priceHeap, 0, l.all.RemoteCount())
l.stales, l.remotes = 0, &reheap
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
*l.remotes = append(*l.remotes, tx)
return true
}, false, true) // Only iterate remotes
heap.Init(l.remotes)
} }
This diff is collapsed.
...@@ -107,10 +107,11 @@ func validateTxPoolInternals(pool *TxPool) error { ...@@ -107,10 +107,11 @@ func validateTxPoolInternals(pool *TxPool) error {
if total := pool.all.Count(); total != pending+queued { if total := pool.all.Count(); total != pending+queued {
return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued) return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued)
} }
if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued { pool.priced.Reheap()
return fmt.Errorf("total priced transaction count %d != %d pending + %d queued", priced, pending, queued) priced, remote := pool.priced.remotes.Len(), pool.all.RemoteCount()
if priced != remote {
return fmt.Errorf("total priced transaction count %d != %d", priced, remote)
} }
// Ensure the next nonce to assign is the correct one // Ensure the next nonce to assign is the correct one
for addr, txs := range pool.pending { for addr, txs := range pool.pending {
// Find the last transaction // Find the last transaction
...@@ -280,7 +281,7 @@ func TestTransactionQueue(t *testing.T) { ...@@ -280,7 +281,7 @@ func TestTransactionQueue(t *testing.T) {
pool.currentState.AddBalance(from, big.NewInt(1000)) pool.currentState.AddBalance(from, big.NewInt(1000))
<-pool.requestReset(nil, nil) <-pool.requestReset(nil, nil)
pool.enqueueTx(tx.Hash(), tx) pool.enqueueTx(tx.Hash(), tx, false, true)
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from)) <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
if len(pool.pending) != 1 { if len(pool.pending) != 1 {
t.Error("expected valid txs to be 1 is", len(pool.pending)) t.Error("expected valid txs to be 1 is", len(pool.pending))
...@@ -289,7 +290,7 @@ func TestTransactionQueue(t *testing.T) { ...@@ -289,7 +290,7 @@ func TestTransactionQueue(t *testing.T) {
tx = transaction(1, 100, key) tx = transaction(1, 100, key)
from, _ = deriveSender(tx) from, _ = deriveSender(tx)
pool.currentState.SetNonce(from, 2) pool.currentState.SetNonce(from, 2)
pool.enqueueTx(tx.Hash(), tx) pool.enqueueTx(tx.Hash(), tx, false, true)
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from)) <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
...@@ -313,9 +314,9 @@ func TestTransactionQueue2(t *testing.T) { ...@@ -313,9 +314,9 @@ func TestTransactionQueue2(t *testing.T) {
pool.currentState.AddBalance(from, big.NewInt(1000)) pool.currentState.AddBalance(from, big.NewInt(1000))
pool.reset(nil, nil) pool.reset(nil, nil)
pool.enqueueTx(tx1.Hash(), tx1) pool.enqueueTx(tx1.Hash(), tx1, false, true)
pool.enqueueTx(tx2.Hash(), tx2) pool.enqueueTx(tx2.Hash(), tx2, false, true)
pool.enqueueTx(tx3.Hash(), tx3) pool.enqueueTx(tx3.Hash(), tx3, false, true)
pool.promoteExecutables([]common.Address{from}) pool.promoteExecutables([]common.Address{from})
if len(pool.pending) != 1 { if len(pool.pending) != 1 {
...@@ -488,12 +489,21 @@ func TestTransactionDropping(t *testing.T) { ...@@ -488,12 +489,21 @@ func TestTransactionDropping(t *testing.T) {
tx11 = transaction(11, 200, key) tx11 = transaction(11, 200, key)
tx12 = transaction(12, 300, key) tx12 = transaction(12, 300, key)
) )
pool.all.Add(tx0, false)
pool.priced.Put(tx0, false)
pool.promoteTx(account, tx0.Hash(), tx0) pool.promoteTx(account, tx0.Hash(), tx0)
pool.all.Add(tx1, false)
pool.priced.Put(tx1, false)
pool.promoteTx(account, tx1.Hash(), tx1) pool.promoteTx(account, tx1.Hash(), tx1)
pool.all.Add(tx2, false)
pool.priced.Put(tx2, false)
pool.promoteTx(account, tx2.Hash(), tx2) pool.promoteTx(account, tx2.Hash(), tx2)
pool.enqueueTx(tx10.Hash(), tx10)
pool.enqueueTx(tx11.Hash(), tx11) pool.enqueueTx(tx10.Hash(), tx10, false, true)
pool.enqueueTx(tx12.Hash(), tx12) pool.enqueueTx(tx11.Hash(), tx11, false, true)
pool.enqueueTx(tx12.Hash(), tx12, false, true)
// Check that pre and post validations leave the pool as is // Check that pre and post validations leave the pool as is
if pool.pending[account].Len() != 3 { if pool.pending[account].Len() != 3 {
...@@ -1964,7 +1974,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { ...@@ -1964,7 +1974,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
tx := transaction(uint64(1+i), 100000, key) tx := transaction(uint64(1+i), 100000, key)
pool.enqueueTx(tx.Hash(), tx) pool.enqueueTx(tx.Hash(), tx, false, true)
} }
// Benchmark the speed of pool validation // Benchmark the speed of pool validation
b.ResetTimer() b.ResetTimer()
...@@ -2007,3 +2017,38 @@ func benchmarkPoolBatchInsert(b *testing.B, size int, local bool) { ...@@ -2007,3 +2017,38 @@ func benchmarkPoolBatchInsert(b *testing.B, size int, local bool) {
} }
} }
} }
func BenchmarkInsertRemoteWithAllLocals(b *testing.B) {
// Allocate keys for testing
key, _ := crypto.GenerateKey()
account := crypto.PubkeyToAddress(key.PublicKey)
remoteKey, _ := crypto.GenerateKey()
remoteAddr := crypto.PubkeyToAddress(remoteKey.PublicKey)
locals := make([]*types.Transaction, 4096+1024) // Occupy all slots
for i := 0; i < len(locals); i++ {
locals[i] = transaction(uint64(i), 100000, key)
}
remotes := make([]*types.Transaction, 1000)
for i := 0; i < len(remotes); i++ {
remotes[i] = pricedTransaction(uint64(i), 100000, big.NewInt(2), remoteKey) // Higher gasprice
}
// Benchmark importing the transactions into the queue
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
pool, _ := setupTxPool()
pool.currentState.AddBalance(account, big.NewInt(100000000))
for _, local := range locals {
pool.AddLocal(local)
}
b.StartTimer()
// Assign a high enough balance for testing
pool.currentState.AddBalance(remoteAddr, big.NewInt(100000000))
for i := 0; i < len(remotes); i++ {
pool.AddRemotes([]*types.Transaction{remotes[i]})
}
pool.Stop()
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment