Commit 55b579e0 authored by Ryan Schneider's avatar Ryan Schneider Committed by Péter Szilágyi

core: use a wrapped map to remove contention in `TxPool.Get`. (#16670)

* core: use a wrapped `map` and `sync.RWMutex` for `TxPool.all` to remove contention in `TxPool.Get`.

* core: Remove redundant `txLookup.Find` and improve comments on txLookup methods.
parent be22ee8d
...@@ -397,13 +397,13 @@ func (h *priceHeap) Pop() interface{} { ...@@ -397,13 +397,13 @@ func (h *priceHeap) Pop() interface{} {
// txPricedList is a price-sorted heap to allow operating on transactions pool // txPricedList is a price-sorted heap to allow operating on transactions pool
// contents in a price-incrementing way. // contents in a price-incrementing way.
type txPricedList struct { type txPricedList struct {
all *map[common.Hash]*types.Transaction // Pointer to the map of all transactions all *txLookup // Pointer to the map of all transactions
items *priceHeap // Heap of prices of all the stored transactions items *priceHeap // Heap of prices of all the stored transactions
stales int // Number of stale price points to (re-heap trigger) stales int // Number of stale price points to (re-heap trigger)
} }
// newTxPricedList creates a new price-sorted transaction heap. // newTxPricedList creates a new price-sorted transaction heap.
func newTxPricedList(all *map[common.Hash]*types.Transaction) *txPricedList { func newTxPricedList(all *txLookup) *txPricedList {
return &txPricedList{ return &txPricedList{
all: all, all: all,
items: new(priceHeap), items: new(priceHeap),
...@@ -425,12 +425,13 @@ func (l *txPricedList) Removed() { ...@@ -425,12 +425,13 @@ func (l *txPricedList) Removed() {
return return
} }
// Seems we've reached a critical number of stale transactions, reheap // Seems we've reached a critical number of stale transactions, reheap
reheap := make(priceHeap, 0, len(*l.all)) reheap := make(priceHeap, 0, l.all.Count())
l.stales, l.items = 0, &reheap l.stales, l.items = 0, &reheap
for _, tx := range *l.all { l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
*l.items = append(*l.items, tx) *l.items = append(*l.items, tx)
} return true
})
heap.Init(l.items) heap.Init(l.items)
} }
...@@ -443,7 +444,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact ...@@ -443,7 +444,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact
for len(*l.items) > 0 { for len(*l.items) > 0 {
// Discard stale transactions if found during cleanup // Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction) tx := heap.Pop(l.items).(*types.Transaction)
if _, ok := (*l.all)[tx.Hash()]; !ok { if l.all.Get(tx.Hash()) == nil {
l.stales-- l.stales--
continue continue
} }
...@@ -475,7 +476,7 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo ...@@ -475,7 +476,7 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo
// Discard stale price points if found at the heap start // Discard stale price points if found at the heap start
for len(*l.items) > 0 { for len(*l.items) > 0 {
head := []*types.Transaction(*l.items)[0] head := []*types.Transaction(*l.items)[0]
if _, ok := (*l.all)[head.Hash()]; !ok { if l.all.Get(head.Hash()) == nil {
l.stales-- l.stales--
heap.Pop(l.items) heap.Pop(l.items)
continue continue
...@@ -500,7 +501,7 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions ...@@ -500,7 +501,7 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions
for len(*l.items) > 0 && count > 0 { for len(*l.items) > 0 && count > 0 {
// Discard stale transactions if found during cleanup // Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction) tx := heap.Pop(l.items).(*types.Transaction)
if _, ok := (*l.all)[tx.Hash()]; !ok { if l.all.Get(tx.Hash()) == nil {
l.stales-- l.stales--
continue continue
} }
......
This diff is collapsed.
...@@ -94,7 +94,7 @@ func validateTxPoolInternals(pool *TxPool) error { ...@@ -94,7 +94,7 @@ func validateTxPoolInternals(pool *TxPool) error {
// Ensure the total transaction set is consistent with pending + queued // Ensure the total transaction set is consistent with pending + queued
pending, queued := pool.stats() pending, queued := pool.stats()
if total := len(pool.all); total != pending+queued { if total := pool.all.Count(); total != pending+queued {
return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued) return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued)
} }
if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued { if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued {
...@@ -401,8 +401,8 @@ func TestTransactionDoubleNonce(t *testing.T) { ...@@ -401,8 +401,8 @@ func TestTransactionDoubleNonce(t *testing.T) {
t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash())
} }
// Ensure the total transaction count is correct // Ensure the total transaction count is correct
if len(pool.all) != 1 { if pool.all.Count() != 1 {
t.Error("expected 1 total transactions, got", len(pool.all)) t.Error("expected 1 total transactions, got", pool.all.Count())
} }
} }
...@@ -424,8 +424,8 @@ func TestTransactionMissingNonce(t *testing.T) { ...@@ -424,8 +424,8 @@ func TestTransactionMissingNonce(t *testing.T) {
if pool.queue[addr].Len() != 1 { if pool.queue[addr].Len() != 1 {
t.Error("expected 1 queued transaction, got", pool.queue[addr].Len()) t.Error("expected 1 queued transaction, got", pool.queue[addr].Len())
} }
if len(pool.all) != 1 { if pool.all.Count() != 1 {
t.Error("expected 1 total transactions, got", len(pool.all)) t.Error("expected 1 total transactions, got", pool.all.Count())
} }
} }
...@@ -488,8 +488,8 @@ func TestTransactionDropping(t *testing.T) { ...@@ -488,8 +488,8 @@ func TestTransactionDropping(t *testing.T) {
if pool.queue[account].Len() != 3 { if pool.queue[account].Len() != 3 {
t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3) t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3)
} }
if len(pool.all) != 6 { if pool.all.Count() != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6) t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6)
} }
pool.lockedReset(nil, nil) pool.lockedReset(nil, nil)
if pool.pending[account].Len() != 3 { if pool.pending[account].Len() != 3 {
...@@ -498,8 +498,8 @@ func TestTransactionDropping(t *testing.T) { ...@@ -498,8 +498,8 @@ func TestTransactionDropping(t *testing.T) {
if pool.queue[account].Len() != 3 { if pool.queue[account].Len() != 3 {
t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3) t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3)
} }
if len(pool.all) != 6 { if pool.all.Count() != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6) t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6)
} }
// Reduce the balance of the account, and check that invalidated transactions are dropped // Reduce the balance of the account, and check that invalidated transactions are dropped
pool.currentState.AddBalance(account, big.NewInt(-650)) pool.currentState.AddBalance(account, big.NewInt(-650))
...@@ -523,8 +523,8 @@ func TestTransactionDropping(t *testing.T) { ...@@ -523,8 +523,8 @@ func TestTransactionDropping(t *testing.T) {
if _, ok := pool.queue[account].txs.items[tx12.Nonce()]; ok { if _, ok := pool.queue[account].txs.items[tx12.Nonce()]; ok {
t.Errorf("out-of-fund queued transaction present: %v", tx11) t.Errorf("out-of-fund queued transaction present: %v", tx11)
} }
if len(pool.all) != 4 { if pool.all.Count() != 4 {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4)
} }
// Reduce the block gas limit, check that invalidated transactions are dropped // Reduce the block gas limit, check that invalidated transactions are dropped
pool.chain.(*testBlockChain).gasLimit = 100 pool.chain.(*testBlockChain).gasLimit = 100
...@@ -542,8 +542,8 @@ func TestTransactionDropping(t *testing.T) { ...@@ -542,8 +542,8 @@ func TestTransactionDropping(t *testing.T) {
if _, ok := pool.queue[account].txs.items[tx11.Nonce()]; ok { if _, ok := pool.queue[account].txs.items[tx11.Nonce()]; ok {
t.Errorf("over-gased queued transaction present: %v", tx11) t.Errorf("over-gased queued transaction present: %v", tx11)
} }
if len(pool.all) != 2 { if pool.all.Count() != 2 {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 2) t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 2)
} }
} }
...@@ -596,8 +596,8 @@ func TestTransactionPostponing(t *testing.T) { ...@@ -596,8 +596,8 @@ func TestTransactionPostponing(t *testing.T) {
if len(pool.queue) != 0 { if len(pool.queue) != 0 {
t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue), 0) t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue), 0)
} }
if len(pool.all) != len(txs) { if pool.all.Count() != len(txs) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txs)) t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs))
} }
pool.lockedReset(nil, nil) pool.lockedReset(nil, nil)
if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) { if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) {
...@@ -606,8 +606,8 @@ func TestTransactionPostponing(t *testing.T) { ...@@ -606,8 +606,8 @@ func TestTransactionPostponing(t *testing.T) {
if len(pool.queue) != 0 { if len(pool.queue) != 0 {
t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue), 0) t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue), 0)
} }
if len(pool.all) != len(txs) { if pool.all.Count() != len(txs) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txs)) t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs))
} }
// Reduce the balance of the account, and check that transactions are reorganised // Reduce the balance of the account, and check that transactions are reorganised
for _, addr := range accs { for _, addr := range accs {
...@@ -656,8 +656,8 @@ func TestTransactionPostponing(t *testing.T) { ...@@ -656,8 +656,8 @@ func TestTransactionPostponing(t *testing.T) {
} }
} }
} }
if len(pool.all) != len(txs)/2 { if pool.all.Count() != len(txs)/2 {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txs)/2) t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs)/2)
} }
} }
...@@ -748,8 +748,8 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { ...@@ -748,8 +748,8 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
} }
} }
} }
if len(pool.all) != int(testTxPoolConfig.AccountQueue) { if pool.all.Count() != int(testTxPoolConfig.AccountQueue) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), testTxPoolConfig.AccountQueue) t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), testTxPoolConfig.AccountQueue)
} }
} }
...@@ -942,8 +942,8 @@ func TestTransactionPendingLimiting(t *testing.T) { ...@@ -942,8 +942,8 @@ func TestTransactionPendingLimiting(t *testing.T) {
t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), 0) t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), 0)
} }
} }
if len(pool.all) != int(testTxPoolConfig.AccountQueue+5) { if pool.all.Count() != int(testTxPoolConfig.AccountQueue+5) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), testTxPoolConfig.AccountQueue+5) t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), testTxPoolConfig.AccountQueue+5)
} }
if err := validateEvents(events, int(testTxPoolConfig.AccountQueue+5)); err != nil { if err := validateEvents(events, int(testTxPoolConfig.AccountQueue+5)); err != nil {
t.Fatalf("event firing failed: %v", err) t.Fatalf("event firing failed: %v", err)
...@@ -993,8 +993,8 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { ...@@ -993,8 +993,8 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
if len(pool1.queue) != len(pool2.queue) { if len(pool1.queue) != len(pool2.queue) {
t.Errorf("queued transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.queue), len(pool2.queue)) t.Errorf("queued transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.queue), len(pool2.queue))
} }
if len(pool1.all) != len(pool2.all) { if pool1.all.Count() != pool2.all.Count() {
t.Errorf("total transaction count mismatch: one-by-one algo %d, batch algo %d", len(pool1.all), len(pool2.all)) t.Errorf("total transaction count mismatch: one-by-one algo %d, batch algo %d", pool1.all.Count(), pool2.all.Count())
} }
if err := validateTxPoolInternals(pool1); err != nil { if err := validateTxPoolInternals(pool1); err != nil {
t.Errorf("pool 1 internal state corrupted: %v", err) t.Errorf("pool 1 internal state corrupted: %v", err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment