Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
bb7dca27
Commit
bb7dca27
authored
Mar 24, 2017
by
Péter Szilágyi
Committed by
Felix Lange
Mar 24, 2017
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
ethstats: cleanups, trace logs and "fix" history responses (#3812)
parent
8771c306
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
85 additions
and
69 deletions
+85
-69
ethstats.go
ethstats/ethstats.go
+85
-69
No files found.
ethstats/ethstats.go
View file @
bb7dca27
...
...
@@ -152,39 +152,31 @@ func (s *Service) loop() {
continue
}
// Authenticate the client with the server
in
:=
json
.
NewDecoder
(
conn
)
out
:=
json
.
NewEncoder
(
conn
)
if
err
=
s
.
login
(
in
,
out
);
err
!=
nil
{
if
err
=
s
.
login
(
conn
);
err
!=
nil
{
log
.
Warn
(
"Stats login failed"
,
"err"
,
err
)
conn
.
Close
()
time
.
Sleep
(
10
*
time
.
Second
)
continue
}
go
s
.
readLoop
(
conn
,
in
)
go
s
.
readLoop
(
conn
)
// Send the initial stats so our node looks decent from the get go
if
err
=
s
.
report
(
out
);
err
!=
nil
{
if
err
=
s
.
report
(
conn
);
err
!=
nil
{
log
.
Warn
(
"Initial stats report failed"
,
"err"
,
err
)
conn
.
Close
()
continue
}
if
err
=
s
.
reportHistory
(
out
,
nil
);
err
!=
nil
{
log
.
Warn
(
"Initial history report failed"
,
"err"
,
err
)
conn
.
Close
()
continue
}
// Keep sending status updates until the connection breaks
fullReport
:=
time
.
NewTicker
(
15
*
time
.
Second
)
for
err
==
nil
{
select
{
case
<-
fullReport
.
C
:
if
err
=
s
.
report
(
out
);
err
!=
nil
{
if
err
=
s
.
report
(
conn
);
err
!=
nil
{
log
.
Warn
(
"Full stats report failed"
,
"err"
,
err
)
}
case
list
:=
<-
s
.
histCh
:
if
err
=
s
.
reportHistory
(
out
,
list
);
err
!=
nil
{
if
err
=
s
.
reportHistory
(
conn
,
list
);
err
!=
nil
{
log
.
Warn
(
"Requested history report failed"
,
"err"
,
err
)
}
case
head
,
ok
:=
<-
headSub
.
Chan
()
:
...
...
@@ -192,10 +184,10 @@ func (s *Service) loop() {
conn
.
Close
()
return
}
if
err
=
s
.
reportBlock
(
out
,
head
.
Data
.
(
core
.
ChainHeadEvent
)
.
Block
);
err
!=
nil
{
if
err
=
s
.
reportBlock
(
conn
,
head
.
Data
.
(
core
.
ChainHeadEvent
)
.
Block
);
err
!=
nil
{
log
.
Warn
(
"Block stats report failed"
,
"err"
,
err
)
}
if
err
=
s
.
reportPending
(
out
);
err
!=
nil
{
if
err
=
s
.
reportPending
(
conn
);
err
!=
nil
{
log
.
Warn
(
"Post-block transaction stats report failed"
,
"err"
,
err
)
}
case
_
,
ok
:=
<-
txSub
.
Chan
()
:
...
...
@@ -211,7 +203,7 @@ func (s *Service) loop() {
exhausted
=
true
}
}
if
err
=
s
.
reportPending
(
out
);
err
!=
nil
{
if
err
=
s
.
reportPending
(
conn
);
err
!=
nil
{
log
.
Warn
(
"Transaction stats report failed"
,
"err"
,
err
)
}
}
...
...
@@ -225,17 +217,18 @@ func (s *Service) loop() {
// from the network socket. If any of them match an active request, it forwards
// it, if they themselves are requests it initiates a reply, and lastly it drops
// unknown packets.
func
(
s
*
Service
)
readLoop
(
conn
*
websocket
.
Conn
,
in
*
json
.
Decoder
)
{
func
(
s
*
Service
)
readLoop
(
conn
*
websocket
.
Conn
)
{
// If the read loop exists, close the connection
defer
conn
.
Close
()
for
{
// Retrieve the next generic network packet and bail out on error
var
msg
map
[
string
][]
interface
{}
if
err
:=
in
.
Decode
(
&
msg
);
err
!=
nil
{
if
err
:=
websocket
.
JSON
.
Receive
(
conn
,
&
msg
);
err
!=
nil
{
log
.
Warn
(
"Failed to decode stats server message"
,
"err"
,
err
)
return
}
log
.
Trace
(
"Received message from stats server"
,
"msg"
,
msg
)
if
len
(
msg
[
"emit"
])
==
0
{
log
.
Warn
(
"Stats server sent non-broadcast"
,
"msg"
,
msg
)
return
...
...
@@ -263,6 +256,7 @@ func (s *Service) readLoop(conn *websocket.Conn, in *json.Decoder) {
request
,
ok
:=
msg
[
"emit"
][
1
]
.
(
map
[
string
]
interface
{})
if
!
ok
{
log
.
Warn
(
"Invalid stats history request"
,
"msg"
,
msg
[
"emit"
][
1
])
s
.
histCh
<-
nil
continue
// Ethstats sometime sends invalid history requests, ignore those
}
list
,
ok
:=
request
[
"list"
]
.
([]
interface
{})
...
...
@@ -314,7 +308,7 @@ type authMsg struct {
}
// login tries to authorize the client at the remote server.
func
(
s
*
Service
)
login
(
in
*
json
.
Decoder
,
out
*
json
.
Encoder
)
error
{
func
(
s
*
Service
)
login
(
conn
*
websocket
.
Conn
)
error
{
// Construct and send the login authentication
infos
:=
s
.
server
.
NodeInfo
()
...
...
@@ -345,12 +339,12 @@ func (s *Service) login(in *json.Decoder, out *json.Encoder) error {
login
:=
map
[
string
][]
interface
{}{
"emit"
:
{
"hello"
,
auth
},
}
if
err
:=
out
.
Encode
(
login
);
err
!=
nil
{
if
err
:=
websocket
.
JSON
.
Send
(
conn
,
login
);
err
!=
nil
{
return
err
}
// Retrieve the remote ack or connection termination
var
ack
map
[
string
][]
string
if
err
:=
in
.
Decode
(
&
ack
);
err
!=
nil
||
len
(
ack
[
"emit"
])
!=
1
||
ack
[
"emit"
][
0
]
!=
"ready"
{
if
err
:=
websocket
.
JSON
.
Receive
(
conn
,
&
ack
);
err
!=
nil
||
len
(
ack
[
"emit"
])
!=
1
||
ack
[
"emit"
][
0
]
!=
"ready"
{
return
errors
.
New
(
"unauthorized"
)
}
return
nil
...
...
@@ -359,17 +353,17 @@ func (s *Service) login(in *json.Decoder, out *json.Encoder) error {
// report collects all possible data to report and send it to the stats server.
// This should only be used on reconnects or rarely to avoid overloading the
// server. Use the individual methods for reporting subscribed events.
func
(
s
*
Service
)
report
(
out
*
json
.
Encoder
)
error
{
if
err
:=
s
.
reportLatency
(
out
);
err
!=
nil
{
func
(
s
*
Service
)
report
(
conn
*
websocket
.
Conn
)
error
{
if
err
:=
s
.
reportLatency
(
conn
);
err
!=
nil
{
return
err
}
if
err
:=
s
.
reportBlock
(
out
,
nil
);
err
!=
nil
{
if
err
:=
s
.
reportBlock
(
conn
,
nil
);
err
!=
nil
{
return
err
}
if
err
:=
s
.
reportPending
(
out
);
err
!=
nil
{
if
err
:=
s
.
reportPending
(
conn
);
err
!=
nil
{
return
err
}
if
err
:=
s
.
reportStats
(
out
);
err
!=
nil
{
if
err
:=
s
.
reportStats
(
conn
);
err
!=
nil
{
return
err
}
return
nil
...
...
@@ -377,7 +371,7 @@ func (s *Service) report(out *json.Encoder) error {
// reportLatency sends a ping request to the server, measures the RTT time and
// finally sends a latency update.
func
(
s
*
Service
)
reportLatency
(
out
*
json
.
Encoder
)
error
{
func
(
s
*
Service
)
reportLatency
(
conn
*
websocket
.
Conn
)
error
{
// Send the current time to the ethstats server
start
:=
time
.
Now
()
...
...
@@ -387,7 +381,7 @@ func (s *Service) reportLatency(out *json.Encoder) error {
"clientTime"
:
start
.
String
(),
}},
}
if
err
:=
out
.
Encode
(
ping
);
err
!=
nil
{
if
err
:=
websocket
.
JSON
.
Send
(
conn
,
ping
);
err
!=
nil
{
return
err
}
// Wait for the pong request to arrive back
...
...
@@ -398,28 +392,35 @@ func (s *Service) reportLatency(out *json.Encoder) error {
// Ping timeout, abort
return
errors
.
New
(
"ping timed out"
)
}
latency
:=
strconv
.
Itoa
(
int
((
time
.
Since
(
start
)
/
time
.
Duration
(
2
))
.
Nanoseconds
()
/
1000000
))
// Send back the measured latency
latency
:=
map
[
string
][]
interface
{}{
log
.
Trace
(
"Sending measured latency to ethstats"
,
"latency"
,
latency
)
stats
:=
map
[
string
][]
interface
{}{
"emit"
:
{
"latency"
,
map
[
string
]
string
{
"id"
:
s
.
node
,
"latency"
:
strconv
.
Itoa
(
int
((
time
.
Since
(
start
)
/
time
.
Duration
(
2
))
.
Nanoseconds
()
/
1000000
))
,
"latency"
:
latency
,
}},
}
return
out
.
Encode
(
latency
)
return
websocket
.
JSON
.
Send
(
conn
,
stats
)
}
// blockStats is the information to report about individual blocks.
type
blockStats
struct
{
Number
*
big
.
Int
`json:"number"`
Hash
common
.
Hash
`json:"hash"`
Timestamp
*
big
.
Int
`json:"timestamp"`
Miner
common
.
Address
`json:"miner"`
GasUsed
*
big
.
Int
`json:"gasUsed"`
GasLimit
*
big
.
Int
`json:"gasLimit"`
Diff
string
`json:"difficulty"`
TotalDiff
string
`json:"totalDifficulty"`
Txs
txStats
`json:"transactions"`
Uncles
uncleStats
`json:"uncles"`
Number
*
big
.
Int
`json:"number"`
Hash
common
.
Hash
`json:"hash"`
ParentHash
common
.
Hash
`json:"parentHash"`
Timestamp
*
big
.
Int
`json:"timestamp"`
Miner
common
.
Address
`json:"miner"`
GasUsed
*
big
.
Int
`json:"gasUsed"`
GasLimit
*
big
.
Int
`json:"gasLimit"`
Diff
string
`json:"difficulty"`
TotalDiff
string
`json:"totalDifficulty"`
Txs
txStats
`json:"transactions"`
TxHash
common
.
Hash
`json:"transactionsRoot"`
Root
common
.
Hash
`json:"stateRoot"`
Uncles
uncleStats
`json:"uncles"`
}
// txStats is a custom wrapper around a transaction array to force serializing
...
...
@@ -445,16 +446,21 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {
}
// reportBlock retrieves the current chain head and repors it to the stats server.
func
(
s
*
Service
)
reportBlock
(
out
*
json
.
Encoder
,
block
*
types
.
Block
)
error
{
// Assemble the block stats report and send it to the server
func
(
s
*
Service
)
reportBlock
(
conn
*
websocket
.
Conn
,
block
*
types
.
Block
)
error
{
// Gather the block details from the header or block chain
details
:=
s
.
assembleBlockStats
(
block
)
// Assemble the block report and send it to the server
log
.
Trace
(
"Sending new block to ethstats"
,
"number"
,
details
.
Number
,
"hash"
,
details
.
Hash
)
stats
:=
map
[
string
]
interface
{}{
"id"
:
s
.
node
,
"block"
:
s
.
assembleBlockStats
(
block
)
,
"block"
:
details
,
}
report
:=
map
[
string
][]
interface
{}{
"emit"
:
{
"block"
,
stats
},
}
return
out
.
Encode
(
report
)
return
websocket
.
JSON
.
Send
(
conn
,
report
)
}
// assembleBlockStats retrieves any required metadata to report a single block
...
...
@@ -488,22 +494,25 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
}
// Assemble and return the block stats
return
&
blockStats
{
Number
:
header
.
Number
,
Hash
:
header
.
Hash
(),
Timestamp
:
header
.
Time
,
Miner
:
header
.
Coinbase
,
GasUsed
:
new
(
big
.
Int
)
.
Set
(
header
.
GasUsed
),
GasLimit
:
new
(
big
.
Int
)
.
Set
(
header
.
GasLimit
),
Diff
:
header
.
Difficulty
.
String
(),
TotalDiff
:
td
.
String
(),
Txs
:
txs
,
Uncles
:
uncles
,
Number
:
header
.
Number
,
Hash
:
header
.
Hash
(),
ParentHash
:
header
.
ParentHash
,
Timestamp
:
header
.
Time
,
Miner
:
header
.
Coinbase
,
GasUsed
:
new
(
big
.
Int
)
.
Set
(
header
.
GasUsed
),
GasLimit
:
new
(
big
.
Int
)
.
Set
(
header
.
GasLimit
),
Diff
:
header
.
Difficulty
.
String
(),
TotalDiff
:
td
.
String
(),
Txs
:
txs
,
TxHash
:
header
.
TxHash
,
Root
:
header
.
Root
,
Uncles
:
uncles
,
}
}
// reportHistory retrieves the most recent batch of blocks and reports it to the
// stats server.
func
(
s
*
Service
)
reportHistory
(
out
*
json
.
Encoder
,
list
[]
uint64
)
error
{
func
(
s
*
Service
)
reportHistory
(
conn
*
websocket
.
Conn
,
list
[]
uint64
)
error
{
// Figure out the indexes that need reporting
indexes
:=
make
([]
uint64
,
0
,
historyUpdateRange
)
if
len
(
list
)
>
0
{
...
...
@@ -511,17 +520,17 @@ func (s *Service) reportHistory(out *json.Encoder, list []uint64) error {
indexes
=
append
(
indexes
,
list
...
)
}
else
{
// No indexes requested, send back the top ones
var
head
*
types
.
Header
var
head
int64
if
s
.
eth
!=
nil
{
head
=
s
.
eth
.
BlockChain
()
.
CurrentHeader
()
head
=
s
.
eth
.
BlockChain
()
.
CurrentHeader
()
.
Number
.
Int64
()
}
else
{
head
=
s
.
les
.
BlockChain
()
.
CurrentHeader
()
head
=
s
.
les
.
BlockChain
()
.
CurrentHeader
()
.
Number
.
Int64
()
}
start
:=
head
.
Number
.
Int64
()
-
historyUpdateRange
start
:=
head
-
historyUpdateRange
+
1
if
start
<
0
{
start
=
0
}
for
i
:=
uint64
(
start
);
i
<=
head
.
Number
.
Uint64
(
);
i
++
{
for
i
:=
uint64
(
start
);
i
<=
uint64
(
head
);
i
++
{
indexes
=
append
(
indexes
,
i
)
}
}
...
...
@@ -529,12 +538,14 @@ func (s *Service) reportHistory(out *json.Encoder, list []uint64) error {
history
:=
make
([]
*
blockStats
,
len
(
indexes
))
for
i
,
number
:=
range
indexes
{
if
s
.
eth
!=
nil
{
history
[
i
]
=
s
.
assembleBlockStats
(
s
.
eth
.
BlockChain
()
.
GetBlockByNumber
(
number
))
history
[
len
(
history
)
-
1
-
i
]
=
s
.
assembleBlockStats
(
s
.
eth
.
BlockChain
()
.
GetBlockByNumber
(
number
))
}
else
{
history
[
i
]
=
s
.
assembleBlockStats
(
types
.
NewBlockWithHeader
(
s
.
les
.
BlockChain
()
.
GetHeaderByNumber
(
number
)))
history
[
len
(
history
)
-
1
-
i
]
=
s
.
assembleBlockStats
(
types
.
NewBlockWithHeader
(
s
.
les
.
BlockChain
()
.
GetHeaderByNumber
(
number
)))
}
}
// Assemble the history report and send it to the server
log
.
Trace
(
"Sending historical blocks to ethstats"
,
"first"
,
history
[
0
]
.
Number
,
"last"
,
history
[
len
(
history
)
-
1
]
.
Number
)
stats
:=
map
[
string
]
interface
{}{
"id"
:
s
.
node
,
"history"
:
history
,
...
...
@@ -542,7 +553,7 @@ func (s *Service) reportHistory(out *json.Encoder, list []uint64) error {
report
:=
map
[
string
][]
interface
{}{
"emit"
:
{
"history"
,
stats
},
}
return
out
.
Encode
(
report
)
return
websocket
.
JSON
.
Send
(
conn
,
report
)
}
// pendStats is the information to report about pending transactions.
...
...
@@ -552,7 +563,7 @@ type pendStats struct {
// reportPending retrieves the current number of pending transactions and reports
// it to the stats server.
func
(
s
*
Service
)
reportPending
(
out
*
json
.
Encoder
)
error
{
func
(
s
*
Service
)
reportPending
(
conn
*
websocket
.
Conn
)
error
{
// Retrieve the pending count from the local blockchain
var
pending
int
if
s
.
eth
!=
nil
{
...
...
@@ -561,6 +572,8 @@ func (s *Service) reportPending(out *json.Encoder) error {
pending
=
s
.
les
.
TxPool
()
.
Stats
()
}
// Assemble the transaction stats and send it to the server
log
.
Trace
(
"Sending pending transactions to ethstats"
,
"count"
,
pending
)
stats
:=
map
[
string
]
interface
{}{
"id"
:
s
.
node
,
"stats"
:
&
pendStats
{
...
...
@@ -570,10 +583,10 @@ func (s *Service) reportPending(out *json.Encoder) error {
report
:=
map
[
string
][]
interface
{}{
"emit"
:
{
"pending"
,
stats
},
}
return
out
.
Encode
(
report
)
return
websocket
.
JSON
.
Send
(
conn
,
report
)
}
//
block
Stats is the information to report about the local node.
//
node
Stats is the information to report about the local node.
type
nodeStats
struct
{
Active
bool
`json:"active"`
Syncing
bool
`json:"syncing"`
...
...
@@ -586,7 +599,7 @@ type nodeStats struct {
// reportPending retrieves various stats about the node at the networking and
// mining layer and reports it to the stats server.
func
(
s
*
Service
)
reportStats
(
out
*
json
.
Encoder
)
error
{
func
(
s
*
Service
)
reportStats
(
conn
*
websocket
.
Conn
)
error
{
// Gather the syncing and mining infos from the local miner instance
var
(
mining
bool
...
...
@@ -606,6 +619,9 @@ func (s *Service) reportStats(out *json.Encoder) error {
sync
:=
s
.
les
.
Downloader
()
.
Progress
()
syncing
=
s
.
les
.
BlockChain
()
.
CurrentHeader
()
.
Number
.
Uint64
()
>=
sync
.
HighestBlock
}
// Assemble the node stats and send it to the server
log
.
Trace
(
"Sending node details to ethstats"
)
stats
:=
map
[
string
]
interface
{}{
"id"
:
s
.
node
,
"stats"
:
&
nodeStats
{
...
...
@@ -621,5 +637,5 @@ func (s *Service) reportStats(out *json.Encoder) error {
report
:=
map
[
string
][]
interface
{}{
"emit"
:
{
"stats"
,
stats
},
}
return
out
.
Encode
(
report
)
return
websocket
.
JSON
.
Send
(
conn
,
report
)
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment