Commit 4f7a3800 authored by Péter Szilágyi's avatar Péter Szilágyi Committed by GitHub

Merge pull request #14737 from holiman/txpool_localaccounts

Txpool localaccounts
parents 65f0e905 34ec9913
...@@ -66,6 +66,7 @@ var ( ...@@ -66,6 +66,7 @@ var (
utils.EthashDatasetDirFlag, utils.EthashDatasetDirFlag,
utils.EthashDatasetsInMemoryFlag, utils.EthashDatasetsInMemoryFlag,
utils.EthashDatasetsOnDiskFlag, utils.EthashDatasetsOnDiskFlag,
utils.TxPoolNoLocalsFlag,
utils.TxPoolPriceLimitFlag, utils.TxPoolPriceLimitFlag,
utils.TxPoolPriceBumpFlag, utils.TxPoolPriceBumpFlag,
utils.TxPoolAccountSlotsFlag, utils.TxPoolAccountSlotsFlag,
......
...@@ -95,6 +95,7 @@ var AppHelpFlagGroups = []flagGroup{ ...@@ -95,6 +95,7 @@ var AppHelpFlagGroups = []flagGroup{
{ {
Name: "TRANSACTION POOL", Name: "TRANSACTION POOL",
Flags: []cli.Flag{ Flags: []cli.Flag{
utils.TxPoolNoLocalsFlag,
utils.TxPoolPriceLimitFlag, utils.TxPoolPriceLimitFlag,
utils.TxPoolPriceBumpFlag, utils.TxPoolPriceBumpFlag,
utils.TxPoolAccountSlotsFlag, utils.TxPoolAccountSlotsFlag,
......
...@@ -209,6 +209,10 @@ var ( ...@@ -209,6 +209,10 @@ var (
Value: eth.DefaultConfig.EthashDatasetsOnDisk, Value: eth.DefaultConfig.EthashDatasetsOnDisk,
} }
// Transaction pool settings // Transaction pool settings
TxPoolNoLocalsFlag = cli.BoolFlag{
Name: "txpool.nolocals",
Usage: "Disables price exemptions for locally submitted transactions",
}
TxPoolPriceLimitFlag = cli.Uint64Flag{ TxPoolPriceLimitFlag = cli.Uint64Flag{
Name: "txpool.pricelimit", Name: "txpool.pricelimit",
Usage: "Minimum gas price limit to enforce for acceptance into the pool", Usage: "Minimum gas price limit to enforce for acceptance into the pool",
...@@ -831,6 +835,9 @@ func setGPO(ctx *cli.Context, cfg *gasprice.Config) { ...@@ -831,6 +835,9 @@ func setGPO(ctx *cli.Context, cfg *gasprice.Config) {
} }
func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) { func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) {
if ctx.GlobalIsSet(TxPoolNoLocalsFlag.Name) {
cfg.NoLocals = ctx.GlobalBool(TxPoolNoLocalsFlag.Name)
}
if ctx.GlobalIsSet(TxPoolPriceLimitFlag.Name) { if ctx.GlobalIsSet(TxPoolPriceLimitFlag.Name) {
cfg.PriceLimit = ctx.GlobalUint64(TxPoolPriceLimitFlag.Name) cfg.PriceLimit = ctx.GlobalUint64(TxPoolPriceLimitFlag.Name)
} }
......
...@@ -420,18 +420,16 @@ func (l *txPricedList) Removed() { ...@@ -420,18 +420,16 @@ func (l *txPricedList) Removed() {
heap.Init(l.items) heap.Init(l.items)
} }
// Discard finds all the transactions below the given price threshold, drops them // Cap finds all the transactions below the given price threshold, drops them
// from the priced list and returs them for further removal from the entire pool. // from the priced list and returs them for further removal from the entire pool.
func (l *txPricedList) Cap(threshold *big.Int, local *txSet) types.Transactions { func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transactions {
drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop
save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep
for len(*l.items) > 0 { for len(*l.items) > 0 {
// Discard stale transactions if found during cleanup // Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction) tx := heap.Pop(l.items).(*types.Transaction)
if _, ok := (*l.all)[tx.Hash()]; !ok {
hash := tx.Hash()
if _, ok := (*l.all)[hash]; !ok {
l.stales-- l.stales--
continue continue
} }
...@@ -440,7 +438,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *txSet) types.Transactions ...@@ -440,7 +438,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *txSet) types.Transactions
break break
} }
// Non stale transaction found, discard unless local // Non stale transaction found, discard unless local
if local.contains(hash) { if local.containsTx(tx) {
save = append(save, tx) save = append(save, tx)
} else { } else {
drop = append(drop, tx) drop = append(drop, tx)
...@@ -454,9 +452,9 @@ func (l *txPricedList) Cap(threshold *big.Int, local *txSet) types.Transactions ...@@ -454,9 +452,9 @@ func (l *txPricedList) Cap(threshold *big.Int, local *txSet) types.Transactions
// Underpriced checks whether a transaction is cheaper than (or as cheap as) the // Underpriced checks whether a transaction is cheaper than (or as cheap as) the
// lowest priced transaction currently being tracked. // lowest priced transaction currently being tracked.
func (l *txPricedList) Underpriced(tx *types.Transaction, local *txSet) bool { func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) bool {
// Local transactions cannot be underpriced // Local transactions cannot be underpriced
if local.contains(tx.Hash()) { if local.containsTx(tx) {
return false return false
} }
// Discard stale price points if found at the heap start // Discard stale price points if found at the heap start
...@@ -479,22 +477,20 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *txSet) bool { ...@@ -479,22 +477,20 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *txSet) bool {
} }
// Discard finds a number of most underpriced transactions, removes them from the // Discard finds a number of most underpriced transactions, removes them from the
// priced list and returs them for further removal from the entire pool. // priced list and returns them for further removal from the entire pool.
func (l *txPricedList) Discard(count int, local *txSet) types.Transactions { func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions {
drop := make(types.Transactions, 0, count) // Remote underpriced transactions to drop drop := make(types.Transactions, 0, count) // Remote underpriced transactions to drop
save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep
for len(*l.items) > 0 && count > 0 { for len(*l.items) > 0 && count > 0 {
// Discard stale transactions if found during cleanup // Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction) tx := heap.Pop(l.items).(*types.Transaction)
if _, ok := (*l.all)[tx.Hash()]; !ok {
hash := tx.Hash()
if _, ok := (*l.all)[hash]; !ok {
l.stales-- l.stales--
continue continue
} }
// Non stale transaction found, discard unless local // Non stale transaction found, discard unless local
if local.contains(hash) { if local.containsTx(tx) {
save = append(save, tx) save = append(save, tx)
} else { } else {
drop = append(drop, tx) drop = append(drop, tx)
......
This diff is collapsed.
This diff is collapsed.
...@@ -116,29 +116,18 @@ func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *sta ...@@ -116,29 +116,18 @@ func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *sta
} }
func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
b.eth.txMu.Lock() return b.eth.txPool.AddLocal(signedTx)
defer b.eth.txMu.Unlock()
b.eth.txPool.SetLocal(signedTx)
return b.eth.txPool.Add(signedTx)
} }
func (b *EthApiBackend) RemoveTx(txHash common.Hash) { func (b *EthApiBackend) RemoveTx(txHash common.Hash) {
b.eth.txMu.Lock()
defer b.eth.txMu.Unlock()
b.eth.txPool.Remove(txHash) b.eth.txPool.Remove(txHash)
} }
func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) { func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) {
b.eth.txMu.Lock()
defer b.eth.txMu.Unlock()
pending, err := b.eth.txPool.Pending() pending, err := b.eth.txPool.Pending()
if err != nil { if err != nil {
return nil, err return nil, err
} }
var txs types.Transactions var txs types.Transactions
for _, batch := range pending { for _, batch := range pending {
txs = append(txs, batch...) txs = append(txs, batch...)
...@@ -147,30 +136,18 @@ func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) { ...@@ -147,30 +136,18 @@ func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) {
} }
func (b *EthApiBackend) GetPoolTransaction(hash common.Hash) *types.Transaction { func (b *EthApiBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {
b.eth.txMu.Lock()
defer b.eth.txMu.Unlock()
return b.eth.txPool.Get(hash) return b.eth.txPool.Get(hash)
} }
func (b *EthApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) { func (b *EthApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
b.eth.txMu.Lock()
defer b.eth.txMu.Unlock()
return b.eth.txPool.State().GetNonce(addr), nil return b.eth.txPool.State().GetNonce(addr), nil
} }
func (b *EthApiBackend) Stats() (pending int, queued int) { func (b *EthApiBackend) Stats() (pending int, queued int) {
b.eth.txMu.Lock()
defer b.eth.txMu.Unlock()
return b.eth.txPool.Stats() return b.eth.txPool.Stats()
} }
func (b *EthApiBackend) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) { func (b *EthApiBackend) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
b.eth.txMu.Lock()
defer b.eth.txMu.Unlock()
return b.eth.TxPool().Content() return b.eth.TxPool().Content()
} }
......
...@@ -63,7 +63,6 @@ type Ethereum struct { ...@@ -63,7 +63,6 @@ type Ethereum struct {
stopDbUpgrade func() // stop chain db sequential key upgrade stopDbUpgrade func() // stop chain db sequential key upgrade
// Handlers // Handlers
txPool *core.TxPool txPool *core.TxPool
txMu sync.Mutex
blockchain *core.BlockChain blockchain *core.BlockChain
protocolManager *ProtocolManager protocolManager *ProtocolManager
lesServer LesServer lesServer LesServer
......
...@@ -658,7 +658,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ...@@ -658,7 +658,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
} }
p.MarkTransaction(tx.Hash()) p.MarkTransaction(tx.Hash())
} }
pm.txpool.AddBatch(txs) pm.txpool.AddRemotes(txs)
default: default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code) return errResp(ErrInvalidMsgCode, "%v", msg.Code)
......
...@@ -94,9 +94,9 @@ type testTxPool struct { ...@@ -94,9 +94,9 @@ type testTxPool struct {
lock sync.RWMutex // Protects the transaction pool lock sync.RWMutex // Protects the transaction pool
} }
// AddBatch appends a batch of transactions to the pool, and notifies any // AddRemotes appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil // listeners if the addition channel is non nil
func (p *testTxPool) AddBatch(txs []*types.Transaction) error { func (p *testTxPool) AddRemotes(txs []*types.Transaction) error {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
......
...@@ -94,8 +94,8 @@ var errorToString = map[int]string{ ...@@ -94,8 +94,8 @@ var errorToString = map[int]string{
} }
type txPool interface { type txPool interface {
// AddBatch should add the given transactions to the pool. // AddRemotes should add the given transactions to the pool.
AddBatch([]*types.Transaction) error AddRemotes([]*types.Transaction) error
// Pending should return pending transactions. // Pending should return pending transactions.
// The slice should be modifiable by the caller. // The slice should be modifiable by the caller.
......
...@@ -130,7 +130,7 @@ func testSendTransactions(t *testing.T, protocol int) { ...@@ -130,7 +130,7 @@ func testSendTransactions(t *testing.T, protocol int) {
for nonce := range alltxs { for nonce := range alltxs {
alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), txsize) alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), txsize)
} }
pm.txpool.AddBatch(alltxs) pm.txpool.AddRemotes(alltxs)
// Connect several peers. They should all receive the pending transactions. // Connect several peers. They should all receive the pending transactions.
var wg sync.WaitGroup var wg sync.WaitGroup
......
...@@ -87,8 +87,8 @@ type BlockChain interface { ...@@ -87,8 +87,8 @@ type BlockChain interface {
} }
type txPool interface { type txPool interface {
// AddTransactions should add the given transactions to the pool. // AddRemotes should add the given transactions to the pool.
AddBatch([]*types.Transaction) error AddRemotes([]*types.Transaction) error
} }
type ProtocolManager struct { type ProtocolManager struct {
...@@ -801,7 +801,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { ...@@ -801,7 +801,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrRequestRejected, "") return errResp(ErrRequestRejected, "")
} }
if err := pm.txpool.AddBatch(txs); err != nil { if err := pm.txpool.AddRemotes(txs); err != nil {
return errResp(ErrUnexpectedResponse, "msg: %v", err) return errResp(ErrUnexpectedResponse, "msg: %v", err)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment