Unverified Commit f6c3a534 authored by Martin Holst Swende's avatar Martin Holst Swende Committed by GitHub

metrics/influxdb: use smaller dependency and reuse code between v1 and v2 reporters (#26963)

This change switches to use the smaller influxdata/influxdb1-client package instead of depending on the whole infuxdb package. The new smaller client is very similar to the influxdb-v2 client, which made it possible to refactor the two reporters to reuse code a lot more. 
parent 7dc10071
...@@ -38,8 +38,8 @@ require ( ...@@ -38,8 +38,8 @@ require (
github.com/holiman/bloomfilter/v2 v2.0.3 github.com/holiman/bloomfilter/v2 v2.0.3
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c
github.com/huin/goupnp v1.0.3 github.com/huin/goupnp v1.0.3
github.com/influxdata/influxdb v1.8.3
github.com/influxdata/influxdb-client-go/v2 v2.4.0 github.com/influxdata/influxdb-client-go/v2 v2.4.0
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c
github.com/jackpal/go-nat-pmp v1.0.2 github.com/jackpal/go-nat-pmp v1.0.2
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e
github.com/julienschmidt/httprouter v1.3.0 github.com/julienschmidt/httprouter v1.3.0
......
This diff is collapsed.
...@@ -2,270 +2,112 @@ package influxdb ...@@ -2,270 +2,112 @@ package influxdb
import ( import (
"fmt" "fmt"
uurl "net/url"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/influxdata/influxdb/client"
) )
type reporter struct { func readMeter(namespace, name string, i interface{}) (string, map[string]interface{}) {
reg metrics.Registry switch metric := i.(type) {
interval time.Duration case metrics.Counter:
measurement := fmt.Sprintf("%s%s.count", namespace, name)
url uurl.URL fields := map[string]interface{}{
database string "value": metric.Count(),
username string
password string
namespace string
tags map[string]string
client *client.Client
cache map[string]int64
}
// InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval.
func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
}
// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
u, err := uurl.Parse(url)
if err != nil {
log.Warn("Unable to parse InfluxDB", "url", url, "err", err)
return
}
rep := &reporter{
reg: r,
interval: d,
url: *u,
database: database,
username: username,
password: password,
namespace: namespace,
tags: tags,
cache: make(map[string]int64),
}
if err := rep.makeClient(); err != nil {
log.Warn("Unable to make InfluxDB client", "err", err)
return
}
rep.run()
}
// InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags
func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error {
u, err := uurl.Parse(url)
if err != nil {
return fmt.Errorf("unable to parse InfluxDB. url: %s, err: %v", url, err)
}
rep := &reporter{
reg: r,
url: *u,
database: database,
username: username,
password: password,
namespace: namespace,
tags: tags,
cache: make(map[string]int64),
}
if err := rep.makeClient(); err != nil {
return fmt.Errorf("unable to make InfluxDB client. err: %v", err)
}
if err := rep.send(); err != nil {
return fmt.Errorf("unable to send to InfluxDB. err: %v", err)
}
return nil
}
func (r *reporter) makeClient() (err error) {
r.client, err = client.NewClient(client.Config{
URL: r.url,
Username: r.username,
Password: r.password,
Timeout: 10 * time.Second,
})
return
}
func (r *reporter) run() {
intervalTicker := time.NewTicker(r.interval)
pingTicker := time.NewTicker(time.Second * 5)
defer intervalTicker.Stop()
defer pingTicker.Stop()
for {
select {
case <-intervalTicker.C:
if err := r.send(); err != nil {
log.Warn("Unable to send to InfluxDB", "err", err)
}
case <-pingTicker.C:
_, _, err := r.client.Ping()
if err != nil {
log.Warn("Got error while sending a ping to InfluxDB, trying to recreate client", "err", err)
if err = r.makeClient(); err != nil {
log.Warn("Unable to make InfluxDB client", "err", err)
}
}
} }
} return measurement, fields
} case metrics.CounterFloat64:
measurement := fmt.Sprintf("%s%s.count", namespace, name)
func (r *reporter) send() error { fields := map[string]interface{}{
var pts []client.Point "value": metric.Count(),
r.reg.Each(func(name string, i interface{}) {
now := time.Now()
namespace := r.namespace
switch metric := i.(type) {
case metrics.Counter:
count := metric.Count()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.count", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"value": count,
},
Time: now,
})
case metrics.CounterFloat64:
count := metric.Count()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.count", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"value": count,
},
Time: now,
})
case metrics.Gauge:
ms := metric.Snapshot()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"value": ms.Value(),
},
Time: now,
})
case metrics.GaugeFloat64:
ms := metric.Snapshot()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"value": ms.Value(),
},
Time: now,
})
case metrics.Histogram:
ms := metric.Snapshot()
if ms.Count() > 0 {
ps := ms.Percentiles([]float64{0.25, 0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
fields := map[string]interface{}{
"count": ms.Count(),
"max": ms.Max(),
"mean": ms.Mean(),
"min": ms.Min(),
"stddev": ms.StdDev(),
"variance": ms.Variance(),
"p25": ps[0],
"p50": ps[1],
"p75": ps[2],
"p95": ps[3],
"p99": ps[4],
"p999": ps[5],
"p9999": ps[6],
}
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.histogram", namespace, name),
Tags: r.tags,
Fields: fields,
Time: now,
})
}
case metrics.Meter:
ms := metric.Snapshot()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.meter", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"count": ms.Count(),
"m1": ms.Rate1(),
"m5": ms.Rate5(),
"m15": ms.Rate15(),
"mean": ms.RateMean(),
},
Time: now,
})
case metrics.Timer:
ms := metric.Snapshot()
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.timer", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"count": ms.Count(),
"max": ms.Max(),
"mean": ms.Mean(),
"min": ms.Min(),
"stddev": ms.StdDev(),
"variance": ms.Variance(),
"p50": ps[0],
"p75": ps[1],
"p95": ps[2],
"p99": ps[3],
"p999": ps[4],
"p9999": ps[5],
"m1": ms.Rate1(),
"m5": ms.Rate5(),
"m15": ms.Rate15(),
"meanrate": ms.RateMean(),
},
Time: now,
})
case metrics.ResettingTimer:
t := metric.Snapshot()
if len(t.Values()) > 0 {
ps := t.Percentiles([]float64{50, 95, 99})
val := t.Values()
pts = append(pts, client.Point{
Measurement: fmt.Sprintf("%s%s.span", namespace, name),
Tags: r.tags,
Fields: map[string]interface{}{
"count": len(val),
"max": val[len(val)-1],
"mean": t.Mean(),
"min": val[0],
"p50": ps[0],
"p95": ps[1],
"p99": ps[2],
},
Time: now,
})
}
} }
}) return measurement, fields
case metrics.Gauge:
bps := client.BatchPoints{ measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
Points: pts, fields := map[string]interface{}{
Database: r.database, "value": metric.Snapshot().Value(),
}
return measurement, fields
case metrics.GaugeFloat64:
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
fields := map[string]interface{}{
"value": metric.Snapshot().Value(),
}
return measurement, fields
case metrics.Histogram:
ms := metric.Snapshot()
if ms.Count() <= 0 {
break
}
ps := ms.Percentiles([]float64{0.25, 0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
measurement := fmt.Sprintf("%s%s.histogram", namespace, name)
fields := map[string]interface{}{
"count": ms.Count(),
"max": ms.Max(),
"mean": ms.Mean(),
"min": ms.Min(),
"stddev": ms.StdDev(),
"variance": ms.Variance(),
"p25": ps[0],
"p50": ps[1],
"p75": ps[2],
"p95": ps[3],
"p99": ps[4],
"p999": ps[5],
"p9999": ps[6],
}
return measurement, fields
case metrics.Meter:
ms := metric.Snapshot()
measurement := fmt.Sprintf("%s%s.meter", namespace, name)
fields := map[string]interface{}{
"count": ms.Count(),
"m1": ms.Rate1(),
"m5": ms.Rate5(),
"m15": ms.Rate15(),
"mean": ms.RateMean(),
}
return measurement, fields
case metrics.Timer:
ms := metric.Snapshot()
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
measurement := fmt.Sprintf("%s%s.timer", namespace, name)
fields := map[string]interface{}{
"count": ms.Count(),
"max": ms.Max(),
"mean": ms.Mean(),
"min": ms.Min(),
"stddev": ms.StdDev(),
"variance": ms.Variance(),
"p50": ps[0],
"p75": ps[1],
"p95": ps[2],
"p99": ps[3],
"p999": ps[4],
"p9999": ps[5],
"m1": ms.Rate1(),
"m5": ms.Rate5(),
"m15": ms.Rate15(),
"meanrate": ms.RateMean(),
}
return measurement, fields
case metrics.ResettingTimer:
t := metric.Snapshot()
if len(t.Values()) == 0 {
break
}
ps := t.Percentiles([]float64{50, 95, 99})
val := t.Values()
measurement := fmt.Sprintf("%s%s.span", namespace, name)
fields := map[string]interface{}{
"count": len(val),
"max": val[len(val)-1],
"mean": t.Mean(),
"min": val[0],
"p50": ps[0],
"p95": ps[1],
"p99": ps[2],
}
return measurement, fields
} }
return "", nil
_, err := r.client.Write(bps)
return err
} }
package influxdb
import (
"fmt"
uurl "net/url"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
client "github.com/influxdata/influxdb1-client/v2"
)
type reporter struct {
reg metrics.Registry
interval time.Duration
url uurl.URL
database string
username string
password string
namespace string
tags map[string]string
client client.Client
cache map[string]int64
}
// InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval.
func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
}
// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
u, err := uurl.Parse(url)
if err != nil {
log.Warn("Unable to parse InfluxDB", "url", url, "err", err)
return
}
rep := &reporter{
reg: r,
interval: d,
url: *u,
database: database,
username: username,
password: password,
namespace: namespace,
tags: tags,
cache: make(map[string]int64),
}
if err := rep.makeClient(); err != nil {
log.Warn("Unable to make InfluxDB client", "err", err)
return
}
rep.run()
}
// InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags
func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error {
u, err := uurl.Parse(url)
if err != nil {
return fmt.Errorf("unable to parse InfluxDB. url: %s, err: %v", url, err)
}
rep := &reporter{
reg: r,
url: *u,
database: database,
username: username,
password: password,
namespace: namespace,
tags: tags,
cache: make(map[string]int64),
}
if err := rep.makeClient(); err != nil {
return fmt.Errorf("unable to make InfluxDB client. err: %v", err)
}
if err := rep.send(); err != nil {
return fmt.Errorf("unable to send to InfluxDB. err: %v", err)
}
return nil
}
func (r *reporter) makeClient() (err error) {
r.client, err = client.NewHTTPClient(client.HTTPConfig{
Addr: r.url.String(),
Username: r.username,
Password: r.password,
Timeout: 10 * time.Second,
})
return
}
func (r *reporter) run() {
intervalTicker := time.NewTicker(r.interval)
pingTicker := time.NewTicker(time.Second * 5)
defer intervalTicker.Stop()
defer pingTicker.Stop()
for {
select {
case <-intervalTicker.C:
if err := r.send(); err != nil {
log.Warn("Unable to send to InfluxDB", "err", err)
}
case <-pingTicker.C:
_, _, err := r.client.Ping(0)
if err != nil {
log.Warn("Got error while sending a ping to InfluxDB, trying to recreate client", "err", err)
if err = r.makeClient(); err != nil {
log.Warn("Unable to make InfluxDB client", "err", err)
}
}
}
}
}
func (r *reporter) send() error {
bps, err := client.NewBatchPoints(
client.BatchPointsConfig{
Database: r.database,
})
if err != nil {
return err
}
r.reg.Each(func(name string, i interface{}) {
now := time.Now()
measurement, fields := readMeter(r.namespace, name, i)
if fields == nil {
return
}
if p, err := client.NewPoint(measurement, r.tags, fields, now); err == nil {
bps.AddPoint(p)
}
})
return r.client.Write(bps)
}
...@@ -2,7 +2,6 @@ package influxdb ...@@ -2,7 +2,6 @@ package influxdb
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -78,144 +77,13 @@ func (r *v2Reporter) run() { ...@@ -78,144 +77,13 @@ func (r *v2Reporter) run() {
func (r *v2Reporter) send() { func (r *v2Reporter) send() {
r.reg.Each(func(name string, i interface{}) { r.reg.Each(func(name string, i interface{}) {
now := time.Now() now := time.Now()
namespace := r.namespace measurement, fields := readMeter(r.namespace, name, i)
if fields == nil {
switch metric := i.(type) { return
case metrics.Counter:
v := metric.Count()
measurement := fmt.Sprintf("%s%s.count", namespace, name)
fields := map[string]interface{}{
"value": v,
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
case metrics.CounterFloat64:
v := metric.Count()
measurement := fmt.Sprintf("%s%s.count", namespace, name)
fields := map[string]interface{}{
"value": v,
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
case metrics.Gauge:
ms := metric.Snapshot()
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
fields := map[string]interface{}{
"value": ms.Value(),
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
case metrics.GaugeFloat64:
ms := metric.Snapshot()
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
fields := map[string]interface{}{
"value": ms.Value(),
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
case metrics.Histogram:
ms := metric.Snapshot()
if ms.Count() > 0 {
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
measurement := fmt.Sprintf("%s%s.histogram", namespace, name)
fields := map[string]interface{}{
"count": ms.Count(),
"max": ms.Max(),
"mean": ms.Mean(),
"min": ms.Min(),
"stddev": ms.StdDev(),
"variance": ms.Variance(),
"p50": ps[0],
"p75": ps[1],
"p95": ps[2],
"p99": ps[3],
"p999": ps[4],
"p9999": ps[5],
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
}
case metrics.Meter:
ms := metric.Snapshot()
measurement := fmt.Sprintf("%s%s.meter", namespace, name)
fields := map[string]interface{}{
"count": ms.Count(),
"m1": ms.Rate1(),
"m5": ms.Rate5(),
"m15": ms.Rate15(),
"mean": ms.RateMean(),
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
case metrics.Timer:
ms := metric.Snapshot()
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
measurement := fmt.Sprintf("%s%s.timer", namespace, name)
fields := map[string]interface{}{
"count": ms.Count(),
"max": ms.Max(),
"mean": ms.Mean(),
"min": ms.Min(),
"stddev": ms.StdDev(),
"variance": ms.Variance(),
"p50": ps[0],
"p75": ps[1],
"p95": ps[2],
"p99": ps[3],
"p999": ps[4],
"p9999": ps[5],
"m1": ms.Rate1(),
"m5": ms.Rate5(),
"m15": ms.Rate15(),
"meanrate": ms.RateMean(),
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
case metrics.ResettingTimer:
t := metric.Snapshot()
if len(t.Values()) > 0 {
ps := t.Percentiles([]float64{50, 95, 99})
val := t.Values()
measurement := fmt.Sprintf("%s%s.span", namespace, name)
fields := map[string]interface{}{
"count": len(val),
"max": val[len(val)-1],
"mean": t.Mean(),
"min": val[0],
"p50": ps[0],
"p95": ps[1],
"p99": ps[2],
}
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
}
} }
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
r.write.WritePoint(pt)
}) })
// Force all unwritten data to be sent // Force all unwritten data to be sent
r.write.Flush() r.write.Flush()
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment