// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Contains the meters, gauges and peer life-cycle events used by the networking layer.

package p2p

import (
	"net"
23 24 25
	"sync"
	"sync/atomic"
	"time"
26

27 28
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/log"
29
	"github.com/ethereum/go-ethereum/metrics"
30 31
)

32
const (
	// MetricsInboundTraffic is the name of the registered inbound traffic
	// meter. It is also used as the prefix of the per-peer ingress registry.
	MetricsInboundTraffic = "p2p/ingress"
	// MetricsOutboundTraffic is the name of the registered outbound traffic
	// meter. It is also used as the prefix of the per-peer egress registry.
	MetricsOutboundTraffic = "p2p/egress"
	// MetricsOutboundConnects is the name of the registered outbound connects meter.
	MetricsOutboundConnects = "p2p/dials"
	// MetricsInboundConnects is the name of the registered inbound connects meter.
	MetricsInboundConnects = "p2p/serves"

	// MeteredPeerLimit is the threshold the metered peer count is compared
	// against when deciding whether a peer gets individual traffic meters.
	MeteredPeerLimit = 1024
)

41
var (
42 43 44 45
	ingressConnectMeter = metrics.NewRegisteredMeter(MetricsInboundConnects, nil)  // Meter counting the ingress connections
	ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil)   // Meter metering the cumulative ingress traffic
	egressConnectMeter  = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections
	egressTrafficMeter  = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil)  // Meter metering the cumulative egress traffic
46
	activePeerGauge     = metrics.NewRegisteredGauge("p2p/peers", nil)             // Gauge tracking the current peer count
47

48 49
	PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsInboundTraffic+"/")  // Registry containing the peer ingress
	PeerEgressRegistry  = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress
50 51 52

	meteredPeerFeed  event.Feed // Event feed for peer metrics
	meteredPeerCount int32      // Actually stored peer connection count
53 54
)

55 56 57 58
// MeteredPeerEventType is the type of peer events emitted by a metered connection.
type MeteredPeerEventType int

const (
	// PeerHandshakeSucceeded is the type of event
	// emitted when a peer successfully makes the handshake.
	PeerHandshakeSucceeded MeteredPeerEventType = iota

	// PeerHandshakeFailed is the type of event emitted when a peer fails to
	// make the handshake or disconnects before it.
	PeerHandshakeFailed

	// PeerDisconnected is the type of event emitted when a peer disconnects.
	PeerDisconnected
)

// MeteredPeerEvent is an event emitted when peers connect or disconnect.
type MeteredPeerEvent struct {
	Type    MeteredPeerEventType // Type of peer event
74
	Addr    string               // TCP address of the peer
75
	Elapsed time.Duration        // Time elapsed between the connection and the handshake/disconnection
76
	Peer    *Peer                // Connected remote node instance
77 78 79 80 81 82 83 84 85 86
	Ingress uint64               // Ingress count at the moment of the event
	Egress  uint64               // Egress count at the moment of the event
}

// SubscribeMeteredPeerEvent attaches ch to the peer life-cycle event feed and
// returns the resulting subscription. The subscription itself is created
// unconditionally; events only reach the feed from metered connections.
func SubscribeMeteredPeerEvent(ch chan<- MeteredPeerEvent) event.Subscription {
	sub := meteredPeerFeed.Subscribe(ch)
	return sub
}

87
// meteredConn is a wrapper around a net.Conn that meters both the
88 89
// inbound and outbound network traffic.
type meteredConn struct {
90
	net.Conn // Network connection to wrap with metering
91

92 93 94
	connected time.Time    // Connection time of the peer
	addr      *net.TCPAddr // TCP address of the peer
	peer      *Peer        // Peer instance
95 96 97 98 99 100 101 102 103

	// trafficMetered denotes if the peer is registered in the traffic registries.
	// Its value is true if the metered peer count doesn't reach the limit in the
	// moment of the peer's connection.
	trafficMetered bool
	ingressMeter   metrics.Meter // Meter for the read bytes of the peer
	egressMeter    metrics.Meter // Meter for the written bytes of the peer

	lock sync.RWMutex // Lock protecting the metered connection's internals
104 105
}

106 107 108 109
// newMeteredConn creates a new metered connection, bumps the ingress or egress
// connection meter and also increases the metered peer count. If the metrics
// system is disabled or the IP address is unspecified, this function returns
// the original object.
110
func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn {
111 112 113 114
	// Short circuit if metrics are disabled
	if !metrics.Enabled {
		return conn
	}
115 116
	if addr == nil || addr.IP.IsUnspecified() {
		log.Warn("Peer address is unspecified")
117 118 119
		return conn
	}
	// Bump the connection counters and wrap the connection
120 121 122 123 124
	if ingress {
		ingressConnectMeter.Mark(1)
	} else {
		egressConnectMeter.Mark(1)
	}
125
	activePeerGauge.Inc(1)
126

127 128
	return &meteredConn{
		Conn:      conn,
129
		addr:      addr,
130 131
		connected: time.Now(),
	}
132 133
}

134 135
// Read delegates a network read to the underlying connection, bumping the common
// and the peer ingress traffic meters along the way.
136
func (c *meteredConn) Read(b []byte) (n int, err error) {
137
	n, err = c.Conn.Read(b)
138
	ingressTrafficMeter.Mark(int64(n))
139 140 141 142 143 144
	c.lock.RLock()
	if c.trafficMetered {
		c.ingressMeter.Mark(int64(n))
	}
	c.lock.RUnlock()
	return n, err
145 146
}

147 148
// Write delegates a network write to the underlying connection, bumping the common
// and the peer egress traffic meters along the way.
149
func (c *meteredConn) Write(b []byte) (n int, err error) {
150
	n, err = c.Conn.Write(b)
151
	egressTrafficMeter.Mark(int64(n))
152 153 154 155 156 157 158 159
	c.lock.RLock()
	if c.trafficMetered {
		c.egressMeter.Mark(int64(n))
	}
	c.lock.RUnlock()
	return n, err
}

160 161
// handshakeDone is called after the connection passes the handshake.
func (c *meteredConn) handshakeDone(peer *Peer) {
162 163 164 165
	if atomic.AddInt32(&meteredPeerCount, 1) >= MeteredPeerLimit {
		// Don't register the peer in the traffic registries.
		atomic.AddInt32(&meteredPeerCount, -1)
		c.lock.Lock()
166
		c.peer, c.trafficMetered = peer, false
167 168 169
		c.lock.Unlock()
		log.Warn("Metered peer count reached the limit")
	} else {
170
		enode := peer.Node().String()
171
		c.lock.Lock()
172 173 174
		c.peer, c.trafficMetered = peer, true
		c.ingressMeter = metrics.NewRegisteredMeter(enode, PeerIngressRegistry)
		c.egressMeter = metrics.NewRegisteredMeter(enode, PeerEgressRegistry)
175 176 177
		c.lock.Unlock()
	}
	meteredPeerFeed.Send(MeteredPeerEvent{
178 179 180
		Type:    PeerHandshakeSucceeded,
		Addr:    c.addr.String(),
		Peer:    peer,
181 182 183 184 185 186 187 188 189
		Elapsed: time.Since(c.connected),
	})
}

// Close delegates a close operation to the underlying connection, unregisters
// the peer from the traffic registries and emits close event.
func (c *meteredConn) Close() error {
	err := c.Conn.Close()
	c.lock.RLock()
190 191
	if c.peer == nil {
		// If the peer disconnects before/during the handshake.
192 193 194
		c.lock.RUnlock()
		meteredPeerFeed.Send(MeteredPeerEvent{
			Type:    PeerHandshakeFailed,
195
			Addr:    c.addr.String(),
196 197
			Elapsed: time.Since(c.connected),
		})
198
		activePeerGauge.Dec(1)
199 200
		return err
	}
201
	peer := c.peer
202 203 204 205 206
	if !c.trafficMetered {
		// If the peer isn't registered in the traffic registries.
		c.lock.RUnlock()
		meteredPeerFeed.Send(MeteredPeerEvent{
			Type: PeerDisconnected,
207 208
			Addr: c.addr.String(),
			Peer: peer,
209
		})
210
		activePeerGauge.Dec(1)
211 212
		return err
	}
213
	ingress, egress, enode := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count()), c.peer.Node().String()
214 215 216 217 218 219
	c.lock.RUnlock()

	// Decrement the metered peer count
	atomic.AddInt32(&meteredPeerCount, -1)

	// Unregister the peer from the traffic registries
220 221
	PeerIngressRegistry.Unregister(enode)
	PeerEgressRegistry.Unregister(enode)
222 223 224

	meteredPeerFeed.Send(MeteredPeerEvent{
		Type:    PeerDisconnected,
225 226
		Addr:    c.addr.String(),
		Peer:    peer,
227 228 229
		Ingress: ingress,
		Egress:  egress,
	})
230
	activePeerGauge.Dec(1)
231
	return err
232
}