Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
a2e43d28
Unverified
Commit
a2e43d28
authored
7 years ago
by
rjl493456442
Committed by
Péter Szilágyi
6 years ago
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
all: collate new transaction events together
parent
6286c255
Changes
19
Hide whitespace changes
Inline
Side-by-side
Showing
19 changed files
with
165 additions
and
111 deletions
+165
-111
simulated.go
accounts/abi/bind/backends/simulated.go
+1
-1
events.go
core/events.go
+2
-2
tx_journal.go
core/tx_journal.go
+25
-7
tx_pool.go
core/tx_pool.go
+21
-11
tx_pool_test.go
core/tx_pool_test.go
+18
-13
api_backend.go
eth/api_backend.go
+1
-1
api.go
eth/filters/api.go
+9
-5
filter.go
eth/filters/filter.go
+1
-1
filter_system.go
eth/filters/filter_system.go
+21
-17
filter_system_test.go
eth/filters/filter_system_test.go
+2
-5
handler.go
eth/handler.go
+24
-17
helper_test.go
eth/helper_test.go
+1
-1
protocol.go
eth/protocol.go
+2
-2
protocol_test.go
eth/protocol_test.go
+1
-1
ethstats.go
ethstats/ethstats.go
+4
-4
backend.go
internal/ethapi/backend.go
+1
-1
api_backend.go
les/api_backend.go
+1
-1
txpool.go
light/txpool.go
+3
-3
worker.go
miner/worker.go
+27
-18
No files found.
accounts/abi/bind/backends/simulated.go
View file @
a2e43d28
...
...
@@ -454,7 +454,7 @@ func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*ty
return
logs
,
nil
}
func
(
fb
*
filterBackend
)
SubscribeTxPreEvent
(
ch
chan
<-
core
.
TxPreEvent
)
event
.
Subscription
{
func
(
fb
*
filterBackend
)
SubscribeTxPreEvent
(
ch
chan
<-
core
.
Tx
s
PreEvent
)
event
.
Subscription
{
return
event
.
NewSubscription
(
func
(
quit
<-
chan
struct
{})
error
{
<-
quit
return
nil
...
...
This diff is collapsed.
Click to expand it.
core/events.go
View file @
a2e43d28
...
...
@@ -21,8 +21,8 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)
// Tx
PreEvent is posted when a transaction enters
the transaction pool.
type
Tx
PreEvent
struct
{
Tx
*
types
.
Transaction
}
// Tx
sPreEvent is posted when a batch of transactions enter
the transaction pool.
type
Tx
sPreEvent
struct
{
Txs
types
.
Transactions
}
// PendingLogsEvent is posted pre mining and notifies of pending logs.
type
PendingLogsEvent
struct
{
...
...
This diff is collapsed.
Click to expand it.
core/tx_journal.go
View file @
a2e43d28
...
...
@@ -56,7 +56,7 @@ func newTxJournal(path string) *txJournal {
// load parses a transaction journal dump from disk, loading its contents into
// the specified pool.
func
(
journal
*
txJournal
)
load
(
add
func
(
*
types
.
Transaction
)
error
)
error
{
func
(
journal
*
txJournal
)
load
(
add
func
(
[]
*
types
.
Transaction
)
[]
error
)
error
{
// Skip the parsing if the journal file doens't exist at all
if
_
,
err
:=
os
.
Stat
(
journal
.
path
);
os
.
IsNotExist
(
err
)
{
return
nil
...
...
@@ -76,7 +76,22 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error {
stream
:=
rlp
.
NewStream
(
input
,
0
)
total
,
dropped
:=
0
,
0
var
failure
error
// flush imports a batch of transactions and bump the appropriate progress counters
flush
:=
func
(
txs
types
.
Transactions
)
{
errs
:=
add
(
txs
)
for
_
,
err
:=
range
errs
{
if
err
!=
nil
{
log
.
Debug
(
"Failed to add journaled transaction"
,
"err"
,
err
)
dropped
++
}
}
}
var
(
failure
error
txs
types
.
Transactions
)
for
{
// Parse the next transaction and terminate on error
tx
:=
new
(
types
.
Transaction
)
...
...
@@ -86,14 +101,17 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error {
}
break
}
// Import the transaction and bump the appropriate progress counters
txs
=
append
(
txs
,
tx
)
total
++
if
err
=
add
(
tx
);
err
!=
nil
{
log
.
Debug
(
"Failed to add journaled transaction"
,
"err"
,
err
)
dropped
++
continue
if
txs
.
Len
()
>
1024
{
flush
(
txs
)
txs
=
types
.
Transactions
{}
}
}
if
txs
.
Len
()
>
0
{
flush
(
txs
)
txs
=
types
.
Transactions
{}
}
log
.
Info
(
"Loaded local transaction journal"
,
"transactions"
,
total
,
"dropped"
,
dropped
)
return
failure
...
...
This diff is collapsed.
Click to expand it.
core/tx_pool.go
View file @
a2e43d28
...
...
@@ -238,7 +238,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
if
!
config
.
NoLocals
&&
config
.
Journal
!=
""
{
pool
.
journal
=
newTxJournal
(
config
.
Journal
)
if
err
:=
pool
.
journal
.
load
(
pool
.
AddLocal
);
err
!=
nil
{
if
err
:=
pool
.
journal
.
load
(
pool
.
AddLocal
s
);
err
!=
nil
{
log
.
Warn
(
"Failed to load transaction journal"
,
"err"
,
err
)
}
if
err
:=
pool
.
journal
.
rotate
(
pool
.
local
());
err
!=
nil
{
...
...
@@ -444,9 +444,9 @@ func (pool *TxPool) Stop() {
log
.
Info
(
"Transaction pool stopped"
)
}
// SubscribeTxPreEvent registers a subscription of TxPreEvent and
// SubscribeTxPreEvent registers a subscription of Tx
s
PreEvent and
// starts sending event to the given channel.
func
(
pool
*
TxPool
)
SubscribeTxPreEvent
(
ch
chan
<-
TxPreEvent
)
event
.
Subscription
{
func
(
pool
*
TxPool
)
SubscribeTxPreEvent
(
ch
chan
<-
Tx
s
PreEvent
)
event
.
Subscription
{
return
pool
.
scope
.
Track
(
pool
.
txFeed
.
Subscribe
(
ch
))
}
...
...
@@ -653,7 +653,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
log
.
Trace
(
"Pooled new executable transaction"
,
"hash"
,
hash
,
"from"
,
from
,
"to"
,
tx
.
To
())
// We've directly injected a replacement transaction, notify subsystems
go
pool
.
txFeed
.
Send
(
Tx
PreEvent
{
tx
})
go
pool
.
txFeed
.
Send
(
Tx
sPreEvent
{
types
.
Transactions
{
tx
}
})
return
old
!=
nil
,
nil
}
...
...
@@ -715,7 +715,7 @@ func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
// promoteTx adds a transaction to the pending (processable) list of transactions.
//
// Note, this method assumes the pool lock is held!
func
(
pool
*
TxPool
)
promoteTx
(
addr
common
.
Address
,
hash
common
.
Hash
,
tx
*
types
.
Transaction
)
{
func
(
pool
*
TxPool
)
promoteTx
(
addr
common
.
Address
,
hash
common
.
Hash
,
tx
*
types
.
Transaction
)
bool
{
// Try to insert the transaction into the pending queue
if
pool
.
pending
[
addr
]
==
nil
{
pool
.
pending
[
addr
]
=
newTxList
(
true
)
...
...
@@ -729,7 +729,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
pool
.
priced
.
Removed
()
pendingDiscardCounter
.
Inc
(
1
)
return
return
false
}
// Otherwise discard any previous transaction and mark this
if
old
!=
nil
{
...
...
@@ -746,8 +746,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool
.
beats
[
addr
]
=
time
.
Now
()
pool
.
pendingState
.
SetNonce
(
addr
,
tx
.
Nonce
()
+
1
)
go
pool
.
txFeed
.
Send
(
TxPreEvent
{
tx
})
return
true
}
// AddLocal enqueues a single transaction into the pool if it is valid, marking
...
...
@@ -907,6 +906,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func
(
pool
*
TxPool
)
promoteExecutables
(
accounts
[]
common
.
Address
)
{
var
promotedTxs
types
.
Transactions
// Gather all the accounts potentially needing updates
if
accounts
==
nil
{
accounts
=
make
([]
common
.
Address
,
0
,
len
(
pool
.
queue
))
...
...
@@ -937,11 +937,16 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
queuedNofundsCounter
.
Inc
(
1
)
}
// Gather all executable transactions and promote them
for
_
,
tx
:=
range
list
.
Ready
(
pool
.
pendingState
.
GetNonce
(
addr
))
{
txs
:=
list
.
Ready
(
pool
.
pendingState
.
GetNonce
(
addr
))
for
_
,
tx
:=
range
txs
{
hash
:=
tx
.
Hash
()
log
.
Trace
(
"Promoting queued transaction"
,
"hash"
,
hash
)
pool
.
promoteTx
(
addr
,
hash
,
tx
)
inserted
:=
pool
.
promoteTx
(
addr
,
hash
,
tx
)
if
inserted
{
log
.
Trace
(
"Promoting queued transaction"
,
"hash"
,
hash
)
promotedTxs
=
append
(
promotedTxs
,
tx
)
}
}
// Drop all transactions over the allowed limit
if
!
pool
.
locals
.
contains
(
addr
)
{
for
_
,
tx
:=
range
list
.
Cap
(
int
(
pool
.
config
.
AccountQueue
))
{
...
...
@@ -957,6 +962,11 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
delete
(
pool
.
queue
,
addr
)
}
}
// Notify subsystem for new promoted transactions.
if
promotedTxs
.
Len
()
>
0
{
pool
.
txFeed
.
Send
(
TxsPreEvent
{
promotedTxs
})
}
// If the pending limit is overflown, start equalizing allowances
pending
:=
uint64
(
0
)
for
_
,
list
:=
range
pool
.
pending
{
...
...
This diff is collapsed.
Click to expand it.
core/tx_pool_test.go
View file @
a2e43d28
...
...
@@ -118,21 +118,26 @@ func validateTxPoolInternals(pool *TxPool) error {
// validateEvents checks that the correct number of transaction addition events
// were fired on the pool's event feed.
func
validateEvents
(
events
chan
TxPreEvent
,
count
int
)
error
{
for
i
:=
0
;
i
<
count
;
i
++
{
func
validateEvents
(
events
chan
TxsPreEvent
,
count
int
)
error
{
received
:=
0
for
{
if
received
==
count
{
break
}
select
{
case
<-
events
:
case
ev
:=
<-
events
:
received
+=
ev
.
Txs
.
Len
()
case
<-
time
.
After
(
time
.
Second
)
:
return
fmt
.
Errorf
(
"event #%d not fired"
,
i
)
return
fmt
.
Errorf
(
"event #%d not fired"
,
received
)
}
}
select
{
case
tx
:=
<-
events
:
return
fmt
.
Errorf
(
"more than %d events fired: %v"
,
count
,
tx
.
Tx
)
case
ev
:=
<-
events
:
return
fmt
.
Errorf
(
"more than %d events fired: %v"
,
count
,
ev
.
Txs
)
case
<-
time
.
After
(
50
*
time
.
Millisecond
)
:
// This branch should be "default", but it's a data race between goroutines,
// reading the event channel and pushng into it, so better wait a bit ensuring
// reading the event channel and push
i
ng into it, so better wait a bit ensuring
// really nothing gets injected.
}
return
nil
...
...
@@ -669,7 +674,7 @@ func TestTransactionGapFilling(t *testing.T) {
pool
.
currentState
.
AddBalance
(
account
,
big
.
NewInt
(
1000000
))
// Keep track of transaction events to ensure all executables get announced
events
:=
make
(
chan
TxPreEvent
,
testTxPoolConfig
.
AccountQueue
+
5
)
events
:=
make
(
chan
Tx
s
PreEvent
,
testTxPoolConfig
.
AccountQueue
+
5
)
sub
:=
pool
.
txFeed
.
Subscribe
(
events
)
defer
sub
.
Unsubscribe
()
...
...
@@ -920,7 +925,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
pool
.
currentState
.
AddBalance
(
account
,
big
.
NewInt
(
1000000
))
// Keep track of transaction events to ensure all executables get announced
events
:=
make
(
chan
TxPreEvent
,
testTxPoolConfig
.
AccountQueue
+
5
)
events
:=
make
(
chan
Tx
s
PreEvent
,
testTxPoolConfig
.
AccountQueue
+
5
)
sub
:=
pool
.
txFeed
.
Subscribe
(
events
)
defer
sub
.
Unsubscribe
()
...
...
@@ -1140,7 +1145,7 @@ func TestTransactionPoolRepricing(t *testing.T) {
defer
pool
.
Stop
()
// Keep track of transaction events to ensure all executables get announced
events
:=
make
(
chan
TxPreEvent
,
32
)
events
:=
make
(
chan
Tx
s
PreEvent
,
32
)
sub
:=
pool
.
txFeed
.
Subscribe
(
events
)
defer
sub
.
Unsubscribe
()
...
...
@@ -1327,7 +1332,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
defer
pool
.
Stop
()
// Keep track of transaction events to ensure all executables get announced
events
:=
make
(
chan
TxPreEvent
,
32
)
events
:=
make
(
chan
Tx
s
PreEvent
,
32
)
sub
:=
pool
.
txFeed
.
Subscribe
(
events
)
defer
sub
.
Unsubscribe
()
...
...
@@ -1433,7 +1438,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) {
defer
pool
.
Stop
()
// Keep track of transaction events to ensure all executables get announced
events
:=
make
(
chan
TxPreEvent
,
32
)
events
:=
make
(
chan
Tx
s
PreEvent
,
32
)
sub
:=
pool
.
txFeed
.
Subscribe
(
events
)
defer
sub
.
Unsubscribe
()
...
...
@@ -1495,7 +1500,7 @@ func TestTransactionReplacement(t *testing.T) {
defer
pool
.
Stop
()
// Keep track of transaction events to ensure all executables get announced
events
:=
make
(
chan
TxPreEvent
,
32
)
events
:=
make
(
chan
Tx
s
PreEvent
,
32
)
sub
:=
pool
.
txFeed
.
Subscribe
(
events
)
defer
sub
.
Unsubscribe
()
...
...
This diff is collapsed.
Click to expand it.
eth/api_backend.go
View file @
a2e43d28
...
...
@@ -188,7 +188,7 @@ func (b *EthAPIBackend) TxPoolContent() (map[common.Address]types.Transactions,
return
b
.
eth
.
TxPool
()
.
Content
()
}
func
(
b
*
EthAPIBackend
)
SubscribeTxPreEvent
(
ch
chan
<-
core
.
TxPreEvent
)
event
.
Subscription
{
func
(
b
*
EthAPIBackend
)
SubscribeTxPreEvent
(
ch
chan
<-
core
.
Tx
s
PreEvent
)
event
.
Subscription
{
return
b
.
eth
.
TxPool
()
.
SubscribeTxPreEvent
(
ch
)
}
...
...
This diff is collapsed.
Click to expand it.
eth/filters/api.go
View file @
a2e43d28
...
...
@@ -104,7 +104,7 @@ func (api *PublicFilterAPI) timeoutLoop() {
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
func
(
api
*
PublicFilterAPI
)
NewPendingTransactionFilter
()
rpc
.
ID
{
var
(
pendingTxs
=
make
(
chan
common
.
Hash
)
pendingTxs
=
make
(
chan
[]
common
.
Hash
)
pendingTxSub
=
api
.
events
.
SubscribePendingTxEvents
(
pendingTxs
)
)
...
...
@@ -118,7 +118,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
case
ph
:=
<-
pendingTxs
:
api
.
filtersMu
.
Lock
()
if
f
,
found
:=
api
.
filters
[
pendingTxSub
.
ID
];
found
{
f
.
hashes
=
append
(
f
.
hashes
,
ph
)
f
.
hashes
=
append
(
f
.
hashes
,
ph
...
)
}
api
.
filtersMu
.
Unlock
()
case
<-
pendingTxSub
.
Err
()
:
...
...
@@ -144,13 +144,17 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su
rpcSub
:=
notifier
.
CreateSubscription
()
go
func
()
{
txHashes
:=
make
(
chan
common
.
Hash
)
txHashes
:=
make
(
chan
[]
common
.
Hash
,
128
)
pendingTxSub
:=
api
.
events
.
SubscribePendingTxEvents
(
txHashes
)
for
{
select
{
case
h
:=
<-
txHashes
:
notifier
.
Notify
(
rpcSub
.
ID
,
h
)
case
hashes
:=
<-
txHashes
:
// To keep the original behaviour, send a single tx hash in one notification.
// TODO(rjl493456442) Send a batch of tx hashes in one notification
for
_
,
h
:=
range
hashes
{
notifier
.
Notify
(
rpcSub
.
ID
,
h
)
}
case
<-
rpcSub
.
Err
()
:
pendingTxSub
.
Unsubscribe
()
return
...
...
This diff is collapsed.
Click to expand it.
eth/filters/filter.go
View file @
a2e43d28
...
...
@@ -36,7 +36,7 @@ type Backend interface {
GetReceipts
(
ctx
context
.
Context
,
blockHash
common
.
Hash
)
(
types
.
Receipts
,
error
)
GetLogs
(
ctx
context
.
Context
,
blockHash
common
.
Hash
)
([][]
*
types
.
Log
,
error
)
SubscribeTxPreEvent
(
chan
<-
core
.
TxPreEvent
)
event
.
Subscription
SubscribeTxPreEvent
(
chan
<-
core
.
Tx
s
PreEvent
)
event
.
Subscription
SubscribeChainEvent
(
ch
chan
<-
core
.
ChainEvent
)
event
.
Subscription
SubscribeRemovedLogsEvent
(
ch
chan
<-
core
.
RemovedLogsEvent
)
event
.
Subscription
SubscribeLogsEvent
(
ch
chan
<-
[]
*
types
.
Log
)
event
.
Subscription
...
...
This diff is collapsed.
Click to expand it.
eth/filters/filter_system.go
View file @
a2e43d28
...
...
@@ -59,7 +59,7 @@ const (
const
(
// txChanSize is the size of channel listening to TxPreEvent.
// txChanSize is the size of channel listening to Tx
s
PreEvent.
// The number is referenced from the size of tx pool.
txChanSize
=
4096
// rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
...
...
@@ -80,7 +80,7 @@ type subscription struct {
created
time
.
Time
logsCrit
ethereum
.
FilterQuery
logs
chan
[]
*
types
.
Log
hashes
chan
common
.
Hash
hashes
chan
[]
common
.
Hash
headers
chan
*
types
.
Header
installed
chan
struct
{}
// closed when the filter is installed
err
chan
error
// closed when the filter is uninstalled
...
...
@@ -95,7 +95,7 @@ type EventSystem struct {
lastHead
*
types
.
Header
// Subscriptions
tx
Sub
event
.
Subscription
// Subscription for new transaction event
tx
sSub
event
.
Subscription
// Subscription for new transaction event
logsSub
event
.
Subscription
// Subscription for new log event
rmLogsSub
event
.
Subscription
// Subscription for removed log event
chainSub
event
.
Subscription
// Subscription for new chain event
...
...
@@ -104,7 +104,7 @@ type EventSystem struct {
// Channels
install
chan
*
subscription
// install filter for event notification
uninstall
chan
*
subscription
// remove filter for event notification
tx
Ch
chan
core
.
TxPreEvent
// Channel to receive new transaction
event
tx
sCh
chan
core
.
TxsPreEvent
// Channel to receive new transactions
event
logsCh
chan
[]
*
types
.
Log
// Channel to receive new log event
rmLogsCh
chan
core
.
RemovedLogsEvent
// Channel to receive removed log event
chainCh
chan
core
.
ChainEvent
// Channel to receive new chain event
...
...
@@ -123,14 +123,14 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
lightMode
:
lightMode
,
install
:
make
(
chan
*
subscription
),
uninstall
:
make
(
chan
*
subscription
),
tx
Ch
:
make
(
chan
core
.
Tx
PreEvent
,
txChanSize
),
tx
sCh
:
make
(
chan
core
.
Txs
PreEvent
,
txChanSize
),
logsCh
:
make
(
chan
[]
*
types
.
Log
,
logsChanSize
),
rmLogsCh
:
make
(
chan
core
.
RemovedLogsEvent
,
rmLogsChanSize
),
chainCh
:
make
(
chan
core
.
ChainEvent
,
chainEvChanSize
),
}
// Subscribe events
m
.
tx
Sub
=
m
.
backend
.
SubscribeTxPreEvent
(
m
.
tx
Ch
)
m
.
tx
sSub
=
m
.
backend
.
SubscribeTxPreEvent
(
m
.
txs
Ch
)
m
.
logsSub
=
m
.
backend
.
SubscribeLogsEvent
(
m
.
logsCh
)
m
.
rmLogsSub
=
m
.
backend
.
SubscribeRemovedLogsEvent
(
m
.
rmLogsCh
)
m
.
chainSub
=
m
.
backend
.
SubscribeChainEvent
(
m
.
chainCh
)
...
...
@@ -138,7 +138,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
m
.
pendingLogSub
=
m
.
mux
.
Subscribe
(
core
.
PendingLogsEvent
{})
// Make sure none of the subscriptions are empty
if
m
.
txSub
==
nil
||
m
.
logsSub
==
nil
||
m
.
rmLogsSub
==
nil
||
m
.
chainSub
==
nil
||
if
m
.
tx
s
Sub
==
nil
||
m
.
logsSub
==
nil
||
m
.
rmLogsSub
==
nil
||
m
.
chainSub
==
nil
||
m
.
pendingLogSub
.
Closed
()
{
log
.
Crit
(
"Subscribe for event system failed"
)
}
...
...
@@ -240,7 +240,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
logsCrit
:
crit
,
created
:
time
.
Now
(),
logs
:
logs
,
hashes
:
make
(
chan
common
.
Hash
),
hashes
:
make
(
chan
[]
common
.
Hash
),
headers
:
make
(
chan
*
types
.
Header
),
installed
:
make
(
chan
struct
{}),
err
:
make
(
chan
error
),
...
...
@@ -257,7 +257,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
logsCrit
:
crit
,
created
:
time
.
Now
(),
logs
:
logs
,
hashes
:
make
(
chan
common
.
Hash
),
hashes
:
make
(
chan
[]
common
.
Hash
),
headers
:
make
(
chan
*
types
.
Header
),
installed
:
make
(
chan
struct
{}),
err
:
make
(
chan
error
),
...
...
@@ -274,7 +274,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
logsCrit
:
crit
,
created
:
time
.
Now
(),
logs
:
logs
,
hashes
:
make
(
chan
common
.
Hash
),
hashes
:
make
(
chan
[]
common
.
Hash
),
headers
:
make
(
chan
*
types
.
Header
),
installed
:
make
(
chan
struct
{}),
err
:
make
(
chan
error
),
...
...
@@ -290,7 +290,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
typ
:
BlocksSubscription
,
created
:
time
.
Now
(),
logs
:
make
(
chan
[]
*
types
.
Log
),
hashes
:
make
(
chan
common
.
Hash
),
hashes
:
make
(
chan
[]
common
.
Hash
),
headers
:
headers
,
installed
:
make
(
chan
struct
{}),
err
:
make
(
chan
error
),
...
...
@@ -300,7 +300,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
// SubscribePendingTxEvents creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
func
(
es
*
EventSystem
)
SubscribePendingTxEvents
(
hashes
chan
common
.
Hash
)
*
Subscription
{
func
(
es
*
EventSystem
)
SubscribePendingTxEvents
(
hashes
chan
[]
common
.
Hash
)
*
Subscription
{
sub
:=
&
subscription
{
id
:
rpc
.
NewID
(),
typ
:
PendingTransactionsSubscription
,
...
...
@@ -348,9 +348,13 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
}
}
}
case
core
.
TxPreEvent
:
case
core
.
TxsPreEvent
:
hashes
:=
make
([]
common
.
Hash
,
0
,
e
.
Txs
.
Len
())
for
_
,
tx
:=
range
e
.
Txs
{
hashes
=
append
(
hashes
,
tx
.
Hash
())
}
for
_
,
f
:=
range
filters
[
PendingTransactionsSubscription
]
{
f
.
hashes
<-
e
.
Tx
.
Hash
()
f
.
hashes
<-
hashes
}
case
core
.
ChainEvent
:
for
_
,
f
:=
range
filters
[
BlocksSubscription
]
{
...
...
@@ -446,7 +450,7 @@ func (es *EventSystem) eventLoop() {
// Ensure all subscriptions get cleaned up
defer
func
()
{
es
.
pendingLogSub
.
Unsubscribe
()
es
.
txSub
.
Unsubscribe
()
es
.
tx
s
Sub
.
Unsubscribe
()
es
.
logsSub
.
Unsubscribe
()
es
.
rmLogsSub
.
Unsubscribe
()
es
.
chainSub
.
Unsubscribe
()
...
...
@@ -460,7 +464,7 @@ func (es *EventSystem) eventLoop() {
for
{
select
{
// Handle subscribed events
case
ev
:=
<-
es
.
txCh
:
case
ev
:=
<-
es
.
tx
s
Ch
:
es
.
broadcast
(
index
,
ev
)
case
ev
:=
<-
es
.
logsCh
:
es
.
broadcast
(
index
,
ev
)
...
...
@@ -495,7 +499,7 @@ func (es *EventSystem) eventLoop() {
close
(
f
.
err
)
// System stopped
case
<-
es
.
txSub
.
Err
()
:
case
<-
es
.
tx
s
Sub
.
Err
()
:
return
case
<-
es
.
logsSub
.
Err
()
:
return
...
...
This diff is collapsed.
Click to expand it.
eth/filters/filter_system_test.go
View file @
a2e43d28
...
...
@@ -96,7 +96,7 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types
return
logs
,
nil
}
func
(
b
*
testBackend
)
SubscribeTxPreEvent
(
ch
chan
<-
core
.
TxPreEvent
)
event
.
Subscription
{
func
(
b
*
testBackend
)
SubscribeTxPreEvent
(
ch
chan
<-
core
.
Tx
s
PreEvent
)
event
.
Subscription
{
return
b
.
txFeed
.
Subscribe
(
ch
)
}
...
...
@@ -232,10 +232,7 @@ func TestPendingTxFilter(t *testing.T) {
fid0
:=
api
.
NewPendingTransactionFilter
()
time
.
Sleep
(
1
*
time
.
Second
)
for
_
,
tx
:=
range
transactions
{
ev
:=
core
.
TxPreEvent
{
Tx
:
tx
}
txFeed
.
Send
(
ev
)
}
txFeed
.
Send
(
core
.
TxsPreEvent
{
transactions
})
timeout
:=
time
.
Now
()
.
Add
(
1
*
time
.
Second
)
for
{
...
...
This diff is collapsed.
Click to expand it.
eth/handler.go
View file @
a2e43d28
...
...
@@ -46,7 +46,7 @@ const (
softResponseLimit
=
2
*
1024
*
1024
// Target maximum size of returned blocks, headers or node data.
estHeaderRlpSize
=
500
// Approximate size of an RLP encoded block header
// txChanSize is the size of channel listening to TxPreEvent.
// txChanSize is the size of channel listening to Tx
s
PreEvent.
// The number is referenced from the size of tx pool.
txChanSize
=
4096
)
...
...
@@ -81,8 +81,8 @@ type ProtocolManager struct {
SubProtocols
[]
p2p
.
Protocol
eventMux
*
event
.
TypeMux
tx
Ch
chan
core
.
Tx
PreEvent
tx
Sub
event
.
Subscription
tx
sCh
chan
core
.
Txs
PreEvent
tx
sSub
event
.
Subscription
minedBlockSub
*
event
.
TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
...
...
@@ -204,8 +204,8 @@ func (pm *ProtocolManager) Start(maxPeers int) {
pm
.
maxPeers
=
maxPeers
// broadcast transactions
pm
.
tx
Ch
=
make
(
chan
core
.
Tx
PreEvent
,
txChanSize
)
pm
.
tx
Sub
=
pm
.
txpool
.
SubscribeTxPreEvent
(
pm
.
tx
Ch
)
pm
.
tx
sCh
=
make
(
chan
core
.
Txs
PreEvent
,
txChanSize
)
pm
.
tx
sSub
=
pm
.
txpool
.
SubscribeTxPreEvent
(
pm
.
txs
Ch
)
go
pm
.
txBroadcastLoop
()
// broadcast mined blocks
...
...
@@ -220,7 +220,7 @@ func (pm *ProtocolManager) Start(maxPeers int) {
func
(
pm
*
ProtocolManager
)
Stop
()
{
log
.
Info
(
"Stopping Ethereum protocol"
)
pm
.
tx
Sub
.
Unsubscribe
()
// quits txBroadcastLoop
pm
.
tx
sSub
.
Unsubscribe
()
// quits txBroadcastLoop
pm
.
minedBlockSub
.
Unsubscribe
()
// quits blockBroadcastLoop
// Quit the sync loop.
...
...
@@ -712,16 +712,23 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
}
}
// BroadcastTx
will propagate a transaction
to all peers which are not known to
// BroadcastTx
s will propagate a batch of transactions
to all peers which are not known to
// already have the given transaction.
func
(
pm
*
ProtocolManager
)
BroadcastTx
(
hash
common
.
Hash
,
tx
*
types
.
Transaction
)
{
// Broadcast transaction to a batch of peers not knowing about it
peers
:=
pm
.
peers
.
PeersWithoutTx
(
hash
)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for
_
,
peer
:=
range
peers
{
peer
.
SendTransactions
(
types
.
Transactions
{
tx
})
func
(
pm
*
ProtocolManager
)
BroadcastTxs
(
txs
types
.
Transactions
)
{
var
txset
=
make
(
map
[
*
peer
]
types
.
Transactions
)
// Broadcast transactions to a batch of peers not knowing about it
for
_
,
tx
:=
range
txs
{
peers
:=
pm
.
peers
.
PeersWithoutTx
(
tx
.
Hash
())
for
_
,
peer
:=
range
peers
{
txset
[
peer
]
=
append
(
txset
[
peer
],
tx
)
}
log
.
Trace
(
"Broadcast transaction"
,
"hash"
,
tx
.
Hash
(),
"recipients"
,
len
(
peers
))
}
// FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for
peer
,
txs
:=
range
txset
{
peer
.
SendTransactions
(
txs
)
}
log
.
Trace
(
"Broadcast transaction"
,
"hash"
,
hash
,
"recipients"
,
len
(
peers
))
}
// Mined broadcast loop
...
...
@@ -739,11 +746,11 @@ func (pm *ProtocolManager) minedBroadcastLoop() {
func
(
pm
*
ProtocolManager
)
txBroadcastLoop
()
{
for
{
select
{
case
event
:=
<-
pm
.
txCh
:
pm
.
BroadcastTx
(
event
.
Tx
.
Hash
(),
event
.
Tx
)
case
event
:=
<-
pm
.
tx
s
Ch
:
pm
.
BroadcastTx
s
(
event
.
Txs
)
// Err() channel will be closed when unsubscribing.
case
<-
pm
.
txSub
.
Err
()
:
case
<-
pm
.
tx
s
Sub
.
Err
()
:
return
}
}
...
...
This diff is collapsed.
Click to expand it.
eth/helper_test.go
View file @
a2e43d28
...
...
@@ -124,7 +124,7 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
return
batches
,
nil
}
func
(
p
*
testTxPool
)
SubscribeTxPreEvent
(
ch
chan
<-
core
.
TxPreEvent
)
event
.
Subscription
{
func
(
p
*
testTxPool
)
SubscribeTxPreEvent
(
ch
chan
<-
core
.
Tx
s
PreEvent
)
event
.
Subscription
{
return
p
.
txFeed
.
Subscribe
(
ch
)
}
...
...
This diff is collapsed.
Click to expand it.
eth/protocol.go
View file @
a2e43d28
...
...
@@ -104,8 +104,8 @@ type txPool interface {
Pending
()
(
map
[
common
.
Address
]
types
.
Transactions
,
error
)
// SubscribeTxPreEvent should return an event subscription of
// TxPreEvent and send events to the given channel.
SubscribeTxPreEvent
(
chan
<-
core
.
TxPreEvent
)
event
.
Subscription
// Tx
s
PreEvent and send events to the given channel.
SubscribeTxPreEvent
(
chan
<-
core
.
Tx
s
PreEvent
)
event
.
Subscription
}
// statusData is the network packet for the status message.
...
...
This diff is collapsed.
Click to expand it.
eth/protocol_test.go
View file @
a2e43d28
...
...
@@ -116,7 +116,7 @@ func testRecvTransactions(t *testing.T, protocol int) {
t
.
Errorf
(
"added wrong tx hash: got %v, want %v"
,
added
[
0
]
.
Hash
(),
tx
.
Hash
())
}
case
<-
time
.
After
(
2
*
time
.
Second
)
:
t
.
Errorf
(
"no TxPreEvent received within 2 seconds"
)
t
.
Errorf
(
"no Tx
s
PreEvent received within 2 seconds"
)
}
}
...
...
This diff is collapsed.
Click to expand it.
ethstats/ethstats.go
View file @
a2e43d28
...
...
@@ -49,7 +49,7 @@ const (
// history request.
historyUpdateRange
=
50
// txChanSize is the size of channel listening to TxPreEvent.
// txChanSize is the size of channel listening to Tx
s
PreEvent.
// The number is referenced from the size of tx pool.
txChanSize
=
4096
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
...
...
@@ -58,8 +58,8 @@ const (
type
txPool
interface
{
// SubscribeTxPreEvent should return an event subscription of
// TxPreEvent and send events to the given channel.
SubscribeTxPreEvent
(
chan
<-
core
.
TxPreEvent
)
event
.
Subscription
// Tx
s
PreEvent and send events to the given channel.
SubscribeTxPreEvent
(
chan
<-
core
.
Tx
s
PreEvent
)
event
.
Subscription
}
type
blockChain
interface
{
...
...
@@ -150,7 +150,7 @@ func (s *Service) loop() {
headSub
:=
blockchain
.
SubscribeChainHeadEvent
(
chainHeadCh
)
defer
headSub
.
Unsubscribe
()
txEventCh
:=
make
(
chan
core
.
TxPreEvent
,
txChanSize
)
txEventCh
:=
make
(
chan
core
.
Tx
s
PreEvent
,
txChanSize
)
txSub
:=
txpool
.
SubscribeTxPreEvent
(
txEventCh
)
defer
txSub
.
Unsubscribe
()
...
...
This diff is collapsed.
Click to expand it.
internal/ethapi/backend.go
View file @
a2e43d28
...
...
@@ -65,7 +65,7 @@ type Backend interface {
GetPoolNonce
(
ctx
context
.
Context
,
addr
common
.
Address
)
(
uint64
,
error
)
Stats
()
(
pending
int
,
queued
int
)
TxPoolContent
()
(
map
[
common
.
Address
]
types
.
Transactions
,
map
[
common
.
Address
]
types
.
Transactions
)
SubscribeTxPreEvent
(
chan
<-
core
.
TxPreEvent
)
event
.
Subscription
SubscribeTxPreEvent
(
chan
<-
core
.
Tx
s
PreEvent
)
event
.
Subscription
ChainConfig
()
*
params
.
ChainConfig
CurrentBlock
()
*
types
.
Block
...
...
This diff is collapsed.
Click to expand it.
les/api_backend.go
View file @
a2e43d28
...
...
@@ -136,7 +136,7 @@ func (b *LesApiBackend) TxPoolContent() (map[common.Address]types.Transactions,
return
b
.
eth
.
txPool
.
Content
()
}
func
(
b
*
LesApiBackend
)
SubscribeTxPreEvent
(
ch
chan
<-
core
.
TxPreEvent
)
event
.
Subscription
{
func
(
b
*
LesApiBackend
)
SubscribeTxPreEvent
(
ch
chan
<-
core
.
Tx
s
PreEvent
)
event
.
Subscription
{
return
b
.
eth
.
txPool
.
SubscribeTxPreEvent
(
ch
)
}
...
...
This diff is collapsed.
Click to expand it.
light/txpool.go
View file @
a2e43d28
...
...
@@ -321,9 +321,9 @@ func (pool *TxPool) Stop() {
log
.
Info
(
"Transaction pool stopped"
)
}
// SubscribeTxPreEvent registers a subscription of core.TxPreEvent and
// SubscribeTxPreEvent registers a subscription of core.Tx
s
PreEvent and
// starts sending event to the given channel.
func
(
pool
*
TxPool
)
SubscribeTxPreEvent
(
ch
chan
<-
core
.
TxPreEvent
)
event
.
Subscription
{
func
(
pool
*
TxPool
)
SubscribeTxPreEvent
(
ch
chan
<-
core
.
Tx
s
PreEvent
)
event
.
Subscription
{
return
pool
.
scope
.
Track
(
pool
.
txFeed
.
Subscribe
(
ch
))
}
...
...
@@ -412,7 +412,7 @@ func (self *TxPool) add(ctx context.Context, tx *types.Transaction) error {
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
go
self
.
txFeed
.
Send
(
core
.
Tx
PreEvent
{
Tx
:
tx
})
go
self
.
txFeed
.
Send
(
core
.
Tx
sPreEvent
{
types
.
Transactions
{
tx
}
})
}
// Print a log message if low enough level is set
...
...
This diff is collapsed.
Click to expand it.
miner/worker.go
View file @
a2e43d28
...
...
@@ -42,7 +42,7 @@ const (
resultQueueSize
=
10
miningLogAtDepth
=
5
// txChanSize is the size of channel listening to TxPreEvent.
// txChanSize is the size of channel listening to Tx
s
PreEvent.
// The number is referenced from the size of tx pool.
txChanSize
=
4096
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
...
...
@@ -71,6 +71,7 @@ type Work struct {
family
*
set
.
Set
// family set (used for checking uncle invalidity)
uncles
*
set
.
Set
// uncle set
tcount
int
// tx count in cycle
gasPool
*
core
.
GasPool
// available gas used to pack transaction.
Block
*
types
.
Block
// the new block
...
...
@@ -95,8 +96,8 @@ type worker struct {
// update loop
mux
*
event
.
TypeMux
tx
Ch
chan
core
.
Tx
PreEvent
tx
Sub
event
.
Subscription
tx
sCh
chan
core
.
Txs
PreEvent
tx
sSub
event
.
Subscription
chainHeadCh
chan
core
.
ChainHeadEvent
chainHeadSub
event
.
Subscription
chainSideCh
chan
core
.
ChainSideEvent
...
...
@@ -137,7 +138,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
engine
:
engine
,
eth
:
eth
,
mux
:
mux
,
tx
Ch
:
make
(
chan
core
.
Tx
PreEvent
,
txChanSize
),
tx
sCh
:
make
(
chan
core
.
Txs
PreEvent
,
txChanSize
),
chainHeadCh
:
make
(
chan
core
.
ChainHeadEvent
,
chainHeadChanSize
),
chainSideCh
:
make
(
chan
core
.
ChainSideEvent
,
chainSideChanSize
),
chainDb
:
eth
.
ChainDb
(),
...
...
@@ -149,8 +150,8 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
agents
:
make
(
map
[
Agent
]
struct
{}),
unconfirmed
:
newUnconfirmedBlocks
(
eth
.
BlockChain
(),
miningLogAtDepth
),
}
// Subscribe TxPreEvent for tx pool
worker
.
tx
Sub
=
eth
.
TxPool
()
.
SubscribeTxPreEvent
(
worker
.
tx
Ch
)
// Subscribe Tx
s
PreEvent for tx pool
worker
.
tx
sSub
=
eth
.
TxPool
()
.
SubscribeTxPreEvent
(
worker
.
txs
Ch
)
// Subscribe events for blockchain
worker
.
chainHeadSub
=
eth
.
BlockChain
()
.
SubscribeChainHeadEvent
(
worker
.
chainHeadCh
)
worker
.
chainSideSub
=
eth
.
BlockChain
()
.
SubscribeChainSideEvent
(
worker
.
chainSideCh
)
...
...
@@ -241,7 +242,7 @@ func (self *worker) unregister(agent Agent) {
}
func
(
self
*
worker
)
update
()
{
defer
self
.
txSub
.
Unsubscribe
()
defer
self
.
tx
s
Sub
.
Unsubscribe
()
defer
self
.
chainHeadSub
.
Unsubscribe
()
defer
self
.
chainSideSub
.
Unsubscribe
()
...
...
@@ -258,15 +259,21 @@ func (self *worker) update() {
self
.
possibleUncles
[
ev
.
Block
.
Hash
()]
=
ev
.
Block
self
.
uncleMu
.
Unlock
()
// Handle TxPreEvent
case
ev
:=
<-
self
.
txCh
:
// Apply transaction to the pending state if we're not mining
// Handle TxsPreEvent
case
ev
:=
<-
self
.
txsCh
:
// Apply transactions to the pending state if we're not mining.
//
// Note all transactions received may not be continuous with transactions
// already included in the current mining block. These transactions will
// be automatically eliminated.
if
atomic
.
LoadInt32
(
&
self
.
mining
)
==
0
{
self
.
currentMu
.
Lock
()
acc
,
_
:=
types
.
Sender
(
self
.
current
.
signer
,
ev
.
Tx
)
txs
:=
map
[
common
.
Address
]
types
.
Transactions
{
acc
:
{
ev
.
Tx
}}
txs
:=
make
(
map
[
common
.
Address
]
types
.
Transactions
)
for
_
,
tx
:=
range
ev
.
Txs
{
acc
,
_
:=
types
.
Sender
(
self
.
current
.
signer
,
tx
)
txs
[
acc
]
=
append
(
txs
[
acc
],
tx
)
}
txset
:=
types
.
NewTransactionsByPriceAndNonce
(
self
.
current
.
signer
,
txs
)
self
.
current
.
commitTransactions
(
self
.
mux
,
txset
,
self
.
chain
,
self
.
coinbase
)
self
.
updateSnapshot
()
self
.
currentMu
.
Unlock
()
...
...
@@ -278,7 +285,7 @@ func (self *worker) update() {
}
// System stopped
case
<-
self
.
txSub
.
Err
()
:
case
<-
self
.
tx
s
Sub
.
Err
()
:
return
case
<-
self
.
chainHeadSub
.
Err
()
:
return
...
...
@@ -522,14 +529,16 @@ func (self *worker) updateSnapshot() {
}
func
(
env
*
Work
)
commitTransactions
(
mux
*
event
.
TypeMux
,
txs
*
types
.
TransactionsByPriceAndNonce
,
bc
*
core
.
BlockChain
,
coinbase
common
.
Address
)
{
gp
:=
new
(
core
.
GasPool
)
.
AddGas
(
env
.
header
.
GasLimit
)
if
env
.
gasPool
==
nil
{
env
.
gasPool
=
new
(
core
.
GasPool
)
.
AddGas
(
env
.
header
.
GasLimit
)
}
var
coalescedLogs
[]
*
types
.
Log
for
{
// If we don't have enough gas for any further transactions then we're done
if
gp
.
Gas
()
<
params
.
TxGas
{
log
.
Trace
(
"Not enough gas for further transactions"
,
"gp"
,
gp
)
if
env
.
gasPool
.
Gas
()
<
params
.
TxGas
{
log
.
Trace
(
"Not enough gas for further transactions"
,
"gp"
,
env
.
gasPool
)
break
}
// Retrieve the next transaction and abort if all done
...
...
@@ -553,7 +562,7 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
// Start executing the transaction
env
.
state
.
Prepare
(
tx
.
Hash
(),
common
.
Hash
{},
env
.
tcount
)
err
,
logs
:=
env
.
commitTransaction
(
tx
,
bc
,
coinbase
,
gp
)
err
,
logs
:=
env
.
commitTransaction
(
tx
,
bc
,
coinbase
,
env
.
gasPool
)
switch
err
{
case
core
.
ErrGasLimitReached
:
// Pop the current out-of-gas transaction without shifting in the next from the account
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment