Commit 649787a9 authored by Péter Szilágyi's avatar Péter Szilágyi

core: fix transaction reorg issues within the tx pool

parent 36137623
...@@ -140,7 +140,6 @@ func (pool *TxPool) resetState() { ...@@ -140,7 +140,6 @@ func (pool *TxPool) resetState() {
} }
} }
} }
// Check the queue and move transactions over to the pending if possible // Check the queue and move transactions over to the pending if possible
// or remove those that have become invalid // or remove those that have become invalid
pool.checkQueue() pool.checkQueue()
...@@ -301,17 +300,15 @@ func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Trans ...@@ -301,17 +300,15 @@ func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Trans
} }
// Add queues a single transaction in the pool if it is valid. // Add queues a single transaction in the pool if it is valid.
func (self *TxPool) Add(tx *types.Transaction) (err error) { func (self *TxPool) Add(tx *types.Transaction) error {
self.mu.Lock() self.mu.Lock()
defer self.mu.Unlock() defer self.mu.Unlock()
err = self.add(tx) if err := self.add(tx); err != nil {
if err == nil { return err
// check and validate the queueue
self.checkQueue()
} }
self.checkQueue()
return return nil
} }
// AddTransactions attempts to queue all valid transactions in txs. // AddTransactions attempts to queue all valid transactions in txs.
...@@ -417,51 +414,55 @@ func (pool *TxPool) checkQueue() { ...@@ -417,51 +414,55 @@ func (pool *TxPool) checkQueue() {
pool.resetState() pool.resetState()
} }
var addq txQueue var promote txQueue
for address, txs := range pool.queue { for address, txs := range pool.queue {
// guessed nonce is the nonce currently kept by the tx pool (pending state)
guessedNonce := pool.pendingState.GetNonce(address)
// true nonce is the nonce known by the last state
currentState, err := pool.currentState() currentState, err := pool.currentState()
if err != nil { if err != nil {
glog.Errorf("could not get current state: %v", err) glog.Errorf("could not get current state: %v", err)
return return
} }
trueNonce := currentState.GetNonce(address) balance := currentState.GetBalance(address)
addq := addq[:0]
var (
guessedNonce = pool.pendingState.GetNonce(address) // nonce currently kept by the tx pool (pending state)
trueNonce = currentState.GetNonce(address) // nonce known by the last state
)
promote = promote[:0]
for hash, tx := range txs { for hash, tx := range txs {
if tx.Nonce() < trueNonce { // Drop processed or out of fund transactions
// Drop queued transactions whose nonce is lower than if tx.Nonce() < trueNonce || balance.Cmp(tx.Cost()) < 0 {
// the account nonce because they have been processed. if glog.V(logger.Core) {
glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx)
}
delete(txs, hash) delete(txs, hash)
} else {
// Collect the remaining transactions for the next pass.
addq = append(addq, txQueueEntry{hash, address, tx})
}
}
// Find the next consecutive nonce range starting at the
// current account nonce.
sort.Sort(addq)
for i, e := range addq {
// start deleting the transactions from the queue if they exceed the limit
if i > maxQueued {
delete(pool.queue[address], e.hash)
continue continue
} }
// Collect the remaining transactions for the next pass.
if e.Nonce() > guessedNonce { promote = append(promote, txQueueEntry{hash, address, tx})
if len(addq)-i > maxQueued { }
// Find the next consecutive nonce range starting at the current account nonce,
// pushing the guessed nonce forward if we add consecutive transactions.
sort.Sort(promote)
for i, entry := range promote {
// If we reached a gap in the nonces, enforce transaction limit and stop
if entry.Nonce() > guessedNonce {
if len(promote)-i > maxQueued {
if glog.V(logger.Debug) { if glog.V(logger.Debug) {
glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(e.hash[:])) glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.hash[:]))
} }
for j := i + maxQueued; j < len(addq); j++ { for _, drop := range promote[i+maxQueued:] {
delete(txs, addq[j].hash) delete(txs, drop.hash)
} }
} }
break break
} }
delete(txs, e.hash) // Otherwise promote the transaction and move the guess nonce if needed
pool.addTx(e.hash, address, e.Transaction) pool.addTx(entry.hash, address, entry.Transaction)
delete(txs, entry.hash)
if entry.Nonce() == guessedNonce {
guessedNonce++
}
} }
// Delete the entire queue entry if it became empty. // Delete the entire queue entry if it became empty.
if len(txs) == 0 { if len(txs) == 0 {
...@@ -471,20 +472,56 @@ func (pool *TxPool) checkQueue() { ...@@ -471,20 +472,56 @@ func (pool *TxPool) checkQueue() {
} }
// validatePool removes invalid and processed transactions from the main pool. // validatePool removes invalid and processed transactions from the main pool.
// If a transaction is removed for being invalid (e.g. out of funds), all sub-
// sequent (Still valid) transactions are moved back into the future queue. This
// is important to prevent a drained account from DOSing the network with non
// executable transactions.
func (pool *TxPool) validatePool() { func (pool *TxPool) validatePool() {
state, err := pool.currentState() state, err := pool.currentState()
if err != nil { if err != nil {
glog.V(logger.Info).Infoln("failed to get current state: %v", err) glog.V(logger.Info).Infoln("failed to get current state: %v", err)
return return
} }
balanceCache := make(map[common.Address]*big.Int)
// Clean up the pending pool, accumulating invalid nonces
gaps := make(map[common.Address]uint64)
for hash, tx := range pool.pending { for hash, tx := range pool.pending {
from, _ := tx.From() // err already checked sender, _ := tx.From() // err already checked
// perform light nonce validation
if state.GetNonce(from) > tx.Nonce() { // Perform light nonce and balance validation
balance := balanceCache[sender]
if balance == nil {
balance = state.GetBalance(sender)
balanceCache[sender] = balance
}
if past := state.GetNonce(sender) > tx.Nonce(); past || balance.Cmp(tx.Cost()) < 0 {
// Remove an already past it invalidated transaction
if glog.V(logger.Core) { if glog.V(logger.Core) {
glog.Infof("removed tx (%x) from pool: low tx nonce\n", hash[:4]) glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx)
} }
delete(pool.pending, hash) delete(pool.pending, hash)
// Track the smallest invalid nonce to postpone subsequent transactions
if !past {
if prev, ok := gaps[sender]; !ok || tx.Nonce() < prev {
gaps[sender] = tx.Nonce()
}
}
}
}
// Move all transactions after a gap back to the future queue
if len(gaps) > 0 {
for hash, tx := range pool.pending {
sender, _ := tx.From()
if gap, ok := gaps[sender]; ok && tx.Nonce() >= gap {
if glog.V(logger.Core) {
glog.Infof("postponed tx (%v) due to introduced gap\n", tx)
}
pool.queueTx(hash, tx)
delete(pool.pending, hash)
}
} }
} }
} }
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment