Commit 479aa61f authored by Péter Szilágyi's avatar Péter Szilágyi Committed by GitHub

Merge pull request #15343 from karalabe/txpool-replacement-propagation

core: fire tx event on replace, expand tests
parents eaa4f8a5 0af1ab0c
......@@ -85,7 +85,7 @@ func TestWildcardMatcher(t *testing.T) {
}
// makeRandomIndexes generates a random filter system, composed on multiple filter
// criteria, each having one bloom list component for the address and arbitrarilly
// criteria, each having one bloom list component for the address and arbitrarily
// many topic bloom list components.
func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes {
res := make([][]bloomIndexes, len(lengths))
......
......@@ -31,6 +31,15 @@ import (
// into the journal, but no such file is currently open.
var errNoActiveJournal = errors.New("no active journal")
// devNull is a WriteCloser that just discards anything written into it. Its
// goal is to allow the transaction journal to write into a fake journal when
// loading transactions on startup without printing warnings due to no file
// being readt for write.
type devNull struct{}
func (*devNull) Write(p []byte) (n int, err error) { return len(p), nil }
func (*devNull) Close() error { return nil }
// txJournal is a rotating log of transactions with the aim of storing locally
// created transactions to allow non-executed ones to survive node restarts.
type txJournal struct {
......@@ -59,6 +68,10 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error {
}
defer input.Close()
// Temporarilly discard any journal additions (don't double add on load)
journal.writer = new(devNull)
defer func() { journal.writer = nil }()
// Inject all transactions from the journal into the pool
stream := rlp.NewStream(input, 0)
total, dropped := 0, 0
......
......@@ -640,6 +640,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.journalTx(from, tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
// We've directly injected a replacement transaction, notify subsystems
go pool.txFeed.Send(TxPreEvent{tx})
return old != nil, nil
}
// New transaction isn't replacing a pending one, push into queue
......@@ -729,6 +733,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
go pool.txFeed.Send(TxPreEvent{tx})
}
......
This diff is collapsed.
......@@ -127,6 +127,8 @@ func (f *Feed) remove(sub *feedSub) {
// Send delivers to all subscribed channels simultaneously.
// It returns the number of subscribers that the value was sent to.
func (f *Feed) Send(value interface{}) (nsent int) {
rvalue := reflect.ValueOf(value)
f.once.Do(f.init)
<-f.sendLock
......@@ -134,14 +136,14 @@ func (f *Feed) Send(value interface{}) (nsent int) {
f.mu.Lock()
f.sendCases = append(f.sendCases, f.inbox...)
f.inbox = nil
f.mu.Unlock()
// Set the sent value on all channels.
rvalue := reflect.ValueOf(value)
if !f.typecheck(rvalue.Type()) {
f.sendLock <- struct{}{}
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
}
f.mu.Unlock()
// Set the sent value on all channels.
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = rvalue
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment