transaction_pool.go 4.37 KB
Newer Older
obscuren's avatar
obscuren committed
1
package core
obscuren's avatar
obscuren committed
2 3

import (
4
	"errors"
obscuren's avatar
obscuren committed
5
	"fmt"
6
	"sync"
7

obscuren's avatar
obscuren committed
8
	"github.com/ethereum/go-ethereum/common"
9
	"github.com/ethereum/go-ethereum/core/types"
obscuren's avatar
obscuren committed
10
	"github.com/ethereum/go-ethereum/event"
obscuren's avatar
obscuren committed
11
	"github.com/ethereum/go-ethereum/logger"
obscuren's avatar
obscuren committed
12 13
)

14 15 16 17 18
var (
	txplogger = logger.NewLogger("TXP")

	ErrInvalidSender = errors.New("Invalid sender")
)
zelig's avatar
zelig committed
19

20
const txPoolQueueSize = 50
obscuren's avatar
obscuren committed
21

22
type TxPoolHook chan *types.Transaction
23
type TxMsg struct{ Tx *types.Transaction }
obscuren's avatar
obscuren committed
24 25

const (
26
	minGasPrice = 1000000
obscuren's avatar
obscuren committed
27 28
)

obscuren's avatar
obscuren committed
29
type TxProcessor interface {
30
	ProcessTransaction(tx *types.Transaction)
obscuren's avatar
obscuren committed
31 32
}

obscuren's avatar
obscuren committed
33 34
// The tx pool a thread safe transaction pool handler. In order to
// guarantee a non blocking pool we use a queue channel which can be
35
// independently read without needing access to the actual pool.
obscuren's avatar
obscuren committed
36
type TxPool struct {
37
	mu sync.RWMutex
obscuren's avatar
obscuren committed
38 39
	// Queueing channel for reading and writing incoming
	// transactions to
40
	queueChan chan *types.Transaction
obscuren's avatar
obscuren committed
41 42 43
	// Quiting channel
	quit chan bool
	// The actual pool
44
	//pool *list.List
45
	txs map[common.Hash]*types.Transaction
obscuren's avatar
obscuren committed
46

obscuren's avatar
obscuren committed
47
	SecondaryProcessor TxProcessor
obscuren's avatar
obscuren committed
48 49

	subscribers []chan TxMsg
obscuren's avatar
obscuren committed
50

51
	eventMux *event.TypeMux
obscuren's avatar
obscuren committed
52 53
}

54
func NewTxPool(eventMux *event.TypeMux) *TxPool {
obscuren's avatar
obscuren committed
55
	return &TxPool{
56
		txs:       make(map[common.Hash]*types.Transaction),
57 58 59
		queueChan: make(chan *types.Transaction, txPoolQueueSize),
		quit:      make(chan bool),
		eventMux:  eventMux,
obscuren's avatar
obscuren committed
60 61 62
	}
}

63
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
64 65
	// Validate sender
	if _, err := tx.From(); err != nil {
Felix Lange's avatar
Felix Lange committed
66
		return ErrInvalidSender
obscuren's avatar
obscuren committed
67
	}
68
	// Validate curve param
obscuren's avatar
obscuren committed
69 70
	v, _, _ := tx.Curve()
	if v > 28 || v < 27 {
71
		return fmt.Errorf("tx.v != (28 || 27) => %v", v)
obscuren's avatar
obscuren committed
72
	}
73
	return nil
74

75 76 77 78
	/* XXX this kind of validation needs to happen elsewhere in the gui when sending txs.
	   Other clients should do their own validation. Value transfer could throw error
	   but doesn't necessarily invalidate the tx. Gas can still be payed for and miner
	   can still be rewarded for their inclusion and processing.
79
	sender := pool.stateQuery.GetAccount(senderAddr)
80
	totAmount := new(big.Int).Set(tx.Value())
obscuren's avatar
obscuren committed
81 82
	// Make sure there's enough in the sender's account. Having insufficient
	// funds won't invalidate this transaction but simple ignores it.
obscuren's avatar
obscuren committed
83
	if sender.Balance().Cmp(totAmount) < 0 {
84
		return fmt.Errorf("Insufficient amount in sender's (%x) account", tx.From())
85
	}
86
	*/
obscuren's avatar
obscuren committed
87 88
}

obscuren's avatar
obscuren committed
89
func (self *TxPool) addTx(tx *types.Transaction) {
90
	self.txs[tx.Hash()] = tx
obscuren's avatar
obscuren committed
91
}
92

93
func (self *TxPool) add(tx *types.Transaction) error {
94 95 96
	hash := tx.Hash()
	if self.txs[hash] != nil {
		return fmt.Errorf("Known transaction (%x)", hash[0:4])
97
	}
98 99 100 101 102
	err := self.ValidateTransaction(tx)
	if err != nil {
		return err
	}

obscuren's avatar
obscuren committed
103
	self.addTx(tx)
104

105
	var toname string
106
	if to := tx.To(); to != nil {
107
		toname = common.Bytes2Hex(to[:4])
108
	} else {
109
		toname = "[NEW_CONTRACT]"
110
	}
111 112 113 114
	// we can ignore the error here because From is
	// verified in ValidateTransaction.
	f, _ := tx.From()
	from := common.Bytes2Hex(f[:4])
115
	txplogger.Debugf("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
116 117

	// Notify the subscribers
obscuren's avatar
obscuren committed
118
	go self.eventMux.Post(TxPreEvent{tx})
119 120 121 122

	return nil
}

123
func (self *TxPool) Size() int {
obscuren's avatar
obscuren committed
124
	return len(self.txs)
125 126
}

127 128 129 130 131
func (self *TxPool) Add(tx *types.Transaction) error {
	self.mu.Lock()
	defer self.mu.Unlock()
	return self.add(tx)
}
132

zelig's avatar
zelig committed
133
func (self *TxPool) AddTransactions(txs []*types.Transaction) {
134 135 136
	self.mu.Lock()
	defer self.mu.Unlock()

zelig's avatar
zelig committed
137
	for _, tx := range txs {
138 139
		if err := self.add(tx); err != nil {
			txplogger.Debugln(err)
zelig's avatar
zelig committed
140
		} else {
141 142
			h := tx.Hash()
			txplogger.Debugf("tx %x\n", h[:4])
zelig's avatar
zelig committed
143 144 145 146
		}
	}
}

obscuren's avatar
obscuren committed
147
func (self *TxPool) GetTransactions() (txs types.Transactions) {
148 149 150
	self.mu.RLock()
	defer self.mu.RUnlock()

obscuren's avatar
obscuren committed
151
	txs = make(types.Transactions, self.Size())
obscuren's avatar
obscuren committed
152
	i := 0
obscuren's avatar
obscuren committed
153 154
	for _, tx := range self.txs {
		txs[i] = tx
obscuren's avatar
obscuren committed
155
		i++
obscuren's avatar
obscuren committed
156
	}
157

obscuren's avatar
obscuren committed
158
	return
159 160
}

161
func (pool *TxPool) RemoveInvalid(query StateQuery) {
162 163
	pool.mu.Lock()

164
	var removedTxs types.Transactions
obscuren's avatar
obscuren committed
165
	for _, tx := range pool.txs {
166 167
		from, _ := tx.From()
		sender := query.GetAccount(from[:])
168
		err := pool.ValidateTransaction(tx)
169
		if err != nil || sender.Nonce() >= tx.Nonce() {
170
			removedTxs = append(removedTxs, tx)
171
		}
obscuren's avatar
obscuren committed
172
	}
173
	pool.mu.Unlock()
174 175

	pool.RemoveSet(removedTxs)
176 177
}

178
func (self *TxPool) RemoveSet(txs types.Transactions) {
179 180
	self.mu.Lock()
	defer self.mu.Unlock()
181
	for _, tx := range txs {
182
		delete(self.txs, tx.Hash())
183 184 185
	}
}

186
func (pool *TxPool) Flush() {
187
	pool.txs = make(map[common.Hash]*types.Transaction)
obscuren's avatar
obscuren committed
188 189 190 191 192 193 194
}

func (pool *TxPool) Start() {
}

func (pool *TxPool) Stop() {
	pool.Flush()
obscuren's avatar
obscuren committed
195

zelig's avatar
zelig committed
196
	txplogger.Infoln("Stopped")
obscuren's avatar
obscuren committed
197
}