Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
0db0b277
Unverified
Commit
0db0b277
authored
Aug 24, 2021
by
Péter Szilágyi
Committed by
GitHub
Aug 24, 2021
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Revert "Revert "eth: drop eth/65, the last non-reqid protocol version" (#23426)" (#23456)
This reverts commit
c368f728
.
parent
d705f5a5
Changes
15
Expand all
Show whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
179 additions
and
609 deletions
+179
-609
suite_test.go
cmd/devp2p/internal/ethtest/suite_test.go
+1
-1
downloader.go
eth/downloader/downloader.go
+2
-2
downloader_test.go
eth/downloader/downloader_test.go
+2
-93
peer.go
eth/downloader/peer.go
+4
-4
handler.go
eth/handler.go
+1
-4
handler_eth_test.go
eth/handler_eth_test.go
+21
-17
handler.go
eth/protocols/eth/handler.go
+15
-32
handler_test.go
eth/protocols/eth/handler_test.go
+47
-87
handlers.go
eth/protocols/eth/handlers.go
+0
-107
handshake_test.go
eth/protocols/eth/handshake_test.go
+0
-1
peer.go
eth/protocols/eth/peer.go
+66
-142
protocol.go
eth/protocols/eth/protocol.go
+15
-19
sync.go
eth/sync.go
+4
-98
sync_test.go
eth/sync_test.go
+0
-1
client_handler.go
les/client_handler.go
+1
-1
No files found.
cmd/devp2p/internal/ethtest/suite_test.go
View file @
0db0b277
...
@@ -45,7 +45,7 @@ func TestEthSuite(t *testing.T) {
...
@@ -45,7 +45,7 @@ func TestEthSuite(t *testing.T) {
if
err
!=
nil
{
if
err
!=
nil
{
t
.
Fatalf
(
"could not create new test suite: %v"
,
err
)
t
.
Fatalf
(
"could not create new test suite: %v"
,
err
)
}
}
for
_
,
test
:=
range
suite
.
AllEth
Tests
()
{
for
_
,
test
:=
range
suite
.
Eth66
Tests
()
{
t
.
Run
(
test
.
Name
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
test
.
Name
,
func
(
t
*
testing
.
T
)
{
result
:=
utesting
.
RunTAP
([]
utesting
.
Test
{{
Name
:
test
.
Name
,
Fn
:
test
.
Fn
}},
os
.
Stdout
)
result
:=
utesting
.
RunTAP
([]
utesting
.
Test
{{
Name
:
test
.
Name
,
Fn
:
test
.
Fn
}},
os
.
Stdout
)
if
result
[
0
]
.
Failed
{
if
result
[
0
]
.
Failed
{
...
...
eth/downloader/downloader.go
View file @
0db0b277
...
@@ -448,8 +448,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
...
@@ -448,8 +448,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d
.
mux
.
Post
(
DoneEvent
{
latest
})
d
.
mux
.
Post
(
DoneEvent
{
latest
})
}
}
}()
}()
if
p
.
version
<
eth
.
ETH6
5
{
if
p
.
version
<
eth
.
ETH6
6
{
return
fmt
.
Errorf
(
"%w: advertized %d < required %d"
,
errTooOld
,
p
.
version
,
eth
.
ETH6
5
)
return
fmt
.
Errorf
(
"%w: advertized %d < required %d"
,
errTooOld
,
p
.
version
,
eth
.
ETH6
6
)
}
}
mode
:=
d
.
getMode
()
mode
:=
d
.
getMode
()
...
...
eth/downloader/downloader_test.go
View file @
0db0b277
This diff is collapsed.
Click to expand it.
eth/downloader/peer.go
View file @
0db0b277
...
@@ -413,7 +413,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
...
@@ -413,7 +413,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
throughput
:=
func
(
p
*
peerConnection
)
int
{
throughput
:=
func
(
p
*
peerConnection
)
int
{
return
p
.
rates
.
Capacity
(
eth
.
BlockHeadersMsg
,
time
.
Second
)
return
p
.
rates
.
Capacity
(
eth
.
BlockHeadersMsg
,
time
.
Second
)
}
}
return
ps
.
idlePeers
(
eth
.
ETH6
5
,
eth
.
ETH66
,
idle
,
throughput
)
return
ps
.
idlePeers
(
eth
.
ETH6
6
,
eth
.
ETH66
,
idle
,
throughput
)
}
}
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
...
@@ -425,7 +425,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
...
@@ -425,7 +425,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
throughput
:=
func
(
p
*
peerConnection
)
int
{
throughput
:=
func
(
p
*
peerConnection
)
int
{
return
p
.
rates
.
Capacity
(
eth
.
BlockBodiesMsg
,
time
.
Second
)
return
p
.
rates
.
Capacity
(
eth
.
BlockBodiesMsg
,
time
.
Second
)
}
}
return
ps
.
idlePeers
(
eth
.
ETH6
5
,
eth
.
ETH66
,
idle
,
throughput
)
return
ps
.
idlePeers
(
eth
.
ETH6
6
,
eth
.
ETH66
,
idle
,
throughput
)
}
}
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
...
@@ -437,7 +437,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
...
@@ -437,7 +437,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
throughput
:=
func
(
p
*
peerConnection
)
int
{
throughput
:=
func
(
p
*
peerConnection
)
int
{
return
p
.
rates
.
Capacity
(
eth
.
ReceiptsMsg
,
time
.
Second
)
return
p
.
rates
.
Capacity
(
eth
.
ReceiptsMsg
,
time
.
Second
)
}
}
return
ps
.
idlePeers
(
eth
.
ETH6
5
,
eth
.
ETH66
,
idle
,
throughput
)
return
ps
.
idlePeers
(
eth
.
ETH6
6
,
eth
.
ETH66
,
idle
,
throughput
)
}
}
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
...
@@ -449,7 +449,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
...
@@ -449,7 +449,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
throughput
:=
func
(
p
*
peerConnection
)
int
{
throughput
:=
func
(
p
*
peerConnection
)
int
{
return
p
.
rates
.
Capacity
(
eth
.
NodeDataMsg
,
time
.
Second
)
return
p
.
rates
.
Capacity
(
eth
.
NodeDataMsg
,
time
.
Second
)
}
}
return
ps
.
idlePeers
(
eth
.
ETH6
5
,
eth
.
ETH66
,
idle
,
throughput
)
return
ps
.
idlePeers
(
eth
.
ETH6
6
,
eth
.
ETH66
,
idle
,
throughput
)
}
}
// idlePeers retrieves a flat list of all currently idle peers satisfying the
// idlePeers retrieves a flat list of all currently idle peers satisfying the
...
...
eth/handler.go
View file @
0db0b277
...
@@ -117,7 +117,6 @@ type handler struct {
...
@@ -117,7 +117,6 @@ type handler struct {
whitelist
map
[
uint64
]
common
.
Hash
whitelist
map
[
uint64
]
common
.
Hash
// channels for fetcher, syncer, txsyncLoop
// channels for fetcher, syncer, txsyncLoop
txsyncCh
chan
*
txsync
quitSync
chan
struct
{}
quitSync
chan
struct
{}
chainSync
*
chainSyncer
chainSync
*
chainSyncer
...
@@ -140,7 +139,6 @@ func newHandler(config *handlerConfig) (*handler, error) {
...
@@ -140,7 +139,6 @@ func newHandler(config *handlerConfig) (*handler, error) {
chain
:
config
.
Chain
,
chain
:
config
.
Chain
,
peers
:
newPeerSet
(),
peers
:
newPeerSet
(),
whitelist
:
config
.
Whitelist
,
whitelist
:
config
.
Whitelist
,
txsyncCh
:
make
(
chan
*
txsync
),
quitSync
:
make
(
chan
struct
{}),
quitSync
:
make
(
chan
struct
{}),
}
}
if
config
.
Sync
==
downloader
.
FullSync
{
if
config
.
Sync
==
downloader
.
FullSync
{
...
@@ -408,9 +406,8 @@ func (h *handler) Start(maxPeers int) {
...
@@ -408,9 +406,8 @@ func (h *handler) Start(maxPeers int) {
go
h
.
minedBroadcastLoop
()
go
h
.
minedBroadcastLoop
()
// start sync handlers
// start sync handlers
h
.
wg
.
Add
(
2
)
h
.
wg
.
Add
(
1
)
go
h
.
chainSync
.
loop
()
go
h
.
chainSync
.
loop
()
go
h
.
txsyncLoop64
()
// TODO(karalabe): Legacy initial tx echange, drop with eth/64.
}
}
func
(
h
*
handler
)
Stop
()
{
func
(
h
*
handler
)
Stop
()
{
...
...
eth/handler_eth_test.go
View file @
0db0b277
...
@@ -80,7 +80,6 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
...
@@ -80,7 +80,6 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
// Tests that peers are correctly accepted (or rejected) based on the advertised
// Tests that peers are correctly accepted (or rejected) based on the advertised
// fork IDs in the protocol handshake.
// fork IDs in the protocol handshake.
func
TestForkIDSplit65
(
t
*
testing
.
T
)
{
testForkIDSplit
(
t
,
eth
.
ETH65
)
}
func
TestForkIDSplit66
(
t
*
testing
.
T
)
{
testForkIDSplit
(
t
,
eth
.
ETH66
)
}
func
TestForkIDSplit66
(
t
*
testing
.
T
)
{
testForkIDSplit
(
t
,
eth
.
ETH66
)
}
func
testForkIDSplit
(
t
*
testing
.
T
,
protocol
uint
)
{
func
testForkIDSplit
(
t
*
testing
.
T
,
protocol
uint
)
{
...
@@ -236,7 +235,6 @@ func testForkIDSplit(t *testing.T, protocol uint) {
...
@@ -236,7 +235,6 @@ func testForkIDSplit(t *testing.T, protocol uint) {
}
}
// Tests that received transactions are added to the local pool.
// Tests that received transactions are added to the local pool.
func
TestRecvTransactions65
(
t
*
testing
.
T
)
{
testRecvTransactions
(
t
,
eth
.
ETH65
)
}
func
TestRecvTransactions66
(
t
*
testing
.
T
)
{
testRecvTransactions
(
t
,
eth
.
ETH66
)
}
func
TestRecvTransactions66
(
t
*
testing
.
T
)
{
testRecvTransactions
(
t
,
eth
.
ETH66
)
}
func
testRecvTransactions
(
t
*
testing
.
T
,
protocol
uint
)
{
func
testRecvTransactions
(
t
*
testing
.
T
,
protocol
uint
)
{
...
@@ -294,7 +292,6 @@ func testRecvTransactions(t *testing.T, protocol uint) {
...
@@ -294,7 +292,6 @@ func testRecvTransactions(t *testing.T, protocol uint) {
}
}
// This test checks that pending transactions are sent.
// This test checks that pending transactions are sent.
func
TestSendTransactions65
(
t
*
testing
.
T
)
{
testSendTransactions
(
t
,
eth
.
ETH65
)
}
func
TestSendTransactions66
(
t
*
testing
.
T
)
{
testSendTransactions
(
t
,
eth
.
ETH66
)
}
func
TestSendTransactions66
(
t
*
testing
.
T
)
{
testSendTransactions
(
t
,
eth
.
ETH66
)
}
func
testSendTransactions
(
t
*
testing
.
T
,
protocol
uint
)
{
func
testSendTransactions
(
t
*
testing
.
T
,
protocol
uint
)
{
...
@@ -306,7 +303,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
...
@@ -306,7 +303,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
insert
:=
make
([]
*
types
.
Transaction
,
100
)
insert
:=
make
([]
*
types
.
Transaction
,
100
)
for
nonce
:=
range
insert
{
for
nonce
:=
range
insert
{
tx
:=
types
.
NewTransaction
(
uint64
(
nonce
),
common
.
Address
{},
big
.
NewInt
(
0
),
100000
,
big
.
NewInt
(
0
),
make
([]
byte
,
txsyncPackSize
/
1
0
))
tx
:=
types
.
NewTransaction
(
uint64
(
nonce
),
common
.
Address
{},
big
.
NewInt
(
0
),
100000
,
big
.
NewInt
(
0
),
make
([]
byte
,
1024
0
))
tx
,
_
=
types
.
SignTx
(
tx
,
types
.
HomesteadSigner
{},
testKey
)
tx
,
_
=
types
.
SignTx
(
tx
,
types
.
HomesteadSigner
{},
testKey
)
insert
[
nonce
]
=
tx
insert
[
nonce
]
=
tx
...
@@ -380,7 +377,6 @@ func testSendTransactions(t *testing.T, protocol uint) {
...
@@ -380,7 +377,6 @@ func testSendTransactions(t *testing.T, protocol uint) {
// Tests that transactions get propagated to all attached peers, either via direct
// Tests that transactions get propagated to all attached peers, either via direct
// broadcasts or via announcements/retrievals.
// broadcasts or via announcements/retrievals.
func
TestTransactionPropagation65
(
t
*
testing
.
T
)
{
testTransactionPropagation
(
t
,
eth
.
ETH65
)
}
func
TestTransactionPropagation66
(
t
*
testing
.
T
)
{
testTransactionPropagation
(
t
,
eth
.
ETH66
)
}
func
TestTransactionPropagation66
(
t
*
testing
.
T
)
{
testTransactionPropagation
(
t
,
eth
.
ETH66
)
}
func
testTransactionPropagation
(
t
*
testing
.
T
,
protocol
uint
)
{
func
testTransactionPropagation
(
t
*
testing
.
T
,
protocol
uint
)
{
...
@@ -521,8 +517,8 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
...
@@ -521,8 +517,8 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
defer
p2pLocal
.
Close
()
defer
p2pLocal
.
Close
()
defer
p2pRemote
.
Close
()
defer
p2pRemote
.
Close
()
local
:=
eth
.
NewPeer
(
eth
.
ETH6
5
,
p2p
.
NewPeerPipe
(
enode
.
ID
{
1
},
""
,
nil
,
p2pLocal
),
p2pLocal
,
handler
.
txpool
)
local
:=
eth
.
NewPeer
(
eth
.
ETH6
6
,
p2p
.
NewPeerPipe
(
enode
.
ID
{
1
},
""
,
nil
,
p2pLocal
),
p2pLocal
,
handler
.
txpool
)
remote
:=
eth
.
NewPeer
(
eth
.
ETH6
5
,
p2p
.
NewPeerPipe
(
enode
.
ID
{
2
},
""
,
nil
,
p2pRemote
),
p2pRemote
,
handler
.
txpool
)
remote
:=
eth
.
NewPeer
(
eth
.
ETH6
6
,
p2p
.
NewPeerPipe
(
enode
.
ID
{
2
},
""
,
nil
,
p2pRemote
),
p2pRemote
,
handler
.
txpool
)
defer
local
.
Close
()
defer
local
.
Close
()
defer
remote
.
Close
()
defer
remote
.
Close
()
...
@@ -543,30 +539,39 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
...
@@ -543,30 +539,39 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
if
err
:=
remote
.
Handshake
(
1
,
td
,
head
.
Hash
(),
genesis
.
Hash
(),
forkid
.
NewIDWithChain
(
handler
.
chain
),
forkid
.
NewFilter
(
handler
.
chain
));
err
!=
nil
{
if
err
:=
remote
.
Handshake
(
1
,
td
,
head
.
Hash
(),
genesis
.
Hash
(),
forkid
.
NewIDWithChain
(
handler
.
chain
),
forkid
.
NewFilter
(
handler
.
chain
));
err
!=
nil
{
t
.
Fatalf
(
"failed to run protocol handshake"
)
t
.
Fatalf
(
"failed to run protocol handshake"
)
}
}
// Connect a new peer and check that we receive the checkpoint challenge.
// Connect a new peer and check that we receive the checkpoint challenge.
if
checkpoint
{
if
checkpoint
{
if
err
:=
remote
.
ExpectRequestHeadersByNumber
(
response
.
Number
.
Uint64
(),
1
,
0
,
false
);
err
!=
nil
{
msg
,
err
:=
p2pRemote
.
ReadMsg
()
t
.
Fatalf
(
"challenge mismatch: %v"
,
err
)
if
err
!=
nil
{
t
.
Fatalf
(
"failed to read checkpoint challenge: %v"
,
err
)
}
request
:=
new
(
eth
.
GetBlockHeadersPacket66
)
if
err
:=
msg
.
Decode
(
request
);
err
!=
nil
{
t
.
Fatalf
(
"failed to decode checkpoint challenge: %v"
,
err
)
}
query
:=
request
.
GetBlockHeadersPacket
if
query
.
Origin
.
Number
!=
response
.
Number
.
Uint64
()
||
query
.
Amount
!=
1
||
query
.
Skip
!=
0
||
query
.
Reverse
{
t
.
Fatalf
(
"challenge mismatch: have [%d, %d, %d, %v] want [%d, %d, %d, %v]"
,
query
.
Origin
.
Number
,
query
.
Amount
,
query
.
Skip
,
query
.
Reverse
,
response
.
Number
.
Uint64
(),
1
,
0
,
false
)
}
}
// Create a block to reply to the challenge if no timeout is simulated.
// Create a block to reply to the challenge if no timeout is simulated.
if
!
timeout
{
if
!
timeout
{
if
empty
{
if
empty
{
if
err
:=
remote
.
SendBlockHeaders
(
[]
*
types
.
Header
{});
err
!=
nil
{
if
err
:=
remote
.
ReplyBlockHeaders
(
request
.
RequestId
,
[]
*
types
.
Header
{});
err
!=
nil
{
t
.
Fatalf
(
"failed to answer challenge: %v"
,
err
)
t
.
Fatalf
(
"failed to answer challenge: %v"
,
err
)
}
}
}
else
if
match
{
}
else
if
match
{
if
err
:=
remote
.
SendBlockHeaders
(
[]
*
types
.
Header
{
response
});
err
!=
nil
{
if
err
:=
remote
.
ReplyBlockHeaders
(
request
.
RequestId
,
[]
*
types
.
Header
{
response
});
err
!=
nil
{
t
.
Fatalf
(
"failed to answer challenge: %v"
,
err
)
t
.
Fatalf
(
"failed to answer challenge: %v"
,
err
)
}
}
}
else
{
}
else
{
if
err
:=
remote
.
SendBlockHeaders
(
[]
*
types
.
Header
{{
Number
:
response
.
Number
}});
err
!=
nil
{
if
err
:=
remote
.
ReplyBlockHeaders
(
request
.
RequestId
,
[]
*
types
.
Header
{{
Number
:
response
.
Number
}});
err
!=
nil
{
t
.
Fatalf
(
"failed to answer challenge: %v"
,
err
)
t
.
Fatalf
(
"failed to answer challenge: %v"
,
err
)
}
}
}
}
}
}
}
}
// Wait until the test timeout passes to ensure proper cleanup
// Wait until the test timeout passes to ensure proper cleanup
time
.
Sleep
(
syncChallengeTimeout
+
300
*
time
.
Millisecond
)
time
.
Sleep
(
syncChallengeTimeout
+
300
*
time
.
Millisecond
)
...
@@ -619,8 +624,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
...
@@ -619,8 +624,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
defer
sourcePipe
.
Close
()
defer
sourcePipe
.
Close
()
defer
sinkPipe
.
Close
()
defer
sinkPipe
.
Close
()
sourcePeer
:=
eth
.
NewPeer
(
eth
.
ETH6
5
,
p2p
.
NewPeerPipe
(
enode
.
ID
{
byte
(
i
)},
""
,
nil
,
sourcePipe
),
sourcePipe
,
nil
)
sourcePeer
:=
eth
.
NewPeer
(
eth
.
ETH6
6
,
p2p
.
NewPeerPipe
(
enode
.
ID
{
byte
(
i
)},
""
,
nil
,
sourcePipe
),
sourcePipe
,
nil
)
sinkPeer
:=
eth
.
NewPeer
(
eth
.
ETH6
5
,
p2p
.
NewPeerPipe
(
enode
.
ID
{
0
},
""
,
nil
,
sinkPipe
),
sinkPipe
,
nil
)
sinkPeer
:=
eth
.
NewPeer
(
eth
.
ETH6
6
,
p2p
.
NewPeerPipe
(
enode
.
ID
{
0
},
""
,
nil
,
sinkPipe
),
sinkPipe
,
nil
)
defer
sourcePeer
.
Close
()
defer
sourcePeer
.
Close
()
defer
sinkPeer
.
Close
()
defer
sinkPeer
.
Close
()
...
@@ -671,7 +676,6 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
...
@@ -671,7 +676,6 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
// Tests that a propagated malformed block (uncles or transactions don't match
// Tests that a propagated malformed block (uncles or transactions don't match
// with the hashes in the header) gets discarded and not broadcast forward.
// with the hashes in the header) gets discarded and not broadcast forward.
func
TestBroadcastMalformedBlock65
(
t
*
testing
.
T
)
{
testBroadcastMalformedBlock
(
t
,
eth
.
ETH65
)
}
func
TestBroadcastMalformedBlock66
(
t
*
testing
.
T
)
{
testBroadcastMalformedBlock
(
t
,
eth
.
ETH66
)
}
func
TestBroadcastMalformedBlock66
(
t
*
testing
.
T
)
{
testBroadcastMalformedBlock
(
t
,
eth
.
ETH66
)
}
func
testBroadcastMalformedBlock
(
t
*
testing
.
T
,
protocol
uint
)
{
func
testBroadcastMalformedBlock
(
t
*
testing
.
T
,
protocol
uint
)
{
...
...
eth/protocols/eth/handler.go
View file @
0db0b277
...
@@ -171,29 +171,11 @@ type Decoder interface {
...
@@ -171,29 +171,11 @@ type Decoder interface {
Time
()
time
.
Time
Time
()
time
.
Time
}
}
var
eth65
=
map
[
uint64
]
msgHandler
{
GetBlockHeadersMsg
:
handleGetBlockHeaders
,
BlockHeadersMsg
:
handleBlockHeaders
,
GetBlockBodiesMsg
:
handleGetBlockBodies
,
BlockBodiesMsg
:
handleBlockBodies
,
GetNodeDataMsg
:
handleGetNodeData
,
NodeDataMsg
:
handleNodeData
,
GetReceiptsMsg
:
handleGetReceipts
,
ReceiptsMsg
:
handleReceipts
,
NewBlockHashesMsg
:
handleNewBlockhashes
,
NewBlockMsg
:
handleNewBlock
,
TransactionsMsg
:
handleTransactions
,
NewPooledTransactionHashesMsg
:
handleNewPooledTransactionHashes
,
GetPooledTransactionsMsg
:
handleGetPooledTransactions
,
PooledTransactionsMsg
:
handlePooledTransactions
,
}
var
eth66
=
map
[
uint64
]
msgHandler
{
var
eth66
=
map
[
uint64
]
msgHandler
{
NewBlockHashesMsg
:
handleNewBlockhashes
,
NewBlockHashesMsg
:
handleNewBlockhashes
,
NewBlockMsg
:
handleNewBlock
,
NewBlockMsg
:
handleNewBlock
,
TransactionsMsg
:
handleTransactions
,
TransactionsMsg
:
handleTransactions
,
NewPooledTransactionHashesMsg
:
handleNewPooledTransactionHashes
,
NewPooledTransactionHashesMsg
:
handleNewPooledTransactionHashes
,
// eth66 messages with request-id
GetBlockHeadersMsg
:
handleGetBlockHeaders66
,
GetBlockHeadersMsg
:
handleGetBlockHeaders66
,
BlockHeadersMsg
:
handleBlockHeaders66
,
BlockHeadersMsg
:
handleBlockHeaders66
,
GetBlockBodiesMsg
:
handleGetBlockBodies66
,
GetBlockBodiesMsg
:
handleGetBlockBodies66
,
...
@@ -219,10 +201,11 @@ func handleMessage(backend Backend, peer *Peer) error {
...
@@ -219,10 +201,11 @@ func handleMessage(backend Backend, peer *Peer) error {
}
}
defer
msg
.
Discard
()
defer
msg
.
Discard
()
var
handlers
=
eth65
var
handlers
=
eth66
if
peer
.
Version
()
>=
ETH66
{
//if peer.Version() >= ETH67 { // Left in as a sample when new protocol is added
handlers
=
eth66
// handlers = eth67
}
//}
// Track the amount of time it takes to serve the request and run the handler
// Track the amount of time it takes to serve the request and run the handler
if
metrics
.
Enabled
{
if
metrics
.
Enabled
{
h
:=
fmt
.
Sprintf
(
"%s/%s/%d/%#02x"
,
p2p
.
HandleHistName
,
ProtocolName
,
peer
.
Version
(),
msg
.
Code
)
h
:=
fmt
.
Sprintf
(
"%s/%s/%d/%#02x"
,
p2p
.
HandleHistName
,
ProtocolName
,
peer
.
Version
(),
msg
.
Code
)
...
...
eth/protocols/eth/handler_test.go
View file @
0db0b277
...
@@ -110,7 +110,6 @@ func (b *testBackend) Handle(*Peer, Packet) error {
...
@@ -110,7 +110,6 @@ func (b *testBackend) Handle(*Peer, Packet) error {
}
}
// Tests that block headers can be retrieved from a remote chain based on user queries.
// Tests that block headers can be retrieved from a remote chain based on user queries.
func
TestGetBlockHeaders65
(
t
*
testing
.
T
)
{
testGetBlockHeaders
(
t
,
ETH65
)
}
func
TestGetBlockHeaders66
(
t
*
testing
.
T
)
{
testGetBlockHeaders
(
t
,
ETH66
)
}
func
TestGetBlockHeaders66
(
t
*
testing
.
T
)
{
testGetBlockHeaders
(
t
,
ETH66
)
}
func
testGetBlockHeaders
(
t
*
testing
.
T
,
protocol
uint
)
{
func
testGetBlockHeaders
(
t
*
testing
.
T
,
protocol
uint
)
{
...
@@ -254,12 +253,6 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
...
@@ -254,12 +253,6 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
headers
=
append
(
headers
,
backend
.
chain
.
GetBlockByHash
(
hash
)
.
Header
())
headers
=
append
(
headers
,
backend
.
chain
.
GetBlockByHash
(
hash
)
.
Header
())
}
}
// Send the hash request and verify the response
// Send the hash request and verify the response
if
protocol
<=
ETH65
{
p2p
.
Send
(
peer
.
app
,
GetBlockHeadersMsg
,
tt
.
query
)
if
err
:=
p2p
.
ExpectMsg
(
peer
.
app
,
BlockHeadersMsg
,
headers
);
err
!=
nil
{
t
.
Errorf
(
"test %d: headers mismatch: %v"
,
i
,
err
)
}
}
else
{
p2p
.
Send
(
peer
.
app
,
GetBlockHeadersMsg
,
GetBlockHeadersPacket66
{
p2p
.
Send
(
peer
.
app
,
GetBlockHeadersMsg
,
GetBlockHeadersPacket66
{
RequestId
:
123
,
RequestId
:
123
,
GetBlockHeadersPacket
:
tt
.
query
,
GetBlockHeadersPacket
:
tt
.
query
,
...
@@ -270,18 +263,11 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
...
@@ -270,18 +263,11 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
});
err
!=
nil
{
});
err
!=
nil
{
t
.
Errorf
(
"test %d: headers mismatch: %v"
,
i
,
err
)
t
.
Errorf
(
"test %d: headers mismatch: %v"
,
i
,
err
)
}
}
}
// If the test used number origins, repeat with hashes as the too
// If the test used number origins, repeat with hashes as the too
if
tt
.
query
.
Origin
.
Hash
==
(
common
.
Hash
{})
{
if
tt
.
query
.
Origin
.
Hash
==
(
common
.
Hash
{})
{
if
origin
:=
backend
.
chain
.
GetBlockByNumber
(
tt
.
query
.
Origin
.
Number
);
origin
!=
nil
{
if
origin
:=
backend
.
chain
.
GetBlockByNumber
(
tt
.
query
.
Origin
.
Number
);
origin
!=
nil
{
tt
.
query
.
Origin
.
Hash
,
tt
.
query
.
Origin
.
Number
=
origin
.
Hash
(),
0
tt
.
query
.
Origin
.
Hash
,
tt
.
query
.
Origin
.
Number
=
origin
.
Hash
(),
0
if
protocol
<=
ETH65
{
p2p
.
Send
(
peer
.
app
,
GetBlockHeadersMsg
,
tt
.
query
)
if
err
:=
p2p
.
ExpectMsg
(
peer
.
app
,
BlockHeadersMsg
,
headers
);
err
!=
nil
{
t
.
Errorf
(
"test %d: headers mismatch: %v"
,
i
,
err
)
}
}
else
{
p2p
.
Send
(
peer
.
app
,
GetBlockHeadersMsg
,
GetBlockHeadersPacket66
{
p2p
.
Send
(
peer
.
app
,
GetBlockHeadersMsg
,
GetBlockHeadersPacket66
{
RequestId
:
456
,
RequestId
:
456
,
GetBlockHeadersPacket
:
tt
.
query
,
GetBlockHeadersPacket
:
tt
.
query
,
...
@@ -295,11 +281,9 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
...
@@ -295,11 +281,9 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
}
}
}
}
}
}
}
}
}
// Tests that block contents can be retrieved from a remote chain based on their hashes.
// Tests that block contents can be retrieved from a remote chain based on their hashes.
func
TestGetBlockBodies65
(
t
*
testing
.
T
)
{
testGetBlockBodies
(
t
,
ETH65
)
}
func
TestGetBlockBodies66
(
t
*
testing
.
T
)
{
testGetBlockBodies
(
t
,
ETH66
)
}
func
TestGetBlockBodies66
(
t
*
testing
.
T
)
{
testGetBlockBodies
(
t
,
ETH66
)
}
func
testGetBlockBodies
(
t
*
testing
.
T
,
protocol
uint
)
{
func
testGetBlockBodies
(
t
*
testing
.
T
,
protocol
uint
)
{
...
@@ -369,12 +353,6 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
...
@@ -369,12 +353,6 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
}
}
}
}
// Send the hash request and verify the response
// Send the hash request and verify the response
if
protocol
<=
ETH65
{
p2p
.
Send
(
peer
.
app
,
GetBlockBodiesMsg
,
hashes
)
if
err
:=
p2p
.
ExpectMsg
(
peer
.
app
,
BlockBodiesMsg
,
bodies
);
err
!=
nil
{
t
.
Errorf
(
"test %d: bodies mismatch: %v"
,
i
,
err
)
}
}
else
{
p2p
.
Send
(
peer
.
app
,
GetBlockBodiesMsg
,
GetBlockBodiesPacket66
{
p2p
.
Send
(
peer
.
app
,
GetBlockBodiesMsg
,
GetBlockBodiesPacket66
{
RequestId
:
123
,
RequestId
:
123
,
GetBlockBodiesPacket
:
hashes
,
GetBlockBodiesPacket
:
hashes
,
...
@@ -386,11 +364,9 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
...
@@ -386,11 +364,9 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
t
.
Errorf
(
"test %d: bodies mismatch: %v"
,
i
,
err
)
t
.
Errorf
(
"test %d: bodies mismatch: %v"
,
i
,
err
)
}
}
}
}
}
}
}
// Tests that the state trie nodes can be retrieved based on hashes.
// Tests that the state trie nodes can be retrieved based on hashes.
func
TestGetNodeData65
(
t
*
testing
.
T
)
{
testGetNodeData
(
t
,
ETH65
)
}
func
TestGetNodeData66
(
t
*
testing
.
T
)
{
testGetNodeData
(
t
,
ETH66
)
}
func
TestGetNodeData66
(
t
*
testing
.
T
)
{
testGetNodeData
(
t
,
ETH66
)
}
func
testGetNodeData
(
t
*
testing
.
T
,
protocol
uint
)
{
func
testGetNodeData
(
t
*
testing
.
T
,
protocol
uint
)
{
...
@@ -449,14 +425,10 @@ func testGetNodeData(t *testing.T, protocol uint) {
...
@@ -449,14 +425,10 @@ func testGetNodeData(t *testing.T, protocol uint) {
}
}
it
.
Release
()
it
.
Release
()
if
protocol
<=
ETH65
{
p2p
.
Send
(
peer
.
app
,
GetNodeDataMsg
,
hashes
)
}
else
{
p2p
.
Send
(
peer
.
app
,
GetNodeDataMsg
,
GetNodeDataPacket66
{
p2p
.
Send
(
peer
.
app
,
GetNodeDataMsg
,
GetNodeDataPacket66
{
RequestId
:
123
,
RequestId
:
123
,
GetNodeDataPacket
:
hashes
,
GetNodeDataPacket
:
hashes
,
})
})
}
msg
,
err
:=
peer
.
app
.
ReadMsg
()
msg
,
err
:=
peer
.
app
.
ReadMsg
()
if
err
!=
nil
{
if
err
!=
nil
{
t
.
Fatalf
(
"failed to read node data response: %v"
,
err
)
t
.
Fatalf
(
"failed to read node data response: %v"
,
err
)
...
@@ -464,18 +436,14 @@ func testGetNodeData(t *testing.T, protocol uint) {
...
@@ -464,18 +436,14 @@ func testGetNodeData(t *testing.T, protocol uint) {
if
msg
.
Code
!=
NodeDataMsg
{
if
msg
.
Code
!=
NodeDataMsg
{
t
.
Fatalf
(
"response packet code mismatch: have %x, want %x"
,
msg
.
Code
,
NodeDataMsg
)
t
.
Fatalf
(
"response packet code mismatch: have %x, want %x"
,
msg
.
Code
,
NodeDataMsg
)
}
}
var
data
[][]
byte
var
(
if
protocol
<=
ETH65
{
data
[][]
byte
if
err
:=
msg
.
Decode
(
&
data
);
err
!=
nil
{
res
NodeDataPacket66
t
.
Fatalf
(
"failed to decode response node data: %v"
,
err
)
)
}
}
else
{
var
res
NodeDataPacket66
if
err
:=
msg
.
Decode
(
&
res
);
err
!=
nil
{
if
err
:=
msg
.
Decode
(
&
res
);
err
!=
nil
{
t
.
Fatalf
(
"failed to decode response node data: %v"
,
err
)
t
.
Fatalf
(
"failed to decode response node data: %v"
,
err
)
}
}
data
=
res
.
NodeDataPacket
data
=
res
.
NodeDataPacket
}
// Verify that all hashes correspond to the requested data, and reconstruct a state tree
// Verify that all hashes correspond to the requested data, and reconstruct a state tree
for
i
,
want
:=
range
hashes
{
for
i
,
want
:=
range
hashes
{
if
hash
:=
crypto
.
Keccak256Hash
(
data
[
i
]);
hash
!=
want
{
if
hash
:=
crypto
.
Keccak256Hash
(
data
[
i
]);
hash
!=
want
{
...
@@ -506,7 +474,6 @@ func testGetNodeData(t *testing.T, protocol uint) {
...
@@ -506,7 +474,6 @@ func testGetNodeData(t *testing.T, protocol uint) {
}
}
// Tests that the transaction receipts can be retrieved based on hashes.
// Tests that the transaction receipts can be retrieved based on hashes.
func
TestGetBlockReceipts65
(
t
*
testing
.
T
)
{
testGetBlockReceipts
(
t
,
ETH65
)
}
func
TestGetBlockReceipts66
(
t
*
testing
.
T
)
{
testGetBlockReceipts
(
t
,
ETH66
)
}
func
TestGetBlockReceipts66
(
t
*
testing
.
T
)
{
testGetBlockReceipts
(
t
,
ETH66
)
}
func
testGetBlockReceipts
(
t
*
testing
.
T
,
protocol
uint
)
{
func
testGetBlockReceipts
(
t
*
testing
.
T
,
protocol
uint
)
{
...
@@ -566,12 +533,6 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
...
@@ -566,12 +533,6 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
receipts
=
append
(
receipts
,
backend
.
chain
.
GetReceiptsByHash
(
block
.
Hash
()))
receipts
=
append
(
receipts
,
backend
.
chain
.
GetReceiptsByHash
(
block
.
Hash
()))
}
}
// Send the hash request and verify the response
// Send the hash request and verify the response
if
protocol
<=
ETH65
{
p2p
.
Send
(
peer
.
app
,
GetReceiptsMsg
,
hashes
)
if
err
:=
p2p
.
ExpectMsg
(
peer
.
app
,
ReceiptsMsg
,
receipts
);
err
!=
nil
{
t
.
Errorf
(
"receipts mismatch: %v"
,
err
)
}
}
else
{
p2p
.
Send
(
peer
.
app
,
GetReceiptsMsg
,
GetReceiptsPacket66
{
p2p
.
Send
(
peer
.
app
,
GetReceiptsMsg
,
GetReceiptsPacket66
{
RequestId
:
123
,
RequestId
:
123
,
GetReceiptsPacket
:
hashes
,
GetReceiptsPacket
:
hashes
,
...
@@ -582,5 +543,4 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
...
@@ -582,5 +543,4 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
});
err
!=
nil
{
});
err
!=
nil
{
t
.
Errorf
(
"receipts mismatch: %v"
,
err
)
t
.
Errorf
(
"receipts mismatch: %v"
,
err
)
}
}
}
}
}
eth/protocols/eth/handlers.go
View file @
0db0b277
...
@@ -27,17 +27,6 @@ import (
...
@@ -27,17 +27,6 @@ import (
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie"
)
)
// handleGetBlockHeaders handles Block header query, collect the requested headers and reply
func
handleGetBlockHeaders
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// Decode the complex header query
var
query
GetBlockHeadersPacket
if
err
:=
msg
.
Decode
(
&
query
);
err
!=
nil
{
return
fmt
.
Errorf
(
"%w: message %v: %v"
,
errDecode
,
msg
,
err
)
}
response
:=
answerGetBlockHeadersQuery
(
backend
,
&
query
,
peer
)
return
peer
.
SendBlockHeaders
(
response
)
}
// handleGetBlockHeaders66 is the eth/66 version of handleGetBlockHeaders
// handleGetBlockHeaders66 is the eth/66 version of handleGetBlockHeaders
func
handleGetBlockHeaders66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
func
handleGetBlockHeaders66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// Decode the complex header query
// Decode the complex header query
...
@@ -135,16 +124,6 @@ func answerGetBlockHeadersQuery(backend Backend, query *GetBlockHeadersPacket, p
...
@@ -135,16 +124,6 @@ func answerGetBlockHeadersQuery(backend Backend, query *GetBlockHeadersPacket, p
return
headers
return
headers
}
}
func
handleGetBlockBodies
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// Decode the block body retrieval message
var
query
GetBlockBodiesPacket
if
err
:=
msg
.
Decode
(
&
query
);
err
!=
nil
{
return
fmt
.
Errorf
(
"%w: message %v: %v"
,
errDecode
,
msg
,
err
)
}
response
:=
answerGetBlockBodiesQuery
(
backend
,
query
,
peer
)
return
peer
.
SendBlockBodiesRLP
(
response
)
}
func
handleGetBlockBodies66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
func
handleGetBlockBodies66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// Decode the block body retrieval message
// Decode the block body retrieval message
var
query
GetBlockBodiesPacket66
var
query
GetBlockBodiesPacket66
...
@@ -174,16 +153,6 @@ func answerGetBlockBodiesQuery(backend Backend, query GetBlockBodiesPacket, peer
...
@@ -174,16 +153,6 @@ func answerGetBlockBodiesQuery(backend Backend, query GetBlockBodiesPacket, peer
return
bodies
return
bodies
}
}
func
handleGetNodeData
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// Decode the trie node data retrieval message
var
query
GetNodeDataPacket
if
err
:=
msg
.
Decode
(
&
query
);
err
!=
nil
{
return
fmt
.
Errorf
(
"%w: message %v: %v"
,
errDecode
,
msg
,
err
)
}
response
:=
answerGetNodeDataQuery
(
backend
,
query
,
peer
)
return
peer
.
SendNodeData
(
response
)
}
func
handleGetNodeData66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
func
handleGetNodeData66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// Decode the trie node data retrieval message
// Decode the trie node data retrieval message
var
query
GetNodeDataPacket66
var
query
GetNodeDataPacket66
...
@@ -223,16 +192,6 @@ func answerGetNodeDataQuery(backend Backend, query GetNodeDataPacket, peer *Peer
...
@@ -223,16 +192,6 @@ func answerGetNodeDataQuery(backend Backend, query GetNodeDataPacket, peer *Peer
return
nodes
return
nodes
}
}
func
handleGetReceipts
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// Decode the block receipts retrieval message
var
query
GetReceiptsPacket
if
err
:=
msg
.
Decode
(
&
query
);
err
!=
nil
{
return
fmt
.
Errorf
(
"%w: message %v: %v"
,
errDecode
,
msg
,
err
)
}
response
:=
answerGetReceiptsQuery
(
backend
,
query
,
peer
)
return
peer
.
SendReceiptsRLP
(
response
)
}
func
handleGetReceipts66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
func
handleGetReceipts66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// Decode the block receipts retrieval message
// Decode the block receipts retrieval message
var
query
GetReceiptsPacket66
var
query
GetReceiptsPacket66
...
@@ -312,15 +271,6 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error {
...
@@ -312,15 +271,6 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error {
return
backend
.
Handle
(
peer
,
ann
)
return
backend
.
Handle
(
peer
,
ann
)
}
}
func
handleBlockHeaders
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// A batch of headers arrived to one of our previous requests
res
:=
new
(
BlockHeadersPacket
)
if
err
:=
msg
.
Decode
(
res
);
err
!=
nil
{
return
fmt
.
Errorf
(
"%w: message %v: %v"
,
errDecode
,
msg
,
err
)
}
return
backend
.
Handle
(
peer
,
res
)
}
func
handleBlockHeaders66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
func
handleBlockHeaders66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// A batch of headers arrived to one of our previous requests
// A batch of headers arrived to one of our previous requests
res
:=
new
(
BlockHeadersPacket66
)
res
:=
new
(
BlockHeadersPacket66
)
...
@@ -332,15 +282,6 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
...
@@ -332,15 +282,6 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
return
backend
.
Handle
(
peer
,
&
res
.
BlockHeadersPacket
)
return
backend
.
Handle
(
peer
,
&
res
.
BlockHeadersPacket
)
}
}
func
handleBlockBodies
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// A batch of block bodies arrived to one of our previous requests
res
:=
new
(
BlockBodiesPacket
)
if
err
:=
msg
.
Decode
(
res
);
err
!=
nil
{
return
fmt
.
Errorf
(
"%w: message %v: %v"
,
errDecode
,
msg
,
err
)
}
return
backend
.
Handle
(
peer
,
res
)
}
func
handleBlockBodies66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
func
handleBlockBodies66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// A batch of block bodies arrived to one of our previous requests
// A batch of block bodies arrived to one of our previous requests
res
:=
new
(
BlockBodiesPacket66
)
res
:=
new
(
BlockBodiesPacket66
)
...
@@ -352,15 +293,6 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
...
@@ -352,15 +293,6 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
return
backend
.
Handle
(
peer
,
&
res
.
BlockBodiesPacket
)
return
backend
.
Handle
(
peer
,
&
res
.
BlockBodiesPacket
)
}
}
func
handleNodeData
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// A batch of node state data arrived to one of our previous requests
res
:=
new
(
NodeDataPacket
)
if
err
:=
msg
.
Decode
(
res
);
err
!=
nil
{
return
fmt
.
Errorf
(
"%w: message %v: %v"
,
errDecode
,
msg
,
err
)
}
return
backend
.
Handle
(
peer
,
res
)
}
func
handleNodeData66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
func
handleNodeData66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// A batch of node state data arrived to one of our previous requests
// A batch of node state data arrived to one of our previous requests
res
:=
new
(
NodeDataPacket66
)
res
:=
new
(
NodeDataPacket66
)
...
@@ -372,15 +304,6 @@ func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error {
...
@@ -372,15 +304,6 @@ func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error {
return
backend
.
Handle
(
peer
,
&
res
.
NodeDataPacket
)
return
backend
.
Handle
(
peer
,
&
res
.
NodeDataPacket
)
}
}
func
handleReceipts
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// A batch of receipts arrived to one of our previous requests
res
:=
new
(
ReceiptsPacket
)
if
err
:=
msg
.
Decode
(
res
);
err
!=
nil
{
return
fmt
.
Errorf
(
"%w: message %v: %v"
,
errDecode
,
msg
,
err
)
}
return
backend
.
Handle
(
peer
,
res
)
}
func
handleReceipts66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
func
handleReceipts66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// A batch of receipts arrived to one of our previous requests
// A batch of receipts arrived to one of our previous requests
res
:=
new
(
ReceiptsPacket66
)
res
:=
new
(
ReceiptsPacket66
)
...
@@ -409,16 +332,6 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
...
@@ -409,16 +332,6 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
return
backend
.
Handle
(
peer
,
ann
)
return
backend
.
Handle
(
peer
,
ann
)
}
}
func
handleGetPooledTransactions
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// Decode the pooled transactions retrieval message
var
query
GetPooledTransactionsPacket
if
err
:=
msg
.
Decode
(
&
query
);
err
!=
nil
{
return
fmt
.
Errorf
(
"%w: message %v: %v"
,
errDecode
,
msg
,
err
)
}
hashes
,
txs
:=
answerGetPooledTransactions
(
backend
,
query
,
peer
)
return
peer
.
SendPooledTransactionsRLP
(
hashes
,
txs
)
}
func
handleGetPooledTransactions66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
func
handleGetPooledTransactions66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// Decode the pooled transactions retrieval message
// Decode the pooled transactions retrieval message
var
query
GetPooledTransactionsPacket66
var
query
GetPooledTransactionsPacket66
...
@@ -477,26 +390,6 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
...
@@ -477,26 +390,6 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
return
backend
.
Handle
(
peer
,
&
txs
)
return
backend
.
Handle
(
peer
,
&
txs
)
}
}
func
handlePooledTransactions
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if
!
backend
.
AcceptTxs
()
{
return
nil
}
// Transactions can be processed, parse all of them and deliver to the pool
var
txs
PooledTransactionsPacket
if
err
:=
msg
.
Decode
(
&
txs
);
err
!=
nil
{
return
fmt
.
Errorf
(
"%w: message %v: %v"
,
errDecode
,
msg
,
err
)
}
for
i
,
tx
:=
range
txs
{
// Validate and mark the remote transaction
if
tx
==
nil
{
return
fmt
.
Errorf
(
"%w: transaction %d is nil"
,
errDecode
,
i
)
}
peer
.
markTransaction
(
tx
.
Hash
())
}
return
backend
.
Handle
(
peer
,
&
txs
)
}
func
handlePooledTransactions66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
func
handlePooledTransactions66
(
backend
Backend
,
msg
Decoder
,
peer
*
Peer
)
error
{
// Transactions arrived, make sure we have a valid and fresh chain to handle them
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if
!
backend
.
AcceptTxs
()
{
if
!
backend
.
AcceptTxs
()
{
...
...
eth/protocols/eth/handshake_test.go
View file @
0db0b277
...
@@ -27,7 +27,6 @@ import (
...
@@ -27,7 +27,6 @@ import (
)
)
// Tests that handshake failures are detected and reported correctly.
// Tests that handshake failures are detected and reported correctly.
func
TestHandshake65
(
t
*
testing
.
T
)
{
testHandshake
(
t
,
ETH65
)
}
func
TestHandshake66
(
t
*
testing
.
T
)
{
testHandshake
(
t
,
ETH66
)
}
func
TestHandshake66
(
t
*
testing
.
T
)
{
testHandshake
(
t
,
ETH66
)
}
func
testHandshake
(
t
*
testing
.
T
,
protocol
uint
)
{
func
testHandshake
(
t
*
testing
.
T
,
protocol
uint
)
{
...
...
eth/protocols/eth/peer.go
View file @
0db0b277
...
@@ -108,9 +108,8 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
...
@@ -108,9 +108,8 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
// Start up all the broadcasters
// Start up all the broadcasters
go
peer
.
broadcastBlocks
()
go
peer
.
broadcastBlocks
()
go
peer
.
broadcastTransactions
()
go
peer
.
broadcastTransactions
()
if
version
>=
ETH65
{
go
peer
.
announceTransactions
()
go
peer
.
announceTransactions
()
}
return
peer
return
peer
}
}
...
@@ -252,22 +251,6 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
...
@@ -252,22 +251,6 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
}
}
}
}
// SendPooledTransactionsRLP sends requested transactions to the peer and adds the
// hashes in its transaction hash set for future reference.
//
// Note, the method assumes the hashes are correct and correspond to the list of
// transactions being sent.
func
(
p
*
Peer
)
SendPooledTransactionsRLP
(
hashes
[]
common
.
Hash
,
txs
[]
rlp
.
RawValue
)
error
{
// Mark all the transactions as known, but ensure we don't overflow our limits
for
p
.
knownTxs
.
Cardinality
()
>
max
(
0
,
maxKnownTxs
-
len
(
hashes
))
{
p
.
knownTxs
.
Pop
()
}
for
_
,
hash
:=
range
hashes
{
p
.
knownTxs
.
Add
(
hash
)
}
return
p2p
.
Send
(
p
.
rw
,
PooledTransactionsMsg
,
txs
)
// Not packed into PooledTransactionsPacket to avoid RLP decoding
}
// ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP.
// ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP.
func
(
p
*
Peer
)
ReplyPooledTransactionsRLP
(
id
uint64
,
hashes
[]
common
.
Hash
,
txs
[]
rlp
.
RawValue
)
error
{
func
(
p
*
Peer
)
ReplyPooledTransactionsRLP
(
id
uint64
,
hashes
[]
common
.
Hash
,
txs
[]
rlp
.
RawValue
)
error
{
// Mark all the transactions as known, but ensure we don't overflow our limits
// Mark all the transactions as known, but ensure we don't overflow our limits
...
@@ -346,11 +329,6 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
...
@@ -346,11 +329,6 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
}
}
}
}
// SendBlockHeaders sends a batch of block headers to the remote peer.
func
(
p
*
Peer
)
SendBlockHeaders
(
headers
[]
*
types
.
Header
)
error
{
return
p2p
.
Send
(
p
.
rw
,
BlockHeadersMsg
,
BlockHeadersPacket
(
headers
))
}
// ReplyBlockHeaders is the eth/66 version of SendBlockHeaders.
// ReplyBlockHeaders is the eth/66 version of SendBlockHeaders.
func
(
p
*
Peer
)
ReplyBlockHeaders
(
id
uint64
,
headers
[]
*
types
.
Header
)
error
{
func
(
p
*
Peer
)
ReplyBlockHeaders
(
id
uint64
,
headers
[]
*
types
.
Header
)
error
{
return
p2p
.
Send
(
p
.
rw
,
BlockHeadersMsg
,
BlockHeadersPacket66
{
return
p2p
.
Send
(
p
.
rw
,
BlockHeadersMsg
,
BlockHeadersPacket66
{
...
@@ -359,12 +337,6 @@ func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error {
...
@@ -359,12 +337,6 @@ func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error {
})
})
}
}
// SendBlockBodiesRLP sends a batch of block contents to the remote peer from
// an already RLP encoded format.
func
(
p
*
Peer
)
SendBlockBodiesRLP
(
bodies
[]
rlp
.
RawValue
)
error
{
return
p2p
.
Send
(
p
.
rw
,
BlockBodiesMsg
,
bodies
)
// Not packed into BlockBodiesPacket to avoid RLP decoding
}
// ReplyBlockBodiesRLP is the eth/66 version of SendBlockBodiesRLP.
// ReplyBlockBodiesRLP is the eth/66 version of SendBlockBodiesRLP.
func
(
p
*
Peer
)
ReplyBlockBodiesRLP
(
id
uint64
,
bodies
[]
rlp
.
RawValue
)
error
{
func
(
p
*
Peer
)
ReplyBlockBodiesRLP
(
id
uint64
,
bodies
[]
rlp
.
RawValue
)
error
{
// Not packed into BlockBodiesPacket to avoid RLP decoding
// Not packed into BlockBodiesPacket to avoid RLP decoding
...
@@ -374,12 +346,6 @@ func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
...
@@ -374,12 +346,6 @@ func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
})
})
}
}
// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the
// hashes requested.
func
(
p
*
Peer
)
SendNodeData
(
data
[][]
byte
)
error
{
return
p2p
.
Send
(
p
.
rw
,
NodeDataMsg
,
NodeDataPacket
(
data
))
}
// ReplyNodeData is the eth/66 response to GetNodeData.
// ReplyNodeData is the eth/66 response to GetNodeData.
func
(
p
*
Peer
)
ReplyNodeData
(
id
uint64
,
data
[][]
byte
)
error
{
func
(
p
*
Peer
)
ReplyNodeData
(
id
uint64
,
data
[][]
byte
)
error
{
return
p2p
.
Send
(
p
.
rw
,
NodeDataMsg
,
NodeDataPacket66
{
return
p2p
.
Send
(
p
.
rw
,
NodeDataMsg
,
NodeDataPacket66
{
...
@@ -388,12 +354,6 @@ func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error {
...
@@ -388,12 +354,6 @@ func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error {
})
})
}
}
// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
// ones requested from an already RLP encoded format.
func
(
p
*
Peer
)
SendReceiptsRLP
(
receipts
[]
rlp
.
RawValue
)
error
{
return
p2p
.
Send
(
p
.
rw
,
ReceiptsMsg
,
receipts
)
// Not packed into ReceiptsPacket to avoid RLP decoding
}
// ReplyReceiptsRLP is the eth/66 response to GetReceipts.
// ReplyReceiptsRLP is the eth/66 response to GetReceipts.
func
(
p
*
Peer
)
ReplyReceiptsRLP
(
id
uint64
,
receipts
[]
rlp
.
RawValue
)
error
{
func
(
p
*
Peer
)
ReplyReceiptsRLP
(
id
uint64
,
receipts
[]
rlp
.
RawValue
)
error
{
return
p2p
.
Send
(
p
.
rw
,
ReceiptsMsg
,
ReceiptsRLPPacket66
{
return
p2p
.
Send
(
p
.
rw
,
ReceiptsMsg
,
ReceiptsRLPPacket66
{
...
@@ -406,85 +366,60 @@ func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error {
...
@@ -406,85 +366,60 @@ func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error {
// single header. It is used solely by the fetcher.
// single header. It is used solely by the fetcher.
func
(
p
*
Peer
)
RequestOneHeader
(
hash
common
.
Hash
)
error
{
func
(
p
*
Peer
)
RequestOneHeader
(
hash
common
.
Hash
)
error
{
p
.
Log
()
.
Debug
(
"Fetching single header"
,
"hash"
,
hash
)
p
.
Log
()
.
Debug
(
"Fetching single header"
,
"hash"
,
hash
)
query
:=
GetBlockHeadersPacket
{
Origin
:
HashOrNumber
{
Hash
:
hash
},
Amount
:
uint64
(
1
),
Skip
:
uint64
(
0
),
Reverse
:
false
,
}
if
p
.
Version
()
>=
ETH66
{
id
:=
rand
.
Uint64
()
id
:=
rand
.
Uint64
()
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetBlockHeadersMsg
,
BlockHeadersMsg
,
id
)
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetBlockHeadersMsg
,
BlockHeadersMsg
,
id
)
return
p2p
.
Send
(
p
.
rw
,
GetBlockHeadersMsg
,
&
GetBlockHeadersPacket66
{
return
p2p
.
Send
(
p
.
rw
,
GetBlockHeadersMsg
,
&
GetBlockHeadersPacket66
{
RequestId
:
id
,
RequestId
:
id
,
GetBlockHeadersPacket
:
&
query
,
GetBlockHeadersPacket
:
&
GetBlockHeadersPacket
{
Origin
:
HashOrNumber
{
Hash
:
hash
},
Amount
:
uint64
(
1
),
Skip
:
uint64
(
0
),
Reverse
:
false
,
},
})
})
}
return
p2p
.
Send
(
p
.
rw
,
GetBlockHeadersMsg
,
&
query
)
}
}
// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
// specified header query, based on the hash of an origin block.
// specified header query, based on the hash of an origin block.
func
(
p
*
Peer
)
RequestHeadersByHash
(
origin
common
.
Hash
,
amount
int
,
skip
int
,
reverse
bool
)
error
{
func
(
p
*
Peer
)
RequestHeadersByHash
(
origin
common
.
Hash
,
amount
int
,
skip
int
,
reverse
bool
)
error
{
p
.
Log
()
.
Debug
(
"Fetching batch of headers"
,
"count"
,
amount
,
"fromhash"
,
origin
,
"skip"
,
skip
,
"reverse"
,
reverse
)
p
.
Log
()
.
Debug
(
"Fetching batch of headers"
,
"count"
,
amount
,
"fromhash"
,
origin
,
"skip"
,
skip
,
"reverse"
,
reverse
)
query
:=
GetBlockHeadersPacket
{
Origin
:
HashOrNumber
{
Hash
:
origin
},
Amount
:
uint64
(
amount
),
Skip
:
uint64
(
skip
),
Reverse
:
reverse
,
}
if
p
.
Version
()
>=
ETH66
{
id
:=
rand
.
Uint64
()
id
:=
rand
.
Uint64
()
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetBlockHeadersMsg
,
BlockHeadersMsg
,
id
)
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetBlockHeadersMsg
,
BlockHeadersMsg
,
id
)
return
p2p
.
Send
(
p
.
rw
,
GetBlockHeadersMsg
,
&
GetBlockHeadersPacket66
{
return
p2p
.
Send
(
p
.
rw
,
GetBlockHeadersMsg
,
&
GetBlockHeadersPacket66
{
RequestId
:
id
,
RequestId
:
id
,
GetBlockHeadersPacket
:
&
query
,
GetBlockHeadersPacket
:
&
GetBlockHeadersPacket
{
Origin
:
HashOrNumber
{
Hash
:
origin
},
Amount
:
uint64
(
amount
),
Skip
:
uint64
(
skip
),
Reverse
:
reverse
,
},
})
})
}
return
p2p
.
Send
(
p
.
rw
,
GetBlockHeadersMsg
,
&
query
)
}
}
// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
// specified header query, based on the number of an origin block.
// specified header query, based on the number of an origin block.
func
(
p
*
Peer
)
RequestHeadersByNumber
(
origin
uint64
,
amount
int
,
skip
int
,
reverse
bool
)
error
{
func
(
p
*
Peer
)
RequestHeadersByNumber
(
origin
uint64
,
amount
int
,
skip
int
,
reverse
bool
)
error
{
p
.
Log
()
.
Debug
(
"Fetching batch of headers"
,
"count"
,
amount
,
"fromnum"
,
origin
,
"skip"
,
skip
,
"reverse"
,
reverse
)
p
.
Log
()
.
Debug
(
"Fetching batch of headers"
,
"count"
,
amount
,
"fromnum"
,
origin
,
"skip"
,
skip
,
"reverse"
,
reverse
)
query
:=
GetBlockHeadersPacket
{
Origin
:
HashOrNumber
{
Number
:
origin
},
Amount
:
uint64
(
amount
),
Skip
:
uint64
(
skip
),
Reverse
:
reverse
,
}
if
p
.
Version
()
>=
ETH66
{
id
:=
rand
.
Uint64
()
id
:=
rand
.
Uint64
()
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetBlockHeadersMsg
,
BlockHeadersMsg
,
id
)
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetBlockHeadersMsg
,
BlockHeadersMsg
,
id
)
return
p2p
.
Send
(
p
.
rw
,
GetBlockHeadersMsg
,
&
GetBlockHeadersPacket66
{
return
p2p
.
Send
(
p
.
rw
,
GetBlockHeadersMsg
,
&
GetBlockHeadersPacket66
{
RequestId
:
id
,
RequestId
:
id
,
GetBlockHeadersPacket
:
&
query
,
GetBlockHeadersPacket
:
&
GetBlockHeadersPacket
{
})
}
return
p2p
.
Send
(
p
.
rw
,
GetBlockHeadersMsg
,
&
query
)
}
// ExpectRequestHeadersByNumber is a testing method to mirror the recipient side
// of the RequestHeadersByNumber operation.
func
(
p
*
Peer
)
ExpectRequestHeadersByNumber
(
origin
uint64
,
amount
int
,
skip
int
,
reverse
bool
)
error
{
req
:=
&
GetBlockHeadersPacket
{
Origin
:
HashOrNumber
{
Number
:
origin
},
Origin
:
HashOrNumber
{
Number
:
origin
},
Amount
:
uint64
(
amount
),
Amount
:
uint64
(
amount
),
Skip
:
uint64
(
skip
),
Skip
:
uint64
(
skip
),
Reverse
:
reverse
,
Reverse
:
reverse
,
}
},
return
p2p
.
ExpectMsg
(
p
.
rw
,
GetBlockHeadersMsg
,
req
)
}
)
}
}
// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
// specified.
// specified.
func
(
p
*
Peer
)
RequestBodies
(
hashes
[]
common
.
Hash
)
error
{
func
(
p
*
Peer
)
RequestBodies
(
hashes
[]
common
.
Hash
)
error
{
p
.
Log
()
.
Debug
(
"Fetching batch of block bodies"
,
"count"
,
len
(
hashes
))
p
.
Log
()
.
Debug
(
"Fetching batch of block bodies"
,
"count"
,
len
(
hashes
))
if
p
.
Version
()
>=
ETH66
{
id
:=
rand
.
Uint64
()
id
:=
rand
.
Uint64
()
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetBlockBodiesMsg
,
BlockBodiesMsg
,
id
)
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetBlockBodiesMsg
,
BlockBodiesMsg
,
id
)
...
@@ -492,15 +427,12 @@ func (p *Peer) RequestBodies(hashes []common.Hash) error {
...
@@ -492,15 +427,12 @@ func (p *Peer) RequestBodies(hashes []common.Hash) error {
RequestId
:
id
,
RequestId
:
id
,
GetBlockBodiesPacket
:
hashes
,
GetBlockBodiesPacket
:
hashes
,
})
})
}
return
p2p
.
Send
(
p
.
rw
,
GetBlockBodiesMsg
,
GetBlockBodiesPacket
(
hashes
))
}
}
// RequestNodeData fetches a batch of arbitrary data from a node's known state
// RequestNodeData fetches a batch of arbitrary data from a node's known state
// data, corresponding to the specified hashes.
// data, corresponding to the specified hashes.
func
(
p
*
Peer
)
RequestNodeData
(
hashes
[]
common
.
Hash
)
error
{
func
(
p
*
Peer
)
RequestNodeData
(
hashes
[]
common
.
Hash
)
error
{
p
.
Log
()
.
Debug
(
"Fetching batch of state data"
,
"count"
,
len
(
hashes
))
p
.
Log
()
.
Debug
(
"Fetching batch of state data"
,
"count"
,
len
(
hashes
))
if
p
.
Version
()
>=
ETH66
{
id
:=
rand
.
Uint64
()
id
:=
rand
.
Uint64
()
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetNodeDataMsg
,
NodeDataMsg
,
id
)
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetNodeDataMsg
,
NodeDataMsg
,
id
)
...
@@ -508,14 +440,11 @@ func (p *Peer) RequestNodeData(hashes []common.Hash) error {
...
@@ -508,14 +440,11 @@ func (p *Peer) RequestNodeData(hashes []common.Hash) error {
RequestId
:
id
,
RequestId
:
id
,
GetNodeDataPacket
:
hashes
,
GetNodeDataPacket
:
hashes
,
})
})
}
return
p2p
.
Send
(
p
.
rw
,
GetNodeDataMsg
,
GetNodeDataPacket
(
hashes
))
}
}
// RequestReceipts fetches a batch of transaction receipts from a remote node.
// RequestReceipts fetches a batch of transaction receipts from a remote node.
func
(
p
*
Peer
)
RequestReceipts
(
hashes
[]
common
.
Hash
)
error
{
func
(
p
*
Peer
)
RequestReceipts
(
hashes
[]
common
.
Hash
)
error
{
p
.
Log
()
.
Debug
(
"Fetching batch of receipts"
,
"count"
,
len
(
hashes
))
p
.
Log
()
.
Debug
(
"Fetching batch of receipts"
,
"count"
,
len
(
hashes
))
if
p
.
Version
()
>=
ETH66
{
id
:=
rand
.
Uint64
()
id
:=
rand
.
Uint64
()
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetReceiptsMsg
,
ReceiptsMsg
,
id
)
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetReceiptsMsg
,
ReceiptsMsg
,
id
)
...
@@ -523,14 +452,11 @@ func (p *Peer) RequestReceipts(hashes []common.Hash) error {
...
@@ -523,14 +452,11 @@ func (p *Peer) RequestReceipts(hashes []common.Hash) error {
RequestId
:
id
,
RequestId
:
id
,
GetReceiptsPacket
:
hashes
,
GetReceiptsPacket
:
hashes
,
})
})
}
return
p2p
.
Send
(
p
.
rw
,
GetReceiptsMsg
,
GetReceiptsPacket
(
hashes
))
}
}
// RequestTxs fetches a batch of transactions from a remote node.
// RequestTxs fetches a batch of transactions from a remote node.
func
(
p
*
Peer
)
RequestTxs
(
hashes
[]
common
.
Hash
)
error
{
func
(
p
*
Peer
)
RequestTxs
(
hashes
[]
common
.
Hash
)
error
{
p
.
Log
()
.
Debug
(
"Fetching batch of transactions"
,
"count"
,
len
(
hashes
))
p
.
Log
()
.
Debug
(
"Fetching batch of transactions"
,
"count"
,
len
(
hashes
))
if
p
.
Version
()
>=
ETH66
{
id
:=
rand
.
Uint64
()
id
:=
rand
.
Uint64
()
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetPooledTransactionsMsg
,
PooledTransactionsMsg
,
id
)
requestTracker
.
Track
(
p
.
id
,
p
.
version
,
GetPooledTransactionsMsg
,
PooledTransactionsMsg
,
id
)
...
@@ -538,6 +464,4 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error {
...
@@ -538,6 +464,4 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error {
RequestId
:
id
,
RequestId
:
id
,
GetPooledTransactionsPacket
:
hashes
,
GetPooledTransactionsPacket
:
hashes
,
})
})
}
return
p2p
.
Send
(
p
.
rw
,
GetPooledTransactionsMsg
,
GetPooledTransactionsPacket
(
hashes
))
}
}
eth/protocols/eth/protocol.go
View file @
0db0b277
...
@@ -30,7 +30,6 @@ import (
...
@@ -30,7 +30,6 @@ import (
// Constants to match up protocol versions and messages
// Constants to match up protocol versions and messages
const
(
const
(
ETH65
=
65
ETH66
=
66
ETH66
=
66
)
)
...
@@ -40,17 +39,16 @@ const ProtocolName = "eth"
...
@@ -40,17 +39,16 @@ const ProtocolName = "eth"
// ProtocolVersions are the supported versions of the `eth` protocol (first
// ProtocolVersions are the supported versions of the `eth` protocol (first
// is primary).
// is primary).
var
ProtocolVersions
=
[]
uint
{
ETH66
,
ETH65
}
var
ProtocolVersions
=
[]
uint
{
ETH66
}
// protocolLengths are the number of implemented message corresponding to
// protocolLengths are the number of implemented message corresponding to
// different protocol versions.
// different protocol versions.
var
protocolLengths
=
map
[
uint
]
uint64
{
ETH66
:
17
,
ETH65
:
17
}
var
protocolLengths
=
map
[
uint
]
uint64
{
ETH66
:
17
}
// maxMessageSize is the maximum cap on the size of a protocol message.
// maxMessageSize is the maximum cap on the size of a protocol message.
const
maxMessageSize
=
10
*
1024
*
1024
const
maxMessageSize
=
10
*
1024
*
1024
const
(
const
(
// Protocol messages in eth/64
StatusMsg
=
0x00
StatusMsg
=
0x00
NewBlockHashesMsg
=
0x01
NewBlockHashesMsg
=
0x01
TransactionsMsg
=
0x02
TransactionsMsg
=
0x02
...
@@ -63,8 +61,6 @@ const (
...
@@ -63,8 +61,6 @@ const (
NodeDataMsg
=
0x0e
NodeDataMsg
=
0x0e
GetReceiptsMsg
=
0x0f
GetReceiptsMsg
=
0x0f
ReceiptsMsg
=
0x10
ReceiptsMsg
=
0x10
// Protocol messages overloaded in eth/65
NewPooledTransactionHashesMsg
=
0x08
NewPooledTransactionHashesMsg
=
0x08
GetPooledTransactionsMsg
=
0x09
GetPooledTransactionsMsg
=
0x09
PooledTransactionsMsg
=
0x0a
PooledTransactionsMsg
=
0x0a
...
@@ -128,7 +124,7 @@ type GetBlockHeadersPacket struct {
...
@@ -128,7 +124,7 @@ type GetBlockHeadersPacket struct {
Reverse
bool
// Query direction (false = rising towards latest, true = falling towards genesis)
Reverse
bool
// Query direction (false = rising towards latest, true = falling towards genesis)
}
}
// GetBlockHeadersPacket represents a block header query over eth/66
// GetBlockHeadersPacket
66
represents a block header query over eth/66
type
GetBlockHeadersPacket66
struct
{
type
GetBlockHeadersPacket66
struct
{
RequestId
uint64
RequestId
uint64
*
GetBlockHeadersPacket
*
GetBlockHeadersPacket
...
...
eth/sync.go
View file @
0db0b277
...
@@ -18,7 +18,6 @@ package eth
...
@@ -18,7 +18,6 @@ package eth
import
(
import
(
"math/big"
"math/big"
"math/rand"
"sync/atomic"
"sync/atomic"
"time"
"time"
...
@@ -28,23 +27,13 @@ import (
...
@@ -28,23 +27,13 @@ import (
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
)
)
const
(
const
(
forceSyncCycle
=
10
*
time
.
Second
// Time interval to force syncs, even if few peers are available
forceSyncCycle
=
10
*
time
.
Second
// Time interval to force syncs, even if few peers are available
defaultMinSyncPeers
=
5
// Amount of peers desired to start syncing
defaultMinSyncPeers
=
5
// Amount of peers desired to start syncing
// This is the target size for the packs of transactions sent by txsyncLoop64.
// A pack can get larger than this if a single transactions exceeds this size.
txsyncPackSize
=
100
*
1024
)
)
type
txsync
struct
{
p
*
eth
.
Peer
txs
[]
*
types
.
Transaction
}
// syncTransactions starts sending all currently pending transactions to the given peer.
// syncTransactions starts sending all currently pending transactions to the given peer.
func
(
h
*
handler
)
syncTransactions
(
p
*
eth
.
Peer
)
{
func
(
h
*
handler
)
syncTransactions
(
p
*
eth
.
Peer
)
{
// Assemble the set of transaction to broadcast or announce to the remote
// Assemble the set of transaction to broadcast or announce to the remote
...
@@ -64,94 +53,11 @@ func (h *handler) syncTransactions(p *eth.Peer) {
...
@@ -64,94 +53,11 @@ func (h *handler) syncTransactions(p *eth.Peer) {
// The eth/65 protocol introduces proper transaction announcements, so instead
// The eth/65 protocol introduces proper transaction announcements, so instead
// of dripping transactions across multiple peers, just send the entire list as
// of dripping transactions across multiple peers, just send the entire list as
// an announcement and let the remote side decide what they need (likely nothing).
// an announcement and let the remote side decide what they need (likely nothing).
if
p
.
Version
()
>=
eth
.
ETH65
{
hashes
:=
make
([]
common
.
Hash
,
len
(
txs
))
hashes
:=
make
([]
common
.
Hash
,
len
(
txs
))
for
i
,
tx
:=
range
txs
{
for
i
,
tx
:=
range
txs
{
hashes
[
i
]
=
tx
.
Hash
()
hashes
[
i
]
=
tx
.
Hash
()
}
}
p
.
AsyncSendPooledTransactionHashes
(
hashes
)
p
.
AsyncSendPooledTransactionHashes
(
hashes
)
return
}
// Out of luck, peer is running legacy protocols, drop the txs over
select
{
case
h
.
txsyncCh
<-
&
txsync
{
p
:
p
,
txs
:
txs
}
:
case
<-
h
.
quitSync
:
}
}
// txsyncLoop64 takes care of the initial transaction sync for each new
// connection. When a new peer appears, we relay all currently pending
// transactions. In order to minimise egress bandwidth usage, we send
// the transactions in small packs to one peer at a time.
func
(
h
*
handler
)
txsyncLoop64
()
{
defer
h
.
wg
.
Done
()
var
(
pending
=
make
(
map
[
enode
.
ID
]
*
txsync
)
sending
=
false
// whether a send is active
pack
=
new
(
txsync
)
// the pack that is being sent
done
=
make
(
chan
error
,
1
)
// result of the send
)
// send starts a sending a pack of transactions from the sync.
send
:=
func
(
s
*
txsync
)
{
if
s
.
p
.
Version
()
>=
eth
.
ETH65
{
panic
(
"initial transaction syncer running on eth/65+"
)
}
// Fill pack with transactions up to the target size.
size
:=
common
.
StorageSize
(
0
)
pack
.
p
=
s
.
p
pack
.
txs
=
pack
.
txs
[
:
0
]
for
i
:=
0
;
i
<
len
(
s
.
txs
)
&&
size
<
txsyncPackSize
;
i
++
{
pack
.
txs
=
append
(
pack
.
txs
,
s
.
txs
[
i
])
size
+=
s
.
txs
[
i
]
.
Size
()
}
// Remove the transactions that will be sent.
s
.
txs
=
s
.
txs
[
:
copy
(
s
.
txs
,
s
.
txs
[
len
(
pack
.
txs
)
:
])]
if
len
(
s
.
txs
)
==
0
{
delete
(
pending
,
s
.
p
.
Peer
.
ID
())
}
// Send the pack in the background.
s
.
p
.
Log
()
.
Trace
(
"Sending batch of transactions"
,
"count"
,
len
(
pack
.
txs
),
"bytes"
,
size
)
sending
=
true
go
func
()
{
done
<-
pack
.
p
.
SendTransactions
(
pack
.
txs
)
}()
}
// pick chooses the next pending sync.
pick
:=
func
()
*
txsync
{
if
len
(
pending
)
==
0
{
return
nil
}
n
:=
rand
.
Intn
(
len
(
pending
))
+
1
for
_
,
s
:=
range
pending
{
if
n
--
;
n
==
0
{
return
s
}
}
return
nil
}
for
{
select
{
case
s
:=
<-
h
.
txsyncCh
:
pending
[
s
.
p
.
Peer
.
ID
()]
=
s
if
!
sending
{
send
(
s
)
}
case
err
:=
<-
done
:
sending
=
false
// Stop tracking peers that cause send failures.
if
err
!=
nil
{
pack
.
p
.
Log
()
.
Debug
(
"Transaction send failed"
,
"err"
,
err
)
delete
(
pending
,
pack
.
p
.
Peer
.
ID
())
}
// Schedule the next send.
if
s
:=
pick
();
s
!=
nil
{
send
(
s
)
}
case
<-
h
.
quitSync
:
return
}
}
}
}
// chainSyncer coordinates blockchain sync components.
// chainSyncer coordinates blockchain sync components.
...
...
eth/sync_test.go
View file @
0db0b277
...
@@ -28,7 +28,6 @@ import (
...
@@ -28,7 +28,6 @@ import (
)
)
// Tests that fast sync is disabled after a successful sync cycle.
// Tests that fast sync is disabled after a successful sync cycle.
func
TestFastSyncDisabling65
(
t
*
testing
.
T
)
{
testFastSyncDisabling
(
t
,
eth
.
ETH65
)
}
func
TestFastSyncDisabling66
(
t
*
testing
.
T
)
{
testFastSyncDisabling
(
t
,
eth
.
ETH66
)
}
func
TestFastSyncDisabling66
(
t
*
testing
.
T
)
{
testFastSyncDisabling
(
t
,
eth
.
ETH66
)
}
// Tests that fast sync gets disabled as soon as a real block is successfully
// Tests that fast sync gets disabled as soon as a real block is successfully
...
...
les/client_handler.go
View file @
0db0b277
...
@@ -472,7 +472,7 @@ func (d *downloaderPeerNotify) registerPeer(p *serverPeer) {
...
@@ -472,7 +472,7 @@ func (d *downloaderPeerNotify) registerPeer(p *serverPeer) {
handler
:
h
,
handler
:
h
,
peer
:
p
,
peer
:
p
,
}
}
h
.
downloader
.
RegisterLightPeer
(
p
.
id
,
eth
.
ETH6
5
,
pc
)
h
.
downloader
.
RegisterLightPeer
(
p
.
id
,
eth
.
ETH6
6
,
pc
)
}
}
func
(
d
*
downloaderPeerNotify
)
unregisterPeer
(
p
*
serverPeer
)
{
func
(
d
*
downloaderPeerNotify
)
unregisterPeer
(
p
*
serverPeer
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment