Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
49719e21
Unverified
Commit
49719e21
authored
May 18, 2018
by
Péter Szilágyi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
core, eth: minor txpool event cleanups
parent
a2e43d28
Changes
19
Hide whitespace changes
Inline
Side-by-side
Showing
19 changed files
with
89 additions
and
92 deletions
+89
-92
simulated.go
accounts/abi/bind/backends/simulated.go
+1
-1
events.go
core/events.go
+2
-5
tx_journal.go
core/tx_journal.go
+14
-15
tx_pool.go
core/tx_pool.go
+14
-14
tx_pool_test.go
core/tx_pool_test.go
+14
-13
api_backend.go
eth/api_backend.go
+2
-2
api.go
eth/filters/api.go
+2
-2
filter.go
eth/filters/filter.go
+1
-1
filter_system.go
eth/filters/filter_system.go
+8
-8
filter_system_test.go
eth/filters/filter_system_test.go
+2
-2
handler.go
eth/handler.go
+4
-4
helper_test.go
eth/helper_test.go
+1
-1
protocol.go
eth/protocol.go
+3
-3
protocol_test.go
eth/protocol_test.go
+1
-1
ethstats.go
ethstats/ethstats.go
+6
-6
backend.go
internal/ethapi/backend.go
+1
-1
api_backend.go
les/api_backend.go
+2
-2
txpool.go
light/txpool.go
+3
-3
worker.go
miner/worker.go
+8
-8
No files found.
accounts/abi/bind/backends/simulated.go
View file @
49719e21
...
...
@@ -454,7 +454,7 @@ func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*ty
return
logs
,
nil
}
func
(
fb
*
filterBackend
)
Subscribe
TxPreEvent
(
ch
chan
<-
core
.
TxsPre
Event
)
event
.
Subscription
{
func
(
fb
*
filterBackend
)
Subscribe
NewTxsEvent
(
ch
chan
<-
core
.
NewTxs
Event
)
event
.
Subscription
{
return
event
.
NewSubscription
(
func
(
quit
<-
chan
struct
{})
error
{
<-
quit
return
nil
...
...
core/events.go
View file @
49719e21
...
...
@@ -21,8 +21,8 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)
//
TxsPre
Event is posted when a batch of transactions enter the transaction pool.
type
TxsPreEvent
struct
{
Txs
types
.
Transactions
}
//
NewTxs
Event is posted when a batch of transactions enter the transaction pool.
type
NewTxsEvent
struct
{
Txs
[]
*
types
.
Transaction
}
// PendingLogsEvent is posted pre mining and notifies of pending logs.
type
PendingLogsEvent
struct
{
...
...
@@ -35,9 +35,6 @@ type PendingStateEvent struct{}
// NewMinedBlockEvent is posted when a block has been imported.
type
NewMinedBlockEvent
struct
{
Block
*
types
.
Block
}
// RemovedTransactionEvent is posted when a reorg happens
type
RemovedTransactionEvent
struct
{
Txs
types
.
Transactions
}
// RemovedLogsEvent is posted when a reorg happens
type
RemovedLogsEvent
struct
{
Logs
[]
*
types
.
Log
}
...
...
core/tx_journal.go
View file @
49719e21
...
...
@@ -76,22 +76,21 @@ func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
stream
:=
rlp
.
NewStream
(
input
,
0
)
total
,
dropped
:=
0
,
0
// flush imports a batch of transactions and bump the appropriate progress counters
flush
:=
func
(
txs
types
.
Transactions
)
{
errs
:=
add
(
txs
)
for
_
,
err
:=
range
errs
{
// Create a method to load a limited batch of transactions and bump the
// appropriate progress counters. Then use this method to load all the
// journalled transactions in small-ish batches.
loadBatch
:=
func
(
txs
types
.
Transactions
)
{
for
_
,
err
:=
range
add
(
txs
)
{
if
err
!=
nil
{
log
.
Debug
(
"Failed to add journaled transaction"
,
"err"
,
err
)
dropped
++
}
}
}
var
(
failure
error
txs
types
.
Transactions
batch
types
.
Transactions
)
for
{
// Parse the next transaction and terminate on error
tx
:=
new
(
types
.
Transaction
)
...
...
@@ -99,19 +98,19 @@ func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
if
err
!=
io
.
EOF
{
failure
=
err
}
if
batch
.
Len
()
>
0
{
loadBatch
(
batch
)
}
break
}
txs
=
append
(
txs
,
tx
)
// New transaction parsed, queue up for later, import if threnshold is reached
total
++
if
txs
.
Len
()
>
1024
{
flush
(
txs
)
txs
=
types
.
Transactions
{}
if
batch
=
append
(
batch
,
tx
);
batch
.
Len
()
>
1024
{
loadBatch
(
batch
)
batch
=
batch
[
:
0
]
}
}
if
txs
.
Len
()
>
0
{
flush
(
txs
)
txs
=
types
.
Transactions
{}
}
log
.
Info
(
"Loaded local transaction journal"
,
"transactions"
,
total
,
"dropped"
,
dropped
)
return
failure
...
...
core/tx_pool.go
View file @
49719e21
...
...
@@ -444,9 +444,9 @@ func (pool *TxPool) Stop() {
log
.
Info
(
"Transaction pool stopped"
)
}
// Subscribe
TxPreEvent registers a subscription of TxsPre
Event and
// Subscribe
NewTxsEvent registers a subscription of NewTxs
Event and
// starts sending event to the given channel.
func
(
pool
*
TxPool
)
Subscribe
TxPreEvent
(
ch
chan
<-
TxsPre
Event
)
event
.
Subscription
{
func
(
pool
*
TxPool
)
Subscribe
NewTxsEvent
(
ch
chan
<-
NewTxs
Event
)
event
.
Subscription
{
return
pool
.
scope
.
Track
(
pool
.
txFeed
.
Subscribe
(
ch
))
}
...
...
@@ -653,7 +653,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
log
.
Trace
(
"Pooled new executable transaction"
,
"hash"
,
hash
,
"from"
,
from
,
"to"
,
tx
.
To
())
// We've directly injected a replacement transaction, notify subsystems
go
pool
.
txFeed
.
Send
(
TxsPre
Event
{
types
.
Transactions
{
tx
}})
go
pool
.
txFeed
.
Send
(
NewTxs
Event
{
types
.
Transactions
{
tx
}})
return
old
!=
nil
,
nil
}
...
...
@@ -712,7 +712,8 @@ func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
}
}
// promoteTx adds a transaction to the pending (processable) list of transactions.
// promoteTx adds a transaction to the pending (processable) list of transactions
// and returns whether it was inserted or an older was better.
//
// Note, this method assumes the pool lock is held!
func
(
pool
*
TxPool
)
promoteTx
(
addr
common
.
Address
,
hash
common
.
Hash
,
tx
*
types
.
Transaction
)
bool
{
...
...
@@ -746,6 +747,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool
.
beats
[
addr
]
=
time
.
Now
()
pool
.
pendingState
.
SetNonce
(
addr
,
tx
.
Nonce
()
+
1
)
return
true
}
...
...
@@ -906,7 +908,9 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func
(
pool
*
TxPool
)
promoteExecutables
(
accounts
[]
common
.
Address
)
{
var
promotedTxs
types
.
Transactions
// Track the promoted transactions to broadcast them at once
var
promoted
[]
*
types
.
Transaction
// Gather all the accounts potentially needing updates
if
accounts
==
nil
{
accounts
=
make
([]
common
.
Address
,
0
,
len
(
pool
.
queue
))
...
...
@@ -937,16 +941,13 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
queuedNofundsCounter
.
Inc
(
1
)
}
// Gather all executable transactions and promote them
txs
:=
list
.
Ready
(
pool
.
pendingState
.
GetNonce
(
addr
))
for
_
,
tx
:=
range
txs
{
for
_
,
tx
:=
range
list
.
Ready
(
pool
.
pendingState
.
GetNonce
(
addr
))
{
hash
:=
tx
.
Hash
()
inserted
:=
pool
.
promoteTx
(
addr
,
hash
,
tx
)
if
inserted
{
if
pool
.
promoteTx
(
addr
,
hash
,
tx
)
{
log
.
Trace
(
"Promoting queued transaction"
,
"hash"
,
hash
)
promoted
Txs
=
append
(
promotedTxs
,
tx
)
promoted
=
append
(
promoted
,
tx
)
}
}
// Drop all transactions over the allowed limit
if
!
pool
.
locals
.
contains
(
addr
)
{
for
_
,
tx
:=
range
list
.
Cap
(
int
(
pool
.
config
.
AccountQueue
))
{
...
...
@@ -963,10 +964,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
}
}
// Notify subsystem for new promoted transactions.
if
promotedTxs
.
Len
(
)
>
0
{
pool
.
txFeed
.
Send
(
TxsPreEvent
{
promotedTxs
})
if
len
(
promoted
)
>
0
{
pool
.
txFeed
.
Send
(
NewTxsEvent
{
promoted
})
}
// If the pending limit is overflown, start equalizing allowances
pending
:=
uint64
(
0
)
for
_
,
list
:=
range
pool
.
pending
{
...
...
core/tx_pool_test.go
View file @
49719e21
...
...
@@ -118,19 +118,20 @@ func validateTxPoolInternals(pool *TxPool) error {
// validateEvents checks that the correct number of transaction addition events
// were fired on the pool's event feed.
func
validateEvents
(
events
chan
TxsPreEvent
,
count
int
)
error
{
received
:=
0
for
{
if
received
==
count
{
break
}
func
validateEvents
(
events
chan
NewTxsEvent
,
count
int
)
error
{
var
received
[]
*
types
.
Transaction
for
len
(
received
)
<
count
{
select
{
case
ev
:=
<-
events
:
received
+=
ev
.
Txs
.
Len
(
)
received
=
append
(
received
,
ev
.
Txs
...
)
case
<-
time
.
After
(
time
.
Second
)
:
return
fmt
.
Errorf
(
"event #%d not fired"
,
received
)
}
}
if
len
(
received
)
>
count
{
return
fmt
.
Errorf
(
"more than %d events fired: %v"
,
count
,
received
[
count
:
])
}
select
{
case
ev
:=
<-
events
:
return
fmt
.
Errorf
(
"more than %d events fired: %v"
,
count
,
ev
.
Txs
)
...
...
@@ -674,7 +675,7 @@ func TestTransactionGapFilling(t *testing.T) {
pool
.
currentState
.
AddBalance
(
account
,
big
.
NewInt
(
1000000
))
// Keep track of transaction events to ensure all executables get announced
events
:=
make
(
chan
TxsPre
Event
,
testTxPoolConfig
.
AccountQueue
+
5
)
events
:=
make
(
chan
NewTxs
Event
,
testTxPoolConfig
.
AccountQueue
+
5
)
sub
:=
pool
.
txFeed
.
Subscribe
(
events
)
defer
sub
.
Unsubscribe
()
...
...
@@ -925,7 +926,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
pool
.
currentState
.
AddBalance
(
account
,
big
.
NewInt
(
1000000
))
// Keep track of transaction events to ensure all executables get announced
events
:=
make
(
chan
TxsPre
Event
,
testTxPoolConfig
.
AccountQueue
+
5
)
events
:=
make
(
chan
NewTxs
Event
,
testTxPoolConfig
.
AccountQueue
+
5
)
sub
:=
pool
.
txFeed
.
Subscribe
(
events
)
defer
sub
.
Unsubscribe
()
...
...
@@ -1145,7 +1146,7 @@ func TestTransactionPoolRepricing(t *testing.T) {
defer
pool
.
Stop
()
// Keep track of transaction events to ensure all executables get announced
events
:=
make
(
chan
TxsPre
Event
,
32
)
events
:=
make
(
chan
NewTxs
Event
,
32
)
sub
:=
pool
.
txFeed
.
Subscribe
(
events
)
defer
sub
.
Unsubscribe
()
...
...
@@ -1332,7 +1333,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
defer
pool
.
Stop
()
// Keep track of transaction events to ensure all executables get announced
events
:=
make
(
chan
TxsPre
Event
,
32
)
events
:=
make
(
chan
NewTxs
Event
,
32
)
sub
:=
pool
.
txFeed
.
Subscribe
(
events
)
defer
sub
.
Unsubscribe
()
...
...
@@ -1438,7 +1439,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) {
defer
pool
.
Stop
()
// Keep track of transaction events to ensure all executables get announced
events
:=
make
(
chan
TxsPre
Event
,
32
)
events
:=
make
(
chan
NewTxs
Event
,
32
)
sub
:=
pool
.
txFeed
.
Subscribe
(
events
)
defer
sub
.
Unsubscribe
()
...
...
@@ -1500,7 +1501,7 @@ func TestTransactionReplacement(t *testing.T) {
defer
pool
.
Stop
()
// Keep track of transaction events to ensure all executables get announced
events
:=
make
(
chan
TxsPre
Event
,
32
)
events
:=
make
(
chan
NewTxs
Event
,
32
)
sub
:=
pool
.
txFeed
.
Subscribe
(
events
)
defer
sub
.
Unsubscribe
()
...
...
eth/api_backend.go
View file @
49719e21
...
...
@@ -188,8 +188,8 @@ func (b *EthAPIBackend) TxPoolContent() (map[common.Address]types.Transactions,
return
b
.
eth
.
TxPool
()
.
Content
()
}
func
(
b
*
EthAPIBackend
)
Subscribe
TxPreEvent
(
ch
chan
<-
core
.
TxsPre
Event
)
event
.
Subscription
{
return
b
.
eth
.
TxPool
()
.
Subscribe
TxPre
Event
(
ch
)
func
(
b
*
EthAPIBackend
)
Subscribe
NewTxsEvent
(
ch
chan
<-
core
.
NewTxs
Event
)
event
.
Subscription
{
return
b
.
eth
.
TxPool
()
.
Subscribe
NewTxs
Event
(
ch
)
}
func
(
b
*
EthAPIBackend
)
Downloader
()
*
downloader
.
Downloader
{
...
...
eth/filters/api.go
View file @
49719e21
...
...
@@ -105,7 +105,7 @@ func (api *PublicFilterAPI) timeoutLoop() {
func
(
api
*
PublicFilterAPI
)
NewPendingTransactionFilter
()
rpc
.
ID
{
var
(
pendingTxs
=
make
(
chan
[]
common
.
Hash
)
pendingTxSub
=
api
.
events
.
SubscribePendingTx
Event
s
(
pendingTxs
)
pendingTxSub
=
api
.
events
.
SubscribePendingTxs
(
pendingTxs
)
)
api
.
filtersMu
.
Lock
()
...
...
@@ -145,7 +145,7 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su
go
func
()
{
txHashes
:=
make
(
chan
[]
common
.
Hash
,
128
)
pendingTxSub
:=
api
.
events
.
SubscribePendingTx
Event
s
(
txHashes
)
pendingTxSub
:=
api
.
events
.
SubscribePendingTxs
(
txHashes
)
for
{
select
{
...
...
eth/filters/filter.go
View file @
49719e21
...
...
@@ -36,7 +36,7 @@ type Backend interface {
GetReceipts
(
ctx
context
.
Context
,
blockHash
common
.
Hash
)
(
types
.
Receipts
,
error
)
GetLogs
(
ctx
context
.
Context
,
blockHash
common
.
Hash
)
([][]
*
types
.
Log
,
error
)
Subscribe
TxPreEvent
(
chan
<-
core
.
TxsPre
Event
)
event
.
Subscription
Subscribe
NewTxsEvent
(
chan
<-
core
.
NewTxs
Event
)
event
.
Subscription
SubscribeChainEvent
(
ch
chan
<-
core
.
ChainEvent
)
event
.
Subscription
SubscribeRemovedLogsEvent
(
ch
chan
<-
core
.
RemovedLogsEvent
)
event
.
Subscription
SubscribeLogsEvent
(
ch
chan
<-
[]
*
types
.
Log
)
event
.
Subscription
...
...
eth/filters/filter_system.go
View file @
49719e21
...
...
@@ -59,7 +59,7 @@ const (
const
(
// txChanSize is the size of channel listening to
TxsPre
Event.
// txChanSize is the size of channel listening to
NewTxs
Event.
// The number is referenced from the size of tx pool.
txChanSize
=
4096
// rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
...
...
@@ -104,7 +104,7 @@ type EventSystem struct {
// Channels
install
chan
*
subscription
// install filter for event notification
uninstall
chan
*
subscription
// remove filter for event notification
txsCh
chan
core
.
TxsPre
Event
// Channel to receive new transactions event
txsCh
chan
core
.
NewTxs
Event
// Channel to receive new transactions event
logsCh
chan
[]
*
types
.
Log
// Channel to receive new log event
rmLogsCh
chan
core
.
RemovedLogsEvent
// Channel to receive removed log event
chainCh
chan
core
.
ChainEvent
// Channel to receive new chain event
...
...
@@ -123,14 +123,14 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
lightMode
:
lightMode
,
install
:
make
(
chan
*
subscription
),
uninstall
:
make
(
chan
*
subscription
),
txsCh
:
make
(
chan
core
.
TxsPre
Event
,
txChanSize
),
txsCh
:
make
(
chan
core
.
NewTxs
Event
,
txChanSize
),
logsCh
:
make
(
chan
[]
*
types
.
Log
,
logsChanSize
),
rmLogsCh
:
make
(
chan
core
.
RemovedLogsEvent
,
rmLogsChanSize
),
chainCh
:
make
(
chan
core
.
ChainEvent
,
chainEvChanSize
),
}
// Subscribe events
m
.
txsSub
=
m
.
backend
.
Subscribe
TxPre
Event
(
m
.
txsCh
)
m
.
txsSub
=
m
.
backend
.
Subscribe
NewTxs
Event
(
m
.
txsCh
)
m
.
logsSub
=
m
.
backend
.
SubscribeLogsEvent
(
m
.
logsCh
)
m
.
rmLogsSub
=
m
.
backend
.
SubscribeRemovedLogsEvent
(
m
.
rmLogsCh
)
m
.
chainSub
=
m
.
backend
.
SubscribeChainEvent
(
m
.
chainCh
)
...
...
@@ -298,9 +298,9 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
return
es
.
subscribe
(
sub
)
}
// SubscribePendingTx
Event
s creates a subscription that writes transaction hashes for
// SubscribePendingTxs creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
func
(
es
*
EventSystem
)
SubscribePendingTx
Event
s
(
hashes
chan
[]
common
.
Hash
)
*
Subscription
{
func
(
es
*
EventSystem
)
SubscribePendingTxs
(
hashes
chan
[]
common
.
Hash
)
*
Subscription
{
sub
:=
&
subscription
{
id
:
rpc
.
NewID
(),
typ
:
PendingTransactionsSubscription
,
...
...
@@ -348,8 +348,8 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
}
}
}
case
core
.
TxsPre
Event
:
hashes
:=
make
([]
common
.
Hash
,
0
,
e
.
Txs
.
Len
(
))
case
core
.
NewTxs
Event
:
hashes
:=
make
([]
common
.
Hash
,
0
,
len
(
e
.
Txs
))
for
_
,
tx
:=
range
e
.
Txs
{
hashes
=
append
(
hashes
,
tx
.
Hash
())
}
...
...
eth/filters/filter_system_test.go
View file @
49719e21
...
...
@@ -96,7 +96,7 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types
return
logs
,
nil
}
func
(
b
*
testBackend
)
Subscribe
TxPreEvent
(
ch
chan
<-
core
.
TxsPre
Event
)
event
.
Subscription
{
func
(
b
*
testBackend
)
Subscribe
NewTxsEvent
(
ch
chan
<-
core
.
NewTxs
Event
)
event
.
Subscription
{
return
b
.
txFeed
.
Subscribe
(
ch
)
}
...
...
@@ -232,7 +232,7 @@ func TestPendingTxFilter(t *testing.T) {
fid0
:=
api
.
NewPendingTransactionFilter
()
time
.
Sleep
(
1
*
time
.
Second
)
txFeed
.
Send
(
core
.
TxsPreEvent
{
transactions
})
txFeed
.
Send
(
core
.
NewTxsEvent
{
Txs
:
transactions
})
timeout
:=
time
.
Now
()
.
Add
(
1
*
time
.
Second
)
for
{
...
...
eth/handler.go
View file @
49719e21
...
...
@@ -46,7 +46,7 @@ const (
softResponseLimit
=
2
*
1024
*
1024
// Target maximum size of returned blocks, headers or node data.
estHeaderRlpSize
=
500
// Approximate size of an RLP encoded block header
// txChanSize is the size of channel listening to
TxsPre
Event.
// txChanSize is the size of channel listening to
NewTxs
Event.
// The number is referenced from the size of tx pool.
txChanSize
=
4096
)
...
...
@@ -81,7 +81,7 @@ type ProtocolManager struct {
SubProtocols
[]
p2p
.
Protocol
eventMux
*
event
.
TypeMux
txsCh
chan
core
.
TxsPre
Event
txsCh
chan
core
.
NewTxs
Event
txsSub
event
.
Subscription
minedBlockSub
*
event
.
TypeMuxSubscription
...
...
@@ -204,8 +204,8 @@ func (pm *ProtocolManager) Start(maxPeers int) {
pm
.
maxPeers
=
maxPeers
// broadcast transactions
pm
.
txsCh
=
make
(
chan
core
.
TxsPre
Event
,
txChanSize
)
pm
.
txsSub
=
pm
.
txpool
.
Subscribe
TxPre
Event
(
pm
.
txsCh
)
pm
.
txsCh
=
make
(
chan
core
.
NewTxs
Event
,
txChanSize
)
pm
.
txsSub
=
pm
.
txpool
.
Subscribe
NewTxs
Event
(
pm
.
txsCh
)
go
pm
.
txBroadcastLoop
()
// broadcast mined blocks
...
...
eth/helper_test.go
View file @
49719e21
...
...
@@ -124,7 +124,7 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
return
batches
,
nil
}
func
(
p
*
testTxPool
)
Subscribe
TxPreEvent
(
ch
chan
<-
core
.
TxsPre
Event
)
event
.
Subscription
{
func
(
p
*
testTxPool
)
Subscribe
NewTxsEvent
(
ch
chan
<-
core
.
NewTxs
Event
)
event
.
Subscription
{
return
p
.
txFeed
.
Subscribe
(
ch
)
}
...
...
eth/protocol.go
View file @
49719e21
...
...
@@ -103,9 +103,9 @@ type txPool interface {
// The slice should be modifiable by the caller.
Pending
()
(
map
[
common
.
Address
]
types
.
Transactions
,
error
)
// Subscribe
TxPre
Event should return an event subscription of
//
TxsPre
Event and send events to the given channel.
Subscribe
TxPreEvent
(
chan
<-
core
.
TxsPre
Event
)
event
.
Subscription
// Subscribe
NewTxs
Event should return an event subscription of
//
NewTxs
Event and send events to the given channel.
Subscribe
NewTxsEvent
(
chan
<-
core
.
NewTxs
Event
)
event
.
Subscription
}
// statusData is the network packet for the status message.
...
...
eth/protocol_test.go
View file @
49719e21
...
...
@@ -116,7 +116,7 @@ func testRecvTransactions(t *testing.T, protocol int) {
t
.
Errorf
(
"added wrong tx hash: got %v, want %v"
,
added
[
0
]
.
Hash
(),
tx
.
Hash
())
}
case
<-
time
.
After
(
2
*
time
.
Second
)
:
t
.
Errorf
(
"no
TxsPre
Event received within 2 seconds"
)
t
.
Errorf
(
"no
NewTxs
Event received within 2 seconds"
)
}
}
...
...
ethstats/ethstats.go
View file @
49719e21
...
...
@@ -49,7 +49,7 @@ const (
// history request.
historyUpdateRange
=
50
// txChanSize is the size of channel listening to
TxsPre
Event.
// txChanSize is the size of channel listening to
NewTxs
Event.
// The number is referenced from the size of tx pool.
txChanSize
=
4096
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
...
...
@@ -57,9 +57,9 @@ const (
)
type
txPool
interface
{
// Subscribe
TxPre
Event should return an event subscription of
//
TxsPre
Event and send events to the given channel.
Subscribe
TxPreEvent
(
chan
<-
core
.
TxsPre
Event
)
event
.
Subscription
// Subscribe
NewTxs
Event should return an event subscription of
//
NewTxs
Event and send events to the given channel.
Subscribe
NewTxsEvent
(
chan
<-
core
.
NewTxs
Event
)
event
.
Subscription
}
type
blockChain
interface
{
...
...
@@ -150,8 +150,8 @@ func (s *Service) loop() {
headSub
:=
blockchain
.
SubscribeChainHeadEvent
(
chainHeadCh
)
defer
headSub
.
Unsubscribe
()
txEventCh
:=
make
(
chan
core
.
TxsPre
Event
,
txChanSize
)
txSub
:=
txpool
.
Subscribe
TxPre
Event
(
txEventCh
)
txEventCh
:=
make
(
chan
core
.
NewTxs
Event
,
txChanSize
)
txSub
:=
txpool
.
Subscribe
NewTxs
Event
(
txEventCh
)
defer
txSub
.
Unsubscribe
()
// Start a goroutine that exhausts the subsciptions to avoid events piling up
...
...
internal/ethapi/backend.go
View file @
49719e21
...
...
@@ -65,7 +65,7 @@ type Backend interface {
GetPoolNonce
(
ctx
context
.
Context
,
addr
common
.
Address
)
(
uint64
,
error
)
Stats
()
(
pending
int
,
queued
int
)
TxPoolContent
()
(
map
[
common
.
Address
]
types
.
Transactions
,
map
[
common
.
Address
]
types
.
Transactions
)
Subscribe
TxPreEvent
(
chan
<-
core
.
TxsPre
Event
)
event
.
Subscription
Subscribe
NewTxsEvent
(
chan
<-
core
.
NewTxs
Event
)
event
.
Subscription
ChainConfig
()
*
params
.
ChainConfig
CurrentBlock
()
*
types
.
Block
...
...
les/api_backend.go
View file @
49719e21
...
...
@@ -136,8 +136,8 @@ func (b *LesApiBackend) TxPoolContent() (map[common.Address]types.Transactions,
return
b
.
eth
.
txPool
.
Content
()
}
func
(
b
*
LesApiBackend
)
Subscribe
TxPreEvent
(
ch
chan
<-
core
.
TxsPre
Event
)
event
.
Subscription
{
return
b
.
eth
.
txPool
.
Subscribe
TxPre
Event
(
ch
)
func
(
b
*
LesApiBackend
)
Subscribe
NewTxsEvent
(
ch
chan
<-
core
.
NewTxs
Event
)
event
.
Subscription
{
return
b
.
eth
.
txPool
.
Subscribe
NewTxs
Event
(
ch
)
}
func
(
b
*
LesApiBackend
)
SubscribeChainEvent
(
ch
chan
<-
core
.
ChainEvent
)
event
.
Subscription
{
...
...
light/txpool.go
View file @
49719e21
...
...
@@ -321,9 +321,9 @@ func (pool *TxPool) Stop() {
log
.
Info
(
"Transaction pool stopped"
)
}
// Subscribe
TxPreEvent registers a subscription of core.TxsPre
Event and
// Subscribe
NewTxsEvent registers a subscription of core.NewTxs
Event and
// starts sending event to the given channel.
func
(
pool
*
TxPool
)
Subscribe
TxPreEvent
(
ch
chan
<-
core
.
TxsPre
Event
)
event
.
Subscription
{
func
(
pool
*
TxPool
)
Subscribe
NewTxsEvent
(
ch
chan
<-
core
.
NewTxs
Event
)
event
.
Subscription
{
return
pool
.
scope
.
Track
(
pool
.
txFeed
.
Subscribe
(
ch
))
}
...
...
@@ -412,7 +412,7 @@ func (self *TxPool) add(ctx context.Context, tx *types.Transaction) error {
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
go
self
.
txFeed
.
Send
(
core
.
TxsPreEvent
{
types
.
Transactions
{
tx
}})
go
self
.
txFeed
.
Send
(
core
.
NewTxsEvent
{
Txs
:
types
.
Transactions
{
tx
}})
}
// Print a log message if low enough level is set
...
...
miner/worker.go
View file @
49719e21
...
...
@@ -42,7 +42,7 @@ const (
resultQueueSize
=
10
miningLogAtDepth
=
5
// txChanSize is the size of channel listening to
TxsPre
Event.
// txChanSize is the size of channel listening to
NewTxs
Event.
// The number is referenced from the size of tx pool.
txChanSize
=
4096
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
...
...
@@ -71,7 +71,7 @@ type Work struct {
family
*
set
.
Set
// family set (used for checking uncle invalidity)
uncles
*
set
.
Set
// uncle set
tcount
int
// tx count in cycle
gasPool
*
core
.
GasPool
// available gas used to pack transaction
.
gasPool
*
core
.
GasPool
// available gas used to pack transaction
s
Block
*
types
.
Block
// the new block
...
...
@@ -96,7 +96,7 @@ type worker struct {
// update loop
mux
*
event
.
TypeMux
txsCh
chan
core
.
TxsPre
Event
txsCh
chan
core
.
NewTxs
Event
txsSub
event
.
Subscription
chainHeadCh
chan
core
.
ChainHeadEvent
chainHeadSub
event
.
Subscription
...
...
@@ -138,7 +138,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
engine
:
engine
,
eth
:
eth
,
mux
:
mux
,
txsCh
:
make
(
chan
core
.
TxsPre
Event
,
txChanSize
),
txsCh
:
make
(
chan
core
.
NewTxs
Event
,
txChanSize
),
chainHeadCh
:
make
(
chan
core
.
ChainHeadEvent
,
chainHeadChanSize
),
chainSideCh
:
make
(
chan
core
.
ChainSideEvent
,
chainSideChanSize
),
chainDb
:
eth
.
ChainDb
(),
...
...
@@ -150,8 +150,8 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
agents
:
make
(
map
[
Agent
]
struct
{}),
unconfirmed
:
newUnconfirmedBlocks
(
eth
.
BlockChain
(),
miningLogAtDepth
),
}
// Subscribe
TxsPre
Event for tx pool
worker
.
txsSub
=
eth
.
TxPool
()
.
Subscribe
TxPre
Event
(
worker
.
txsCh
)
// Subscribe
NewTxs
Event for tx pool
worker
.
txsSub
=
eth
.
TxPool
()
.
Subscribe
NewTxs
Event
(
worker
.
txsCh
)
// Subscribe events for blockchain
worker
.
chainHeadSub
=
eth
.
BlockChain
()
.
SubscribeChainHeadEvent
(
worker
.
chainHeadCh
)
worker
.
chainSideSub
=
eth
.
BlockChain
()
.
SubscribeChainSideEvent
(
worker
.
chainSideCh
)
...
...
@@ -259,7 +259,7 @@ func (self *worker) update() {
self
.
possibleUncles
[
ev
.
Block
.
Hash
()]
=
ev
.
Block
self
.
uncleMu
.
Unlock
()
// Handle
TxsPre
Event
// Handle
NewTxs
Event
case
ev
:=
<-
self
.
txsCh
:
// Apply transactions to the pending state if we're not mining.
//
...
...
@@ -538,7 +538,7 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
for
{
// If we don't have enough gas for any further transactions then we're done
if
env
.
gasPool
.
Gas
()
<
params
.
TxGas
{
log
.
Trace
(
"Not enough gas for further transactions"
,
"
gp"
,
env
.
gasPool
)
log
.
Trace
(
"Not enough gas for further transactions"
,
"
have"
,
env
.
gasPool
,
"want"
,
params
.
TxGas
)
break
}
// Retrieve the next transaction and abort if all done
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment