core, ethdb, metrics, p2p: expose various counter metrics for grafana

parent f9c0e093
......@@ -46,6 +46,10 @@ import (
)
var (
headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil)
accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil)
accountUpdateTimer = metrics.NewRegisteredTimer("chain/account/updates", nil)
......@@ -332,6 +336,7 @@ func (bc *BlockChain) loadLastState() error {
}
// Everything seems to be fine, set as the head block
bc.currentBlock.Store(currentBlock)
headBlockGauge.Update(int64(currentBlock.NumberU64()))
// Restore the last known head header
currentHeader := currentBlock.Header()
......@@ -344,12 +349,14 @@ func (bc *BlockChain) loadLastState() error {
// Restore the last known head fast block
bc.currentFastBlock.Store(currentBlock)
headFastBlockGauge.Update(int64(currentBlock.NumberU64()))
if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
}
// Issue a status log for the user
currentFastBlock := bc.CurrentFastBlock()
......@@ -388,6 +395,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
}
// Rewind the fast block in a simpleton way to the target head
......@@ -399,6 +407,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash())
bc.currentFastBlock.Store(newHeadFastBlock)
headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
}
}
......@@ -450,6 +459,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
// If all checks out, manually set the head block
bc.chainmu.Lock()
bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))
bc.chainmu.Unlock()
log.Info("Committed new head block", "number", block.Number(), "hash", hash)
......@@ -522,9 +532,12 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
bc.genesisBlock = genesis
bc.insert(bc.genesisBlock)
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
bc.hc.SetGenesis(bc.genesisBlock.Header())
bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
return nil
}
......@@ -598,6 +611,7 @@ func (bc *BlockChain) insert(block *types.Block) {
rawdb.WriteHeadBlockHash(bc.db, block.Hash())
bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))
// If the block is better than our head or is on a different chain, force update heads
if updateHeads {
......@@ -605,6 +619,7 @@ func (bc *BlockChain) insert(block *types.Block) {
rawdb.WriteHeadFastBlockHash(bc.db, block.Hash())
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
}
......@@ -862,11 +877,13 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
bc.currentFastBlock.Store(newFastBlock)
headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash())
bc.currentBlock.Store(newBlock)
headBlockGauge.Update(int64(newBlock.NumberU64()))
}
}
// Truncate ancient data which exceeds the current header.
......@@ -952,6 +969,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 {
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
bc.currentFastBlock.Store(head)
headFastBlockGauge.Update(int64(head.NumberU64()))
isCanonical = true
}
}
......
......@@ -104,6 +104,7 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
}
}
hc.currentHeaderHash = hc.CurrentHeader().Hash()
headHeaderGauge.Update(hc.CurrentHeader().Number.Int64())
return hc, nil
}
......@@ -185,12 +186,12 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
hc.currentHeaderHash = hash
hc.currentHeader.Store(types.CopyHeader(header))
headHeaderGauge.Update(header.Number.Int64())
status = CanonStatTy
} else {
status = SideStatTy
}
hc.headerCache.Add(hash, header)
hc.numberCache.Add(hash, number)
......@@ -456,6 +457,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
hc.currentHeader.Store(head)
hc.currentHeaderHash = head.Hash()
headHeaderGauge.Update(head.Number.Int64())
}
type (
......@@ -508,6 +510,7 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d
hc.currentHeader.Store(parent)
hc.currentHeaderHash = parentHash
headHeaderGauge.Update(parent.Number.Int64())
}
batch.Write()
......
......@@ -80,8 +80,9 @@ type freezer struct {
func newFreezer(datadir string, namespace string) (*freezer, error) {
// Create the initial freezer object
var (
readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil)
readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil)
sizeCounter = metrics.NewRegisteredCounter(namespace+"ancient/size", nil)
)
// Ensure the datadir is not a symbolic link if it exists.
if info, err := os.Lstat(datadir); !os.IsNotExist(err) {
......@@ -102,7 +103,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) {
instanceLock: lock,
}
for name, disableSnappy := range freezerNoSnappy {
table, err := newTable(datadir, name, readMeter, writeMeter, disableSnappy)
table, err := newTable(datadir, name, readMeter, writeMeter, sizeCounter, disableSnappy)
if err != nil {
for _, table := range freezer.tables {
table.Close()
......
......@@ -94,17 +94,18 @@ type freezerTable struct {
// to count how many historic items have gone missing.
itemOffset uint32 // Offset (number of discarded items)
headBytes uint32 // Number of bytes written to the head file
readMeter metrics.Meter // Meter for measuring the effective amount of data read
writeMeter metrics.Meter // Meter for measuring the effective amount of data written
headBytes uint32 // Number of bytes written to the head file
readMeter metrics.Meter // Meter for measuring the effective amount of data read
writeMeter metrics.Meter // Meter for measuring the effective amount of data written
sizeCounter metrics.Counter // Counter for tracking the combined size of all freezer tables
logger log.Logger // Logger with database path and table name ambedded
lock sync.RWMutex // Mutex protecting the data file descriptors
}
// newTable opens a freezer table with default settings - 2G files
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, disableSnappy bool) (*freezerTable, error) {
return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, disableSnappy)
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, disableSnappy bool) (*freezerTable, error) {
return newCustomTable(path, name, readMeter, writeMeter, sizeCounter, 2*1000*1000*1000, disableSnappy)
}
// openFreezerFileForAppend opens a freezer table file and seeks to the end
......@@ -148,7 +149,7 @@ func truncateFreezerFile(file *os.File, size int64) error {
// newCustomTable opens a freezer table, creating the data and index files if they are
// non existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
// Ensure the containing directory exists and open the indexEntry file
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
......@@ -171,6 +172,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete
files: make(map[uint32]*os.File),
readMeter: readMeter,
writeMeter: writeMeter,
sizeCounter: sizeCounter,
name: name,
path: path,
logger: log.New("database", path, "table", name),
......@@ -181,6 +183,14 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete
tab.Close()
return nil, err
}
// Initialize the starting size counter
size, err := tab.sizeNolock()
if err != nil {
tab.Close()
return nil, err
}
tab.sizeCounter.Inc(int64(size))
return tab, nil
}
......@@ -321,6 +331,11 @@ func (t *freezerTable) truncate(items uint64) error {
if atomic.LoadUint64(&t.items) <= items {
return nil
}
// We need to truncate, save the old size for metrics tracking
oldSize, err := t.sizeNolock()
if err != nil {
return err
}
// Something's out of sync, truncate the table's offset index
t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items)
if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil {
......@@ -355,6 +370,14 @@ func (t *freezerTable) truncate(items uint64) error {
// All data files truncated, set internal counters and return
atomic.StoreUint64(&t.items, items)
atomic.StoreUint32(&t.headBytes, expected.offset)
// Retrieve the new size and update the total size counter
newSize, err := t.sizeNolock()
if err != nil {
return err
}
t.sizeCounter.Dec(int64(oldSize - newSize))
return nil
}
......@@ -483,7 +506,10 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
}
// Write indexEntry
t.index.Write(idx.marshallBinary())
t.writeMeter.Mark(int64(bLen + indexEntrySize))
t.sizeCounter.Inc(int64(bLen + indexEntrySize))
atomic.AddUint64(&t.items, 1)
return nil
}
......@@ -562,6 +588,12 @@ func (t *freezerTable) size() (uint64, error) {
t.lock.RLock()
defer t.lock.RUnlock()
return t.sizeNolock()
}
// sizeNolock returns the total data size in the freezer table without obtaining
// the mutex first.
func (t *freezerTable) sizeNolock() (uint64, error) {
stat, err := t.index.Stat()
if err != nil {
return 0, err
......
......@@ -56,7 +56,7 @@ func TestFreezerBasics(t *testing.T) {
// set cutoff at 50 bytes
f, err := newCustomTable(os.TempDir(),
fmt.Sprintf("unittest-%d", rand.Uint64()),
metrics.NewMeter(), metrics.NewMeter(), 50, true)
metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter(), 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -98,12 +98,12 @@ func TestFreezerBasicsClosing(t *testing.T) {
t.Parallel()
// set cutoff at 50 bytes
var (
fname = fmt.Sprintf("basics-close-%d", rand.Uint64())
m1, m2 = metrics.NewMeter(), metrics.NewMeter()
f *freezerTable
err error
fname = fmt.Sprintf("basics-close-%d", rand.Uint64())
rm, wm, sc = metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
f *freezerTable
err error
)
f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -112,7 +112,7 @@ func TestFreezerBasicsClosing(t *testing.T) {
data := getChunk(15, x)
f.Append(uint64(x), data)
f.Close()
f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
}
defer f.Close()
......@@ -126,7 +126,7 @@ func TestFreezerBasicsClosing(t *testing.T) {
t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
}
f.Close()
f, err = newCustomTable(os.TempDir(), fname, m1, m2, 50, true)
f, err = newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -136,11 +136,11 @@ func TestFreezerBasicsClosing(t *testing.T) {
// TestFreezerRepairDanglingHead tests that we can recover if index entries are removed
func TestFreezerRepairDanglingHead(t *testing.T) {
t.Parallel()
wm, rm := metrics.NewMeter(), metrics.NewMeter()
rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())
{ // Fill table
f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -169,7 +169,7 @@ func TestFreezerRepairDanglingHead(t *testing.T) {
idxFile.Close()
// Now open it again
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
// The last item should be missing
if _, err = f.Retrieve(0xff); err == nil {
t.Errorf("Expected error for missing index entry")
......@@ -184,11 +184,11 @@ func TestFreezerRepairDanglingHead(t *testing.T) {
// TestFreezerRepairDanglingHeadLarge tests that we can recover if very many index entries are removed
func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
t.Parallel()
wm, rm := metrics.NewMeter(), metrics.NewMeter()
rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("dangling_headtest-%d", rand.Uint64())
{ // Fill a table and close it
f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -216,7 +216,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
idxFile.Close()
// Now open it again
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
// The first item should be there
if _, err = f.Retrieve(0); err != nil {
t.Fatal(err)
......@@ -234,7 +234,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
}
// And if we open it, we should now be able to read all of them (new values)
{
f, _ := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
f, _ := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
for y := 1; y < 255; y++ {
exp := getChunk(15, ^y)
got, err := f.Retrieve(uint64(y))
......@@ -251,11 +251,11 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
// TestSnappyDetection tests that we fail to open a snappy database and vice versa
func TestSnappyDetection(t *testing.T) {
t.Parallel()
wm, rm := metrics.NewMeter(), metrics.NewMeter()
rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("snappytest-%d", rand.Uint64())
// Open with snappy
{
f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -268,7 +268,7 @@ func TestSnappyDetection(t *testing.T) {
}
// Open without snappy
{
f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, false)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, false)
if _, err = f.Retrieve(0); err == nil {
f.Close()
t.Fatalf("expected empty table")
......@@ -277,7 +277,7 @@ func TestSnappyDetection(t *testing.T) {
// Open with snappy
{
f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
// There should be 255 items
if _, err = f.Retrieve(0xfe); err != nil {
f.Close()
......@@ -302,11 +302,11 @@ func assertFileSize(f string, size int64) error {
// the index is repaired
func TestFreezerRepairDanglingIndex(t *testing.T) {
t.Parallel()
wm, rm := metrics.NewMeter(), metrics.NewMeter()
rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("dangling_indextest-%d", rand.Uint64())
{ // Fill a table and close it
f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -342,7 +342,7 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
// 45, 45, 15
// with 3+3+1 items
{
f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -359,11 +359,11 @@ func TestFreezerRepairDanglingIndex(t *testing.T) {
func TestFreezerTruncate(t *testing.T) {
t.Parallel()
wm, rm := metrics.NewMeter(), metrics.NewMeter()
rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("truncation-%d", rand.Uint64())
{ // Fill table
f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -380,7 +380,7 @@ func TestFreezerTruncate(t *testing.T) {
}
// Reopen, truncate
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -402,10 +402,10 @@ func TestFreezerTruncate(t *testing.T) {
// That will rewind the index, and _should_ truncate the head file
func TestFreezerRepairFirstFile(t *testing.T) {
t.Parallel()
wm, rm := metrics.NewMeter(), metrics.NewMeter()
rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64())
{ // Fill table
f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -433,7 +433,7 @@ func TestFreezerRepairFirstFile(t *testing.T) {
}
// Reopen
{
f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -458,10 +458,10 @@ func TestFreezerRepairFirstFile(t *testing.T) {
// - check that we did not keep the rdonly file descriptors
func TestFreezerReadAndTruncate(t *testing.T) {
t.Parallel()
wm, rm := metrics.NewMeter(), metrics.NewMeter()
rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("read_truncate-%d", rand.Uint64())
{ // Fill table
f, err := newCustomTable(os.TempDir(), fname, rm, wm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -478,7 +478,7 @@ func TestFreezerReadAndTruncate(t *testing.T) {
}
// Reopen and read all files
{
f, err := newCustomTable(os.TempDir(), fname, wm, rm, 50, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 50, true)
if err != nil {
t.Fatal(err)
}
......@@ -504,10 +504,10 @@ func TestFreezerReadAndTruncate(t *testing.T) {
func TestOffset(t *testing.T) {
t.Parallel()
wm, rm := metrics.NewMeter(), metrics.NewMeter()
rm, wm, sc := metrics.NewMeter(), metrics.NewMeter(), metrics.NewCounter()
fname := fmt.Sprintf("offset-%d", rand.Uint64())
{ // Fill table
f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 40, true)
if err != nil {
t.Fatal(err)
}
......@@ -563,7 +563,7 @@ func TestOffset(t *testing.T) {
}
// Now open again
{
f, err := newCustomTable(os.TempDir(), fname, rm, wm, 40, true)
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sc, 40, true)
if err != nil {
t.Fatal(err)
}
......
......@@ -418,9 +418,9 @@ func (l *txPricedList) Put(tx *types.Transaction) {
// Removed notifies the prices transaction list that an old transaction dropped
// from the pool. The list will just keep a counter of stale objects and update
// the heap if a large enough ratio of transactions go stale.
func (l *txPricedList) Removed() {
func (l *txPricedList) Removed(count int) {
// Bump the stale counter, but exit if still too low (< 25%)
l.stales++
l.stales += count
if l.stales <= len(*l.items)/4 {
return
}
......
This diff is collapsed.
......@@ -442,9 +442,7 @@ func (s *stateSync) process(req *stateReq) (int, error) {
default:
return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
if _, ok := req.tasks[hash]; ok {
delete(req.tasks, hash)
}
delete(req.tasks, hash)
}
// Put unfulfilled tasks back into the retry queue
npeers := s.d.peers.Len()
......
......@@ -67,6 +67,7 @@ type Database struct {
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
diskSizeGauge metrics.Gauge // Gauge for tracking the size of all the levels in the database
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written
......@@ -112,6 +113,7 @@ func New(file string, cache int, handles int, namespace string) (*Database, erro
ldb.compTimeMeter = metrics.NewRegisteredMeter(namespace+"compact/time", nil)
ldb.compReadMeter = metrics.NewRegisteredMeter(namespace+"compact/input", nil)
ldb.compWriteMeter = metrics.NewRegisteredMeter(namespace+"compact/output", nil)
ldb.diskSizeGauge = metrics.NewRegisteredGauge(namespace+"disk/size", nil)
ldb.diskReadMeter = metrics.NewRegisteredMeter(namespace+"disk/read", nil)
ldb.diskWriteMeter = metrics.NewRegisteredMeter(namespace+"disk/write", nil)
ldb.writeDelayMeter = metrics.NewRegisteredMeter(namespace+"compact/writedelay/duration", nil)
......@@ -233,7 +235,7 @@ func (db *Database) meter(refresh time.Duration) {
// Create the counters to store current and previous compaction values
compactions := make([][]float64, 2)
for i := 0; i < 2; i++ {
compactions[i] = make([]float64, 3)
compactions[i] = make([]float64, 4)
}
// Create storage for iostats.
var iostats [2]float64
......@@ -279,7 +281,7 @@ func (db *Database) meter(refresh time.Duration) {
if len(parts) != 6 {
break
}
for idx, counter := range parts[3:] {
for idx, counter := range parts[2:] {
value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
if err != nil {
db.log.Error("Compaction entry parsing failed", "err", err)
......@@ -290,16 +292,18 @@ func (db *Database) meter(refresh time.Duration) {
}
}
// Update all the requested meters
if db.diskSizeGauge != nil {
db.diskSizeGauge.Update(int64(compactions[i%2][0] * 1024 * 1024))
}
if db.compTimeMeter != nil {
db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
db.compTimeMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1000 * 1000 * 1000))
}
if db.compReadMeter != nil {
db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
db.compReadMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
}
if db.compWriteMeter != nil {
db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
db.compWriteMeter.Mark(int64((compactions[i%2][3] - compactions[(i-1)%2][3]) * 1024 * 1024))
}
// Retrieve the write delay statistic
writedelay, err := db.db.GetProperty("leveldb.writedelay")
if err != nil {
......
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package metrics
import "github.com/elastic/gosigar"
// CPUStats is the system and process CPU stats.
type CPUStats struct {
GlobalTime int64 // Time spent by the CPU working on all processes
GlobalWait int64 // Time spent by waiting on disk for all processes
LocalTime int64 // Time spent by the CPU working on this process
}
// ReadCPUStats retrieves the current CPU stats.
func ReadCPUStats(stats *CPUStats) {
global := gosigar.Cpu{}
global.Get()
stats.GlobalTime = int64(global.User + global.Nice + global.Sys)
stats.GlobalWait = int64(global.Wait)
stats.LocalTime = getProcessCPUTime()
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// +build !windows
package metrics
import (
"syscall"
"github.com/ethereum/go-ethereum/log"
)
// getProcessCPUTime retrieves the process' CPU time since program startup.
func getProcessCPUTime() int64 {
var usage syscall.Rusage
if err := syscall.Getrusage(syscall.RUSAGE_SELF, &usage); err != nil {
log.Warn("Failed to retrieve CPU time", "err", err)
return 0
}
return int64(usage.Utime.Sec+usage.Stime.Sec)*100 + int64(usage.Utime.Usec+usage.Stime.Usec)/10000 //nolint:unconvert
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package metrics
// getProcessCPUTime returns 0 on Windows as there is no system call to resolve
// the actual process' CPU time.
func getProcessCPUTime() int64 {
return 0
}
......@@ -61,18 +61,27 @@ func CollectProcessMetrics(refresh time.Duration) {
if !Enabled {
return
}
refreshFreq := int64(refresh / time.Second)
// Create the various data collectors
cpuStats := make([]*CPUStats, 2)
memstats := make([]*runtime.MemStats, 2)
diskstats := make([]*DiskStats, 2)
for i := 0; i < len(memstats); i++ {
cpuStats[i] = new(CPUStats)
memstats[i] = new(runtime.MemStats)
diskstats[i] = new(DiskStats)
}
// Define the various metrics to collect
cpuSysLoad := GetOrRegisterGauge("system/cpu/sysload", DefaultRegistry)
cpuSysWait := GetOrRegisterGauge("system/cpu/syswait", DefaultRegistry)
cpuProcLoad := GetOrRegisterGauge("system/cpu/procload", DefaultRegistry)
memPauses := GetOrRegisterMeter("system/memory/pauses", DefaultRegistry)
memAllocs := GetOrRegisterMeter("system/memory/allocs", DefaultRegistry)
memFrees := GetOrRegisterMeter("system/memory/frees", DefaultRegistry)
memInuse := GetOrRegisterMeter("system/memory/inuse", DefaultRegistry)
memPauses := GetOrRegisterMeter("system/memory/pauses", DefaultRegistry)
memHeld := GetOrRegisterGauge("system/memory/held", DefaultRegistry)
memUsed := GetOrRegisterGauge("system/memory/used", DefaultRegistry)
var diskReads, diskReadBytes, diskWrites, diskWriteBytes Meter
var diskReadBytesCounter, diskWriteBytesCounter Counter
......@@ -91,11 +100,17 @@ func CollectProcessMetrics(refresh time.Duration) {
location1 := i % 2
location2 := (i - 1) % 2
ReadCPUStats(cpuStats[location1])
cpuSysLoad.Update((cpuStats[location1].GlobalTime - cpuStats[location2].GlobalTime) / refreshFreq)
cpuSysWait.Update((cpuStats[location1].GlobalWait - cpuStats[location2].GlobalWait) / refreshFreq)
cpuProcLoad.Update((cpuStats[location1].LocalTime - cpuStats[location2].LocalTime) / refreshFreq)
runtime.ReadMemStats(memstats[location1])
memPauses.Mark(int64(memstats[location1].PauseTotalNs - memstats[location2].PauseTotalNs))
memAllocs.Mark(int64(memstats[location1].Mallocs - memstats[location2].Mallocs))
memFrees.Mark(int64(memstats[location1].Frees - memstats[location2].Frees))
memInuse.Mark(int64(memstats[location1].Alloc - memstats[location2].Alloc))
memPauses.Mark(int64(memstats[location1].PauseTotalNs - memstats[location2].PauseTotalNs))
memHeld.Update(int64(memstats[location1].HeapSys - memstats[location1].HeapReleased))
memUsed.Update(int64(memstats[location1].Alloc))
if ReadDiskStats(diskstats[location1]) == nil {
diskReads.Mark(diskstats[location1].ReadCount - diskstats[location2].ReadCount)
......
......@@ -25,18 +25,17 @@ import (
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/enode"
)
const (
MetricsInboundConnects = "p2p/InboundConnects" // Name for the registered inbound connects meter
MetricsInboundTraffic = "p2p/InboundTraffic" // Name for the registered inbound traffic meter
MetricsOutboundConnects = "p2p/OutboundConnects" // Name for the registered outbound connects meter
MetricsOutboundTraffic = "p2p/OutboundTraffic" // Name for the registered outbound traffic meter
MetricsInboundTraffic = "p2p/ingress" // Name for the registered inbound traffic meter
MetricsOutboundTraffic = "p2p/egress" // Name for the registered outbound traffic meter
MetricsOutboundConnects = "p2p/dials" // Name for the registered outbound connects meter
MetricsInboundConnects = "p2p/serves" // Name for the registered inbound connects meter
MeteredPeerLimit = 1024 // This amount of peers are individually metered
)
......@@ -46,6 +45,7 @@ var (
ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil) // Meter metering the cumulative ingress traffic
egressConnectMeter = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections
egressTrafficMeter = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil) // Meter metering the cumulative egress traffic
activePeerCounter = metrics.NewRegisteredCounter("p2p/peers", nil) // Gauge tracking the current peer count
PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsInboundTraffic+"/") // Registry containing the peer ingress
PeerEgressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress
......@@ -124,6 +124,8 @@ func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn {
} else {
egressConnectMeter.Mark(1)
}
activePeerCounter.Inc(1)
return &meteredConn{
Conn: conn,
ip: ip,
......@@ -198,6 +200,7 @@ func (c *meteredConn) Close() error {
IP: c.ip,
Elapsed: time.Since(c.connected),
})
activePeerCounter.Dec(1)
return err
}
id := c.id
......@@ -209,6 +212,7 @@ func (c *meteredConn) Close() error {
IP: c.ip,
ID: id,
})
activePeerCounter.Dec(1)
return err
}
ingress, egress := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count())
......@@ -229,5 +233,6 @@ func (c *meteredConn) Close() error {
Ingress: ingress,
Egress: egress,
})
activePeerCounter.Dec(1)
return err
}
......@@ -685,9 +685,8 @@ running:
// This channel is used by RemoveTrustedPeer to remove an enode
// from the trusted node set.
srv.log.Trace("Removing trusted node", "node", n)
if _, ok := trusted[n.ID()]; ok {
delete(trusted, n.ID())
}
delete(trusted, n.ID())
// Unmark any already-connected peer as trusted
if p, ok := peers[n.ID()]; ok {
p.rw.set(trustedConn, false)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment