Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
bba5fd81
Commit
bba5fd81
authored
Nov 26, 2018
by
holisticode
Committed by
Anton Evangelatov
Nov 26, 2018
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Accounting metrics reporter (#18136)
parent
2714e8f0
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
305 additions
and
30 deletions
+305
-30
accounting.go
p2p/protocols/accounting.go
+33
-10
accounting_simulation_test.go
p2p/protocols/accounting_simulation_test.go
+10
-0
reporter.go
p2p/protocols/reporter.go
+147
-0
reporter_test.go
p2p/protocols/reporter_test.go
+77
-0
swap.go
swarm/swap/swap.go
+5
-0
swarm.go
swarm/swarm.go
+33
-20
No files found.
p2p/protocols/accounting.go
View file @
bba5fd81
...
...
@@ -16,29 +16,32 @@
package
protocols
import
"github.com/ethereum/go-ethereum/metrics"
import
(
"time"
"github.com/ethereum/go-ethereum/metrics"
)
//define some metrics
var
(
//NOTE: these metrics just define the interfaces and are currently *NOT persisted* over sessions
//All metrics are cumulative
//total amount of units credited
mBalanceCredit
=
metrics
.
NewRegisteredCounterForced
(
"account.balance.credit"
,
nil
)
mBalanceCredit
metrics
.
Counter
//total amount of units debited
mBalanceDebit
=
metrics
.
NewRegisteredCounterForced
(
"account.balance.debit"
,
nil
)
mBalanceDebit
metrics
.
Counter
//total amount of bytes credited
mBytesCredit
=
metrics
.
NewRegisteredCounterForced
(
"account.bytes.credit"
,
nil
)
mBytesCredit
metrics
.
Counter
//total amount of bytes debited
mBytesDebit
=
metrics
.
NewRegisteredCounterForced
(
"account.bytes.debit"
,
nil
)
mBytesDebit
metrics
.
Counter
//total amount of credited messages
mMsgCredit
=
metrics
.
NewRegisteredCounterForced
(
"account.msg.credit"
,
nil
)
mMsgCredit
metrics
.
Counter
//total amount of debited messages
mMsgDebit
=
metrics
.
NewRegisteredCounterForced
(
"account.msg.debit"
,
nil
)
mMsgDebit
metrics
.
Counter
//how many times local node had to drop remote peers
mPeerDrops
=
metrics
.
NewRegisteredCounterForced
(
"account.peerdrops"
,
nil
)
mPeerDrops
metrics
.
Counter
//how many times local node overdrafted and dropped
mSelfDrops
=
metrics
.
NewRegisteredCounterForced
(
"account.selfdrops"
,
nil
)
mSelfDrops
metrics
.
Counter
)
//Prices defines how prices are being passed on to the accounting instance
...
...
@@ -105,6 +108,26 @@ func NewAccounting(balance Balance, po Prices) *Accounting {
return
ah
}
//SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
//this registry should be independent of any other metrics as it persists at different endpoints.
//It also instantiates the given metrics and starts the persisting go-routine which
//at the passed interval writes the metrics to a LevelDB
func
SetupAccountingMetrics
(
reportInterval
time
.
Duration
,
path
string
)
*
AccountingMetrics
{
//create an empty registry
registry
:=
metrics
.
NewRegistry
()
//instantiate the metrics
mBalanceCredit
=
metrics
.
NewRegisteredCounterForced
(
"account.balance.credit"
,
registry
)
mBalanceDebit
=
metrics
.
NewRegisteredCounterForced
(
"account.balance.debit"
,
registry
)
mBytesCredit
=
metrics
.
NewRegisteredCounterForced
(
"account.bytes.credit"
,
registry
)
mBytesDebit
=
metrics
.
NewRegisteredCounterForced
(
"account.bytes.debit"
,
registry
)
mMsgCredit
=
metrics
.
NewRegisteredCounterForced
(
"account.msg.credit"
,
registry
)
mMsgDebit
=
metrics
.
NewRegisteredCounterForced
(
"account.msg.debit"
,
registry
)
mPeerDrops
=
metrics
.
NewRegisteredCounterForced
(
"account.peerdrops"
,
registry
)
mSelfDrops
=
metrics
.
NewRegisteredCounterForced
(
"account.selfdrops"
,
registry
)
//create the DB and start persisting
return
NewAccountingMetrics
(
registry
,
reportInterval
,
path
)
}
//Implement Hook.Send
// Send takes a peer, a size and a msg and
// - calculates the cost for the local node sending a msg of size to peer using the Prices interface
...
...
p2p/protocols/accounting_simulation_test.go
View file @
bba5fd81
...
...
@@ -20,7 +20,10 @@ import (
"context"
"flag"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"reflect"
"sync"
"testing"
...
...
@@ -66,6 +69,13 @@ func init() {
func
TestAccountingSimulation
(
t
*
testing
.
T
)
{
//setup the balances objects for every node
bal
:=
newBalances
(
*
nodes
)
//setup the metrics system or tests will fail trying to write metrics
dir
,
err
:=
ioutil
.
TempDir
(
""
,
"account-sim"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
defer
os
.
RemoveAll
(
dir
)
SetupAccountingMetrics
(
1
*
time
.
Second
,
filepath
.
Join
(
dir
,
"metrics.db"
))
//define the node.Service for this test
services
:=
adapters
.
Services
{
"accounting"
:
func
(
ctx
*
adapters
.
ServiceContext
)
(
node
.
Service
,
error
)
{
...
...
p2p/protocols/reporter.go
0 → 100644
View file @
bba5fd81
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package
protocols
import
(
"encoding/binary"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/syndtr/goleveldb/leveldb"
)
//AccountMetrics abstracts away the metrics DB and
//the reporter to persist metrics
type
AccountingMetrics
struct
{
reporter
*
reporter
}
//Close will be called when the node is being shutdown
//for a graceful cleanup
func
(
am
*
AccountingMetrics
)
Close
()
{
close
(
am
.
reporter
.
quit
)
am
.
reporter
.
db
.
Close
()
}
//reporter is an internal structure used to write p2p accounting related
//metrics to a LevelDB. It will periodically write the accrued metrics to the DB.
type
reporter
struct
{
reg
metrics
.
Registry
//the registry for these metrics (independent of other metrics)
interval
time
.
Duration
//duration at which the reporter will persist metrics
db
*
leveldb
.
DB
//the actual DB
quit
chan
struct
{}
//quit the reporter loop
}
//NewMetricsDB creates a new LevelDB instance used to persist metrics defined
//inside p2p/protocols/accounting.go
func
NewAccountingMetrics
(
r
metrics
.
Registry
,
d
time
.
Duration
,
path
string
)
*
AccountingMetrics
{
var
val
=
make
([]
byte
,
8
)
var
err
error
//Create the LevelDB
db
,
err
:=
leveldb
.
OpenFile
(
path
,
nil
)
if
err
!=
nil
{
log
.
Error
(
err
.
Error
())
return
nil
}
//Check for all defined metrics that there is a value in the DB
//If there is, assign it to the metric. This means that the node
//has been running before and that metrics have been persisted.
metricsMap
:=
map
[
string
]
metrics
.
Counter
{
"account.balance.credit"
:
mBalanceCredit
,
"account.balance.debit"
:
mBalanceDebit
,
"account.bytes.credit"
:
mBytesCredit
,
"account.bytes.debit"
:
mBytesDebit
,
"account.msg.credit"
:
mMsgCredit
,
"account.msg.debit"
:
mMsgDebit
,
"account.peerdrops"
:
mPeerDrops
,
"account.selfdrops"
:
mSelfDrops
,
}
//iterate the map and get the values
for
key
,
metric
:=
range
metricsMap
{
val
,
err
=
db
.
Get
([]
byte
(
key
),
nil
)
//until the first time a value is being written,
//this will return an error.
//it could be beneficial though to log errors later,
//but that would require a different logic
if
err
==
nil
{
metric
.
Inc
(
int64
(
binary
.
BigEndian
.
Uint64
(
val
)))
}
}
//create the reporter
rep
:=
&
reporter
{
reg
:
r
,
interval
:
d
,
db
:
db
,
quit
:
make
(
chan
struct
{}),
}
//run the go routine
go
rep
.
run
()
m
:=
&
AccountingMetrics
{
reporter
:
rep
,
}
return
m
}
//run is the goroutine which periodically sends the metrics to the configured LevelDB
func
(
r
*
reporter
)
run
()
{
intervalTicker
:=
time
.
NewTicker
(
r
.
interval
)
for
{
select
{
case
<-
intervalTicker
.
C
:
//at each tick send the metrics
if
err
:=
r
.
save
();
err
!=
nil
{
log
.
Error
(
"unable to send metrics to LevelDB"
,
"err"
,
err
)
//If there is an error in writing, exit the routine; we assume here that the error is
//severe and don't attempt to write again.
//Also, this should prevent leaking when the node is stopped
return
}
case
<-
r
.
quit
:
//graceful shutdown
return
}
}
}
//send the metrics to the DB
func
(
r
*
reporter
)
save
()
error
{
//create a LevelDB Batch
batch
:=
leveldb
.
Batch
{}
//for each metric in the registry (which is independent)...
r
.
reg
.
Each
(
func
(
name
string
,
i
interface
{})
{
metric
,
ok
:=
i
.
(
metrics
.
Counter
)
if
ok
{
//assuming every metric here to be a Counter (separate registry)
//...create a snapshot...
ms
:=
metric
.
Snapshot
()
byteVal
:=
make
([]
byte
,
8
)
binary
.
BigEndian
.
PutUint64
(
byteVal
,
uint64
(
ms
.
Count
()))
//...and save the value to the DB
batch
.
Put
([]
byte
(
name
),
byteVal
)
}
})
return
r
.
db
.
Write
(
&
batch
,
nil
)
}
p2p/protocols/reporter_test.go
0 → 100644
View file @
bba5fd81
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package
protocols
import
(
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
)
//TestReporter tests that the metrics being collected for p2p accounting
//are being persisted and available after restart of a node.
//It simulates restarting by just recreating the DB as if the node had restarted.
func
TestReporter
(
t
*
testing
.
T
)
{
//create a test directory
dir
,
err
:=
ioutil
.
TempDir
(
""
,
"reporter-test"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
defer
os
.
RemoveAll
(
dir
)
//setup the metrics
log
.
Debug
(
"Setting up metrics first time"
)
reportInterval
:=
5
*
time
.
Millisecond
metrics
:=
SetupAccountingMetrics
(
reportInterval
,
filepath
.
Join
(
dir
,
"test.db"
))
log
.
Debug
(
"Done."
)
//do some metrics
mBalanceCredit
.
Inc
(
12
)
mBytesCredit
.
Inc
(
34
)
mMsgDebit
.
Inc
(
9
)
//give the reporter time to write the metrics to DB
time
.
Sleep
(
20
*
time
.
Millisecond
)
//set the metrics to nil - this effectively simulates the node having shut down...
mBalanceCredit
=
nil
mBytesCredit
=
nil
mMsgDebit
=
nil
//close the DB also, or we can't create a new one
metrics
.
Close
()
//setup the metrics again
log
.
Debug
(
"Setting up metrics second time"
)
metrics
=
SetupAccountingMetrics
(
reportInterval
,
filepath
.
Join
(
dir
,
"test.db"
))
defer
metrics
.
Close
()
log
.
Debug
(
"Done."
)
//now check the metrics, they should have the same value as before "shutdown"
if
mBalanceCredit
.
Count
()
!=
12
{
t
.
Fatalf
(
"Expected counter to be %d, but is %d"
,
12
,
mBalanceCredit
.
Count
())
}
if
mBytesCredit
.
Count
()
!=
34
{
t
.
Fatalf
(
"Expected counter to be %d, but is %d"
,
23
,
mBytesCredit
.
Count
())
}
if
mMsgDebit
.
Count
()
!=
9
{
t
.
Fatalf
(
"Expected counter to be %d, but is %d"
,
9
,
mMsgDebit
.
Count
())
}
}
swarm/swap/swap.go
View file @
bba5fd81
...
...
@@ -91,3 +91,8 @@ func (s *Swap) loadState(peer *protocols.Peer) (err error) {
}
return
}
//Clean up Swap
func
(
swap
*
Swap
)
Close
()
{
swap
.
stateStore
.
Close
()
}
swarm/swarm.go
View file @
bba5fd81
// Copyright 201
6
The go-ethereum Authors
// Copyright 201
8
The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
...
...
@@ -66,20 +66,22 @@ var (
// the swarm stack
type
Swarm
struct
{
config
*
api
.
Config
// swarm configuration
api
*
api
.
API
// high level api layer (fs/manifest)
dns
api
.
Resolver
// DNS registrar
fileStore
*
storage
.
FileStore
// distributed preimage archive, the local API to the storage with document level storage/retrieval support
streamer
*
stream
.
Registry
bzz
*
network
.
Bzz
// the logistic manager
backend
chequebook
.
Backend
// simple blockchain Backend
privateKey
*
ecdsa
.
PrivateKey
corsString
string
swapEnabled
bool
netStore
*
storage
.
NetStore
sfs
*
fuse
.
SwarmFS
// need this to cleanup all the active mounts on node exit
ps
*
pss
.
Pss
swap
*
swap
.
Swap
config
*
api
.
Config
// swarm configuration
api
*
api
.
API
// high level api layer (fs/manifest)
dns
api
.
Resolver
// DNS registrar
fileStore
*
storage
.
FileStore
// distributed preimage archive, the local API to the storage with document level storage/retrieval support
streamer
*
stream
.
Registry
bzz
*
network
.
Bzz
// the logistic manager
backend
chequebook
.
Backend
// simple blockchain Backend
privateKey
*
ecdsa
.
PrivateKey
corsString
string
swapEnabled
bool
netStore
*
storage
.
NetStore
sfs
*
fuse
.
SwarmFS
// need this to cleanup all the active mounts on node exit
ps
*
pss
.
Pss
swap
*
swap
.
Swap
stateStore
*
state
.
DBStore
accountingMetrics
*
protocols
.
AccountingMetrics
tracerClose
io
.
Closer
}
...
...
@@ -134,7 +136,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
LightNode
:
config
.
LightNodeEnabled
,
}
s
tateStore
,
err
:
=
state
.
NewDBStore
(
filepath
.
Join
(
config
.
Path
,
"state-store.db"
))
s
elf
.
stateStore
,
err
=
state
.
NewDBStore
(
filepath
.
Join
(
config
.
Path
,
"state-store.db"
))
if
err
!=
nil
{
return
}
...
...
@@ -179,6 +181,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
return
nil
,
err
}
self
.
swap
=
swap
.
New
(
balancesStore
)
self
.
accountingMetrics
=
protocols
.
SetupAccountingMetrics
(
10
*
time
.
Second
,
filepath
.
Join
(
config
.
Path
,
"metrics.db"
))
}
var
nodeID
enode
.
ID
...
...
@@ -203,7 +206,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
SyncUpdateDelay
:
config
.
SyncUpdateDelay
,
MaxPeerServers
:
config
.
MaxStreamPeerServers
,
}
self
.
streamer
=
stream
.
NewRegistry
(
nodeID
,
delivery
,
self
.
netStore
,
stateStore
,
registryOptions
,
self
.
swap
)
self
.
streamer
=
stream
.
NewRegistry
(
nodeID
,
delivery
,
self
.
netStore
,
s
elf
.
s
tateStore
,
registryOptions
,
self
.
swap
)
// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage
self
.
fileStore
=
storage
.
NewFileStore
(
self
.
netStore
,
self
.
config
.
FileStoreParams
)
...
...
@@ -226,7 +229,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
log
.
Debug
(
"Setup local storage"
)
self
.
bzz
=
network
.
NewBzz
(
bzzconfig
,
to
,
stateStore
,
self
.
streamer
.
GetSpec
(),
self
.
streamer
.
Run
)
self
.
bzz
=
network
.
NewBzz
(
bzzconfig
,
to
,
s
elf
.
s
tateStore
,
self
.
streamer
.
GetSpec
(),
self
.
streamer
.
Run
)
// Pss = postal service over swarm (devp2p over bzz)
self
.
ps
,
err
=
pss
.
NewPss
(
to
,
config
.
Pss
)
...
...
@@ -446,14 +449,24 @@ func (self *Swarm) Stop() error {
ch
.
Stop
()
ch
.
Save
()
}
if
self
.
swap
!=
nil
{
self
.
swap
.
Close
()
}
if
self
.
accountingMetrics
!=
nil
{
self
.
accountingMetrics
.
Close
()
}
if
self
.
netStore
!=
nil
{
self
.
netStore
.
Close
()
}
self
.
sfs
.
Stop
()
stopCounter
.
Inc
(
1
)
self
.
streamer
.
Stop
()
return
self
.
bzz
.
Stop
()
err
:=
self
.
bzz
.
Stop
()
if
self
.
stateStore
!=
nil
{
self
.
stateStore
.
Close
()
}
return
err
}
// implements the node.Service interface
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment