Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
2c8ed76e
Commit
2c8ed76e
authored
Jun 26, 2015
by
Péter Szilágyi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
eth: start cleaning up old protocol implementation, add metrics
parent
393d6756
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
105 additions
and
39 deletions
+105
-39
handler.go
eth/handler.go
+37
-17
metrics.go
eth/metrics.go
+26
-0
peer.go
eth/peer.go
+42
-22
No files found.
eth/handler.go
View file @
2c8ed76e
...
...
@@ -163,26 +163,28 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter
return
newPeer
(
pv
,
nv
,
genesis
,
current
,
td
,
p
,
rw
)
}
// handle is the callback invoked to manage the life cycle of an eth peer. When
// this function terminates, the peer is disconnected.
func
(
pm
*
ProtocolManager
)
handle
(
p
*
peer
)
error
{
// Execute the Ethereum handshake.
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: peer connected"
,
p
)
// Execute the Ethereum handshake
if
err
:=
p
.
handleStatus
();
err
!=
nil
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: handshake failed: %v"
,
p
,
err
)
return
err
}
// Register the peer locally.
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"Adding peer"
,
p
.
id
)
// Register the peer locally
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"%v: adding peer"
,
p
)
if
err
:=
pm
.
peers
.
Register
(
p
);
err
!=
nil
{
glog
.
V
(
logger
.
Error
)
.
Info
ln
(
"Addition failed:"
,
err
)
glog
.
V
(
logger
.
Error
)
.
Info
f
(
"%v: addition failed: %v"
,
p
,
err
)
return
err
}
defer
pm
.
removePeer
(
p
.
id
)
// Register the peer in the downloader. If the downloader
// considers it banned, we disconnect.
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if
err
:=
pm
.
downloader
.
RegisterPeer
(
p
.
id
,
p
.
Head
(),
p
.
requestHashes
,
p
.
requestBlocks
);
err
!=
nil
{
return
err
}
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm
.
syncTransactions
(
p
)
...
...
@@ -190,13 +192,17 @@ func (pm *ProtocolManager) handle(p *peer) error {
// main loop. handle incoming messages.
for
{
if
err
:=
pm
.
handleMsg
(
p
);
err
!=
nil
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: message handling failed: %v"
,
p
,
err
)
return
err
}
}
return
nil
}
// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func
(
pm
*
ProtocolManager
)
handleMsg
(
p
*
peer
)
error
{
// Read the next message from the remote peer, and ensure it's fully consumed
msg
,
err
:=
p
.
rw
.
ReadMsg
()
if
err
!=
nil
{
return
err
...
...
@@ -204,23 +210,25 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if
msg
.
Size
>
ProtocolMaxMsgSize
{
return
errResp
(
ErrMsgTooLarge
,
"%v > %v"
,
msg
.
Size
,
ProtocolMaxMsgSize
)
}
// make sure that the payload has been fully consumed
defer
msg
.
Discard
()
// Handle the message depending on its contents
switch
msg
.
Code
{
case
StatusMsg
:
return
errResp
(
ErrExtraStatusMsg
,
"uncontrolled status message"
)
case
TxMsg
:
// T
ODO: rework using lazy RLP stream
// T
ransactions arrived, parse all of them and deliver to the pool
var
txs
[]
*
types
.
Transaction
if
err
:=
msg
.
Decode
(
&
txs
);
err
!=
nil
{
return
errResp
(
ErrDecode
,
"msg %v: %v"
,
msg
,
err
)
}
propTxnInPacketsMeter
.
Mark
(
1
)
for
i
,
tx
:=
range
txs
{
if
tx
==
nil
{
return
errResp
(
ErrDecode
,
"transaction %d is nil"
,
i
)
}
propTxnInTrafficMeter
.
Mark
(
tx
.
Size
()
.
Int64
())
jsonlogger
.
LogJson
(
&
logger
.
EthTxReceived
{
TxHash
:
tx
.
Hash
()
.
Hex
(),
RemoteId
:
p
.
ID
()
.
String
(),
...
...
@@ -250,12 +258,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return
p
.
sendBlockHashes
(
hashes
)
case
BlockHashesMsg
:
// A batch of hashes arrived to one of our previous requests
msgStream
:=
rlp
.
NewStream
(
msg
.
Payload
,
uint64
(
msg
.
Size
))
reqHashInPacketsMeter
.
Mark
(
1
)
var
hashes
[]
common
.
Hash
if
err
:=
msgStream
.
Decode
(
&
hashes
);
err
!=
nil
{
break
}
reqHashInTrafficMeter
.
Mark
(
int64
(
32
*
len
(
hashes
)))
// Deliver them all to the downloader for queuing
err
:=
pm
.
downloader
.
DeliverHashes
(
p
.
id
,
hashes
)
if
err
!=
nil
{
glog
.
V
(
logger
.
Debug
)
.
Infoln
(
err
)
...
...
@@ -299,13 +312,14 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
list
=
list
[
:
len
(
list
)
-
2
]
+
"]"
glog
.
Infof
(
"
Peer %s: no blocks found for requested hashes %s"
,
p
.
id
,
list
)
glog
.
Infof
(
"
%v: no blocks found for requested hashes %s"
,
p
,
list
)
}
return
p
.
sendBlocks
(
blocks
)
case
BlocksMsg
:
// Decode the arrived block message
msgStream
:=
rlp
.
NewStream
(
msg
.
Payload
,
uint64
(
msg
.
Size
))
reqBlockInPacketsMeter
.
Mark
(
1
)
var
blocks
[]
*
types
.
Block
if
err
:=
msgStream
.
Decode
(
&
blocks
);
err
!=
nil
{
...
...
@@ -313,8 +327,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
blocks
=
nil
}
// Update the receive timestamp of each block
for
i
:=
0
;
i
<
len
(
blocks
);
i
++
{
blocks
[
i
]
.
ReceivedAt
=
msg
.
ReceivedAt
for
_
,
block
:=
range
blocks
{
reqBlockInTrafficMeter
.
Mark
(
block
.
Size
()
.
Int64
())
block
.
ReceivedAt
=
msg
.
ReceivedAt
}
// Filter out any explicitly requested blocks, deliver the rest to the downloader
if
blocks
:=
pm
.
fetcher
.
Filter
(
blocks
);
len
(
blocks
)
>
0
{
...
...
@@ -329,6 +344,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if
err
:=
msgStream
.
Decode
(
&
hashes
);
err
!=
nil
{
break
}
propHashInPacketsMeter
.
Mark
(
1
)
propHashInTrafficMeter
.
Mark
(
int64
(
32
*
len
(
hashes
)))
// Mark the hashes as present at the remote node
for
_
,
hash
:=
range
hashes
{
p
.
blockHashes
.
Add
(
hash
)
...
...
@@ -351,6 +369,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if
err
:=
msg
.
Decode
(
&
request
);
err
!=
nil
{
return
errResp
(
ErrDecode
,
"%v: %v"
,
msg
,
err
)
}
propBlockInPacketsMeter
.
Mark
(
1
)
propBlockInTrafficMeter
.
Mark
(
request
.
Block
.
Size
()
.
Int64
())
if
err
:=
request
.
Block
.
ValidateFields
();
err
!=
nil
{
return
errResp
(
ErrDecode
,
"block validation %v: %v"
,
msg
,
err
)
}
...
...
@@ -404,15 +425,14 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
}
}
// BroadcastTx will propagate the block to its connected peers. It will sort
// out which peers do not contain the block in their block set and will do a
// sqrt(peers) to determine the amount of peers we broadcast to.
// BroadcastTx will propagate a transaction to all peers which are not known to
// already have the given transaction.
func
(
pm
*
ProtocolManager
)
BroadcastTx
(
hash
common
.
Hash
,
tx
*
types
.
Transaction
)
{
// Broadcast transaction to a batch of peers not knowing about it
peers
:=
pm
.
peers
.
PeersWithoutTx
(
hash
)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for
_
,
peer
:=
range
peers
{
peer
.
sendTransaction
(
tx
)
peer
.
sendTransaction
s
(
types
.
Transactions
{
tx
}
)
}
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"broadcast tx to"
,
len
(
peers
),
"peers"
)
}
...
...
eth/metrics.go
0 → 100644
View file @
2c8ed76e
package
eth
import
"github.com/rcrowley/go-metrics"
var
(
propTxnInPacketsMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/prop/txns/in/packets"
,
metrics
.
DefaultRegistry
)
propTxnInTrafficMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/prop/txns/in/traffic"
,
metrics
.
DefaultRegistry
)
propTxnOutPacketsMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/prop/txns/out/packets"
,
metrics
.
DefaultRegistry
)
propTxnOutTrafficMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/prop/txns/out/traffic"
,
metrics
.
DefaultRegistry
)
propHashInPacketsMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/prop/hashes/in/packets"
,
metrics
.
DefaultRegistry
)
propHashInTrafficMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/prop/hashes/in/traffic"
,
metrics
.
DefaultRegistry
)
propHashOutPacketsMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/prop/hashes/out/packets"
,
metrics
.
DefaultRegistry
)
propHashOutTrafficMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/prop/hashes/out/traffic"
,
metrics
.
DefaultRegistry
)
propBlockInPacketsMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/prop/blocks/in/packets"
,
metrics
.
DefaultRegistry
)
propBlockInTrafficMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/prop/blocks/in/traffic"
,
metrics
.
DefaultRegistry
)
propBlockOutPacketsMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/prop/blocks/out/packets"
,
metrics
.
DefaultRegistry
)
propBlockOutTrafficMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/prop/blocks/out/traffic"
,
metrics
.
DefaultRegistry
)
reqHashInPacketsMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/req/hashes/in/packets"
,
metrics
.
DefaultRegistry
)
reqHashInTrafficMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/req/hashes/in/traffic"
,
metrics
.
DefaultRegistry
)
reqHashOutPacketsMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/req/hashes/out/packets"
,
metrics
.
DefaultRegistry
)
reqHashOutTrafficMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/req/hashes/out/traffic"
,
metrics
.
DefaultRegistry
)
reqBlockInPacketsMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/req/blocks/in/packets"
,
metrics
.
DefaultRegistry
)
reqBlockInTrafficMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/req/blocks/in/traffic"
,
metrics
.
DefaultRegistry
)
reqBlockOutPacketsMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/req/blocks/out/packets"
,
metrics
.
DefaultRegistry
)
reqBlockOutTrafficMeter
=
metrics
.
GetOrRegisterMeter
(
"eth/req/blocks/out/traffic"
,
metrics
.
DefaultRegistry
)
)
eth/peer.go
View file @
2c8ed76e
...
...
@@ -38,7 +38,8 @@ type peer struct {
rw
p2p
.
MsgReadWriter
protv
,
netid
int
version
int
// Protocol version negotiated
network
int
// Network ID being on
id
string
...
...
@@ -53,7 +54,7 @@ type peer struct {
blockHashes
*
set
.
Set
}
func
newPeer
(
protv
,
netid
int
,
genesis
,
head
common
.
Hash
,
td
*
big
.
Int
,
p
*
p2p
.
Peer
,
rw
p2p
.
MsgReadWriter
)
*
peer
{
func
newPeer
(
version
,
network
int
,
genesis
,
head
common
.
Hash
,
td
*
big
.
Int
,
p
*
p2p
.
Peer
,
rw
p2p
.
MsgReadWriter
)
*
peer
{
id
:=
p
.
ID
()
return
&
peer
{
...
...
@@ -62,8 +63,8 @@ func newPeer(protv, netid int, genesis, head common.Hash, td *big.Int, p *p2p.Pe
genesis
:
genesis
,
ourHash
:
head
,
ourTd
:
td
,
protv
:
protv
,
net
id
:
netid
,
version
:
version
,
net
work
:
network
,
id
:
fmt
.
Sprintf
(
"%x"
,
id
[
:
8
]),
txHashes
:
set
.
New
(),
blockHashes
:
set
.
New
(),
...
...
@@ -104,46 +105,58 @@ func (p *peer) SetTd(td *big.Int) {
}
// sendTransactions sends transactions to the peer and includes the hashes
// in it's tx hash set for future reference. The tx hash will allow the
// manager to check whether the peer has already received this particular
// transaction
// in its transaction hash set for future reference.
func
(
p
*
peer
)
sendTransactions
(
txs
types
.
Transactions
)
error
{
propTxnOutPacketsMeter
.
Mark
(
1
)
for
_
,
tx
:=
range
txs
{
propTxnOutTrafficMeter
.
Mark
(
tx
.
Size
()
.
Int64
())
p
.
txHashes
.
Add
(
tx
.
Hash
())
}
return
p2p
.
Send
(
p
.
rw
,
TxMsg
,
txs
)
}
// sendBlockHashes sends a batch of known hashes to the remote peer.
func
(
p
*
peer
)
sendBlockHashes
(
hashes
[]
common
.
Hash
)
error
{
reqHashOutPacketsMeter
.
Mark
(
1
)
reqHashOutTrafficMeter
.
Mark
(
int64
(
32
*
len
(
hashes
)))
return
p2p
.
Send
(
p
.
rw
,
BlockHashesMsg
,
hashes
)
}
// sendBlocks sends a batch of blocks to the remote peer.
func
(
p
*
peer
)
sendBlocks
(
blocks
[]
*
types
.
Block
)
error
{
reqBlockOutPacketsMeter
.
Mark
(
1
)
for
_
,
block
:=
range
blocks
{
reqBlockOutTrafficMeter
.
Mark
(
block
.
Size
()
.
Int64
())
}
return
p2p
.
Send
(
p
.
rw
,
BlocksMsg
,
blocks
)
}
// sendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func
(
p
*
peer
)
sendNewBlockHashes
(
hashes
[]
common
.
Hash
)
error
{
propHashOutPacketsMeter
.
Mark
(
1
)
propHashOutTrafficMeter
.
Mark
(
int64
(
32
*
len
(
hashes
)))
for
_
,
hash
:=
range
hashes
{
p
.
blockHashes
.
Add
(
hash
)
}
return
p2p
.
Send
(
p
.
rw
,
NewBlockHashesMsg
,
hashes
)
}
// sendNewBlock propagates an entire block to a remote peer.
func
(
p
*
peer
)
sendNewBlock
(
block
*
types
.
Block
)
error
{
p
.
blockHashes
.
Add
(
block
.
Hash
())
propBlockOutPacketsMeter
.
Mark
(
1
)
propBlockOutTrafficMeter
.
Mark
(
block
.
Size
()
.
Int64
())
p
.
blockHashes
.
Add
(
block
.
Hash
())
return
p2p
.
Send
(
p
.
rw
,
NewBlockMsg
,
[]
interface
{}{
block
,
block
.
Td
})
}
func
(
p
*
peer
)
sendTransaction
(
tx
*
types
.
Transaction
)
error
{
p
.
txHashes
.
Add
(
tx
.
Hash
())
return
p2p
.
Send
(
p
.
rw
,
TxMsg
,
[]
*
types
.
Transaction
{
tx
})
}
// requestHashes fetches a batch of hashes from a peer, starting at from, going
// towards the genesis block.
func
(
p
*
peer
)
requestHashes
(
from
common
.
Hash
)
error
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"[%s] fetching hashes (%d) %x...
\n
"
,
p
.
id
,
downloader
.
MaxHashFetch
,
from
[
:
4
])
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"
Peer
[%s] fetching hashes (%d) %x...
\n
"
,
p
.
id
,
downloader
.
MaxHashFetch
,
from
[
:
4
])
return
p2p
.
Send
(
p
.
rw
,
GetBlockHashesMsg
,
getBlockHashesMsgData
{
from
,
uint64
(
downloader
.
MaxHashFetch
)})
}
...
...
@@ -156,8 +169,8 @@ func (p *peer) handleStatus() error {
errc
:=
make
(
chan
error
,
1
)
go
func
()
{
errc
<-
p2p
.
Send
(
p
.
rw
,
StatusMsg
,
&
statusMsgData
{
ProtocolVersion
:
uint32
(
p
.
protv
),
NetworkId
:
uint32
(
p
.
net
id
),
ProtocolVersion
:
uint32
(
p
.
version
),
NetworkId
:
uint32
(
p
.
net
work
),
TD
:
p
.
ourTd
,
CurrentBlock
:
p
.
ourHash
,
GenesisBlock
:
p
.
genesis
,
...
...
@@ -185,12 +198,12 @@ func (p *peer) handleStatus() error {
return
errResp
(
ErrGenesisBlockMismatch
,
"%x (!= %x)"
,
status
.
GenesisBlock
,
p
.
genesis
)
}
if
int
(
status
.
NetworkId
)
!=
p
.
net
id
{
return
errResp
(
ErrNetworkIdMismatch
,
"%d (!= %d)"
,
status
.
NetworkId
,
p
.
net
id
)
if
int
(
status
.
NetworkId
)
!=
p
.
net
work
{
return
errResp
(
ErrNetworkIdMismatch
,
"%d (!= %d)"
,
status
.
NetworkId
,
p
.
net
work
)
}
if
int
(
status
.
ProtocolVersion
)
!=
p
.
protv
{
return
errResp
(
ErrProtocolVersionMismatch
,
"%d (!= %d)"
,
status
.
ProtocolVersion
,
p
.
protv
)
if
int
(
status
.
ProtocolVersion
)
!=
p
.
version
{
return
errResp
(
ErrProtocolVersionMismatch
,
"%d (!= %d)"
,
status
.
ProtocolVersion
,
p
.
version
)
}
// Set the total difficulty of the peer
p
.
td
=
status
.
TD
...
...
@@ -200,6 +213,13 @@ func (p *peer) handleStatus() error {
return
<-
errc
}
// String implements fmt.Stringer.
func
(
p
*
peer
)
String
()
string
{
return
fmt
.
Sprintf
(
"Peer %s [%s]"
,
p
.
id
,
fmt
.
Sprintf
(
"eth/%2d"
,
p
.
version
),
)
}
// peerSet represents the collection of active peers currently participating in
// the Ethereum sub-protocol.
type
peerSet
struct
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment