Commit 59a31983 authored by gary rong's avatar gary rong Committed by Péter Szilágyi

les: remove half-finished priority pool APIs (#19780)

* les: remove half-finish APIs

* les: remove half-finish APIs
parent 8d2cf028
This diff is collapsed.
This diff is collapsed.
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package csvlogger
import (
"fmt"
"os"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/log"
)
// Logger is a metrics/events logger that writes logged values and events into a comma separated file
type Logger struct {
file *os.File
started mclock.AbsTime
channels []*Channel
period time.Duration
stopCh, stopped chan struct{}
storeCh chan string
eventHeader string
}
// NewLogger creates a new Logger
func NewLogger(fileName string, updatePeriod time.Duration, eventHeader string) *Logger {
if fileName == "" {
return nil
}
f, err := os.Create(fileName)
if err != nil {
log.Error("Error creating log file", "name", fileName, "error", err)
return nil
}
return &Logger{
file: f,
period: updatePeriod,
stopCh: make(chan struct{}),
storeCh: make(chan string, 1),
eventHeader: eventHeader,
}
}
// NewChannel creates a new value logger channel that writes values in a single
// column. If the relative change of the value is bigger than the given threshold
// then a new line is added immediately (threshold can also be 0).
func (l *Logger) NewChannel(name string, threshold float64) *Channel {
if l == nil {
return nil
}
c := &Channel{
logger: l,
name: name,
threshold: threshold,
}
l.channels = append(l.channels, c)
return c
}
// NewMinMaxChannel creates a new value logger channel that writes the minimum and
// maximum of the tracked value in two columns. It never triggers adding a new line.
// If zeroDefault is true then 0 is written to both min and max columns if no update
// was given during the last period. If it is false then the last update will appear
// in both columns.
func (l *Logger) NewMinMaxChannel(name string, zeroDefault bool) *Channel {
if l == nil {
return nil
}
c := &Channel{
logger: l,
name: name,
minmax: true,
mmZeroDefault: zeroDefault,
}
l.channels = append(l.channels, c)
return c
}
func (l *Logger) store(event string) {
s := fmt.Sprintf("%g", float64(mclock.Now()-l.started)/1000000000)
for _, ch := range l.channels {
s += ", " + ch.store()
}
if event != "" {
s += ", " + event
}
l.file.WriteString(s + "\n")
}
// Start writes the header line and starts the logger
func (l *Logger) Start() {
if l == nil {
return
}
l.started = mclock.Now()
s := "Time"
for _, ch := range l.channels {
s += ", " + ch.header()
}
if l.eventHeader != "" {
s += ", " + l.eventHeader
}
l.file.WriteString(s + "\n")
go func() {
timer := time.NewTimer(l.period)
for {
select {
case <-timer.C:
l.store("")
timer.Reset(l.period)
case event := <-l.storeCh:
l.store(event)
if !timer.Stop() {
<-timer.C
}
timer.Reset(l.period)
case <-l.stopCh:
close(l.stopped)
return
}
}
}()
}
// Stop stops the logger and closes the file
func (l *Logger) Stop() {
if l == nil {
return
}
l.stopped = make(chan struct{})
close(l.stopCh)
<-l.stopped
l.file.Close()
}
// Event immediately adds a new line and adds the given event string in the last column
func (l *Logger) Event(event string) {
if l == nil {
return
}
select {
case l.storeCh <- event:
case <-l.stopCh:
}
}
// Channel represents a logger channel tracking a single value
type Channel struct {
logger *Logger
lock sync.Mutex
name string
threshold, storeMin, storeMax, lastValue, min, max float64
minmax, mmSet, mmZeroDefault bool
}
// Update updates the tracked value
func (lc *Channel) Update(value float64) {
if lc == nil {
return
}
lc.lock.Lock()
defer lc.lock.Unlock()
lc.lastValue = value
if lc.minmax {
if value > lc.max || !lc.mmSet {
lc.max = value
}
if value < lc.min || !lc.mmSet {
lc.min = value
}
lc.mmSet = true
} else {
if value < lc.storeMin || value > lc.storeMax {
select {
case lc.logger.storeCh <- "":
default:
}
}
}
}
func (lc *Channel) store() (s string) {
lc.lock.Lock()
defer lc.lock.Unlock()
if lc.minmax {
s = fmt.Sprintf("%g, %g", lc.min, lc.max)
lc.mmSet = false
if lc.mmZeroDefault {
lc.min = 0
} else {
lc.min = lc.lastValue
}
lc.max = lc.min
} else {
s = fmt.Sprintf("%g", lc.lastValue)
lc.storeMin = lc.lastValue * (1 - lc.threshold)
lc.storeMax = lc.lastValue * (1 + lc.threshold)
if lc.lastValue < 0 {
lc.storeMin, lc.storeMax = lc.storeMax, lc.storeMin
}
}
return
}
func (lc *Channel) header() string {
if lc.minmax {
return lc.name + " (min), " + lc.name + " (max)"
}
return lc.name
}
......@@ -26,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/csvlogger"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
......@@ -53,8 +52,7 @@ type freeClientPool struct {
connectedLimit, totalLimit int
freeClientCap uint64
logger *csvlogger.Logger
logTotalFreeConn *csvlogger.Channel
connectedCap uint64
addressMap map[string]*freeClientPoolEntry
connPool, disconnPool *prque.Prque
......@@ -69,18 +67,16 @@ const (
)
// newFreeClientPool creates a new free client pool
func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string), metricsLogger, eventLogger *csvlogger.Logger) *freeClientPool {
func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string)) *freeClientPool {
pool := &freeClientPool{
db: db,
clock: clock,
addressMap: make(map[string]*freeClientPoolEntry),
connPool: prque.New(poolSetIndex),
disconnPool: prque.New(poolSetIndex),
freeClientCap: freeClientCap,
totalLimit: totalLimit,
logger: eventLogger,
logTotalFreeConn: metricsLogger.NewChannel("totalFreeConn", 0),
removePeer: removePeer,
db: db,
clock: clock,
addressMap: make(map[string]*freeClientPoolEntry),
connPool: prque.New(poolSetIndex),
disconnPool: prque.New(poolSetIndex),
freeClientCap: freeClientCap,
totalLimit: totalLimit,
removePeer: removePeer,
}
pool.loadFromDb()
return pool
......@@ -126,10 +122,7 @@ func (f *freeClientPool) connect(address, id string) bool {
if f.closed {
return false
}
f.logger.Event("freeClientPool: connecting from " + address + ", " + id)
if f.connectedLimit == 0 {
f.logger.Event("freeClientPool: rejected, " + id)
log.Debug("Client rejected", "address", address)
return false
}
......@@ -141,7 +134,6 @@ func (f *freeClientPool) connect(address, id string) bool {
f.addressMap[address] = e
} else {
if e.connected {
f.logger.Event("freeClientPool: already connected, " + id)
log.Debug("Client already connected", "address", address)
return false
}
......@@ -154,12 +146,13 @@ func (f *freeClientPool) connect(address, id string) bool {
if e.linUsage+int64(connectedBias)-i.linUsage < 0 {
// kick it out and accept the new client
f.dropClient(i, now)
f.logger.Event("freeClientPool: kicked out, " + i.id)
clientKickedMeter.Mark(1)
f.connectedCap -= f.freeClientCap
} else {
// keep the old client and reject the new one
f.connPool.Push(i, i.linUsage)
f.logger.Event("freeClientPool: rejected, " + id)
log.Debug("Client rejected", "address", address)
clientRejectedMeter.Mark(1)
return false
}
}
......@@ -167,11 +160,12 @@ func (f *freeClientPool) connect(address, id string) bool {
e.connected = true
e.id = id
f.connPool.Push(e, e.linUsage)
f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap))
if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit {
f.disconnPool.Pop()
}
f.logger.Event("freeClientPool: accepted, " + id)
f.connectedCap += f.freeClientCap
totalConnectedGauge.Update(int64(f.connectedCap))
clientConnectedMeter.Mark(1)
log.Debug("Client accepted", "address", address)
return true
}
......@@ -203,13 +197,12 @@ func (f *freeClientPool) disconnect(address string) {
log.Debug("Client already disconnected", "address", address)
return
}
f.connPool.Remove(e.index)
f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap))
f.calcLogUsage(e, now)
e.connected = false
f.disconnPool.Push(e, -e.logUsage)
f.logger.Event("freeClientPool: disconnected, " + e.id)
f.connectedCap -= f.freeClientCap
totalConnectedGauge.Update(int64(f.connectedCap))
log.Debug("Client disconnected", "address", address)
}
......@@ -227,15 +220,15 @@ func (f *freeClientPool) setLimits(count int, totalCap uint64) {
for f.connPool.Size() > f.connectedLimit {
i := f.connPool.PopItem().(*freeClientPoolEntry)
f.dropClient(i, now)
f.logger.Event("freeClientPool: setLimits kicked out, " + i.id)
f.connectedCap -= f.freeClientCap
}
totalConnectedGauge.Update(int64(f.connectedCap))
}
// dropClient disconnects a client and also moves it from the connected to the
// disconnected pool
func (f *freeClientPool) dropClient(i *freeClientPoolEntry, now mclock.AbsTime) {
f.connPool.Remove(i.index)
f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap))
f.calcLogUsage(i, now)
i.connected = false
f.disconnPool.Push(i, -i.logUsage)
......
......@@ -61,7 +61,7 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
}
disconnCh <- i
}
pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn, nil, nil)
pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn)
)
pool.setLimits(connLimit, uint64(connLimit))
......@@ -130,7 +130,7 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
// close and restart pool
pool.stop()
pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn, nil, nil)
pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn)
pool.setLimits(connLimit, uint64(connLimit))
// try connecting all known peers (connLimit should be filled up)
......
......@@ -35,7 +35,6 @@ import (
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/les/csvlogger"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
......@@ -124,7 +123,6 @@ type ProtocolManager struct {
wg *sync.WaitGroup
eventMux *event.TypeMux
logger *csvlogger.Logger
// Callbacks
synced func() bool
......@@ -262,11 +260,12 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Ignore maxPeers if this is a trusted peer
// In server mode we try to check into the client pool after handshake
if pm.client && pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
pm.logger.Event("Rejected (too many peers), " + p.id)
clientRejectedMeter.Mark(1)
return p2p.DiscTooManyPeers
}
// Reject light clients if server is not synced.
if !pm.client && !pm.synced() {
clientRejectedMeter.Mark(1)
return p2p.DiscRequested
}
p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
......@@ -281,7 +280,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
)
if err := p.Handshake(td, hash, number, genesis.Hash(), pm.server); err != nil {
p.Log().Debug("Light Ethereum handshake failed", "err", err)
pm.logger.Event("Handshake error: " + err.Error() + ", " + p.id)
clientErrorMeter.Mark(1)
return err
}
if p.fcClient != nil {
......@@ -294,14 +293,14 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Register the peer locally
if err := pm.peers.Register(p); err != nil {
clientErrorMeter.Mark(1)
p.Log().Error("Light Ethereum peer registration failed", "err", err)
pm.logger.Event("Peer registration error: " + err.Error() + ", " + p.id)
return err
}
pm.logger.Event("Connection established, " + p.id)
connectedAt := time.Now()
defer func() {
pm.logger.Event("Closed connection, " + p.id)
pm.removePeer(p.id)
connectionTimer.UpdateSince(connectedAt)
}()
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
......@@ -317,11 +316,9 @@ func (pm *ProtocolManager) handle(p *peer) error {
pm.serverPool.registered(p.poolEntry)
}
}
// main loop. handle incoming messages.
for {
if err := pm.handleMsg(p); err != nil {
pm.logger.Event("Message handling error: " + err.Error() + ", " + p.id)
p.Log().Debug("Light Ethereum message handling failed", "err", err)
if p.fcServer != nil {
p.fcServer.DumpLogs()
......
......@@ -231,7 +231,7 @@ func newTestProtocolManager(lightSync bool, blocks int, odr *LesOdr, indexers []
if !lightSync {
srv := &LesServer{lesCommons: lesCommons{protocolManager: pm, chainDb: db}}
pm.server = srv
pm.servingQueue = newServingQueue(int64(time.Millisecond*10), 1, nil)
pm.servingQueue = newServingQueue(int64(time.Millisecond*10), 1)
pm.servingQueue.setThreads(4)
srv.defParams = flowcontrol.ServerParams{
......
......@@ -22,46 +22,31 @@ import (
)
var (
/* propTxnInPacketsMeter = metrics.NewMeter("eth/prop/txns/in/packets")
propTxnInTrafficMeter = metrics.NewMeter("eth/prop/txns/in/traffic")
propTxnOutPacketsMeter = metrics.NewMeter("eth/prop/txns/out/packets")
propTxnOutTrafficMeter = metrics.NewMeter("eth/prop/txns/out/traffic")
propHashInPacketsMeter = metrics.NewMeter("eth/prop/hashes/in/packets")
propHashInTrafficMeter = metrics.NewMeter("eth/prop/hashes/in/traffic")
propHashOutPacketsMeter = metrics.NewMeter("eth/prop/hashes/out/packets")
propHashOutTrafficMeter = metrics.NewMeter("eth/prop/hashes/out/traffic")
propBlockInPacketsMeter = metrics.NewMeter("eth/prop/blocks/in/packets")
propBlockInTrafficMeter = metrics.NewMeter("eth/prop/blocks/in/traffic")
propBlockOutPacketsMeter = metrics.NewMeter("eth/prop/blocks/out/packets")
propBlockOutTrafficMeter = metrics.NewMeter("eth/prop/blocks/out/traffic")
reqHashInPacketsMeter = metrics.NewMeter("eth/req/hashes/in/packets")
reqHashInTrafficMeter = metrics.NewMeter("eth/req/hashes/in/traffic")
reqHashOutPacketsMeter = metrics.NewMeter("eth/req/hashes/out/packets")
reqHashOutTrafficMeter = metrics.NewMeter("eth/req/hashes/out/traffic")
reqBlockInPacketsMeter = metrics.NewMeter("eth/req/blocks/in/packets")
reqBlockInTrafficMeter = metrics.NewMeter("eth/req/blocks/in/traffic")
reqBlockOutPacketsMeter = metrics.NewMeter("eth/req/blocks/out/packets")
reqBlockOutTrafficMeter = metrics.NewMeter("eth/req/blocks/out/traffic")
reqHeaderInPacketsMeter = metrics.NewMeter("eth/req/headers/in/packets")
reqHeaderInTrafficMeter = metrics.NewMeter("eth/req/headers/in/traffic")
reqHeaderOutPacketsMeter = metrics.NewMeter("eth/req/headers/out/packets")
reqHeaderOutTrafficMeter = metrics.NewMeter("eth/req/headers/out/traffic")
reqBodyInPacketsMeter = metrics.NewMeter("eth/req/bodies/in/packets")
reqBodyInTrafficMeter = metrics.NewMeter("eth/req/bodies/in/traffic")
reqBodyOutPacketsMeter = metrics.NewMeter("eth/req/bodies/out/packets")
reqBodyOutTrafficMeter = metrics.NewMeter("eth/req/bodies/out/traffic")
reqStateInPacketsMeter = metrics.NewMeter("eth/req/states/in/packets")
reqStateInTrafficMeter = metrics.NewMeter("eth/req/states/in/traffic")
reqStateOutPacketsMeter = metrics.NewMeter("eth/req/states/out/packets")
reqStateOutTrafficMeter = metrics.NewMeter("eth/req/states/out/traffic")
reqReceiptInPacketsMeter = metrics.NewMeter("eth/req/receipts/in/packets")
reqReceiptInTrafficMeter = metrics.NewMeter("eth/req/receipts/in/traffic")
reqReceiptOutPacketsMeter = metrics.NewMeter("eth/req/receipts/out/packets")
reqReceiptOutTrafficMeter = metrics.NewMeter("eth/req/receipts/out/traffic")*/
miscInPacketsMeter = metrics.NewRegisteredMeter("les/misc/in/packets", nil)
miscInTrafficMeter = metrics.NewRegisteredMeter("les/misc/in/traffic", nil)
miscOutPacketsMeter = metrics.NewRegisteredMeter("les/misc/out/packets", nil)
miscOutTrafficMeter = metrics.NewRegisteredMeter("les/misc/out/traffic", nil)
connectionTimer = metrics.NewRegisteredTimer("les/connectionTime", nil)
totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
totalCapacityGauge = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
totalRechargeGauge = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
requestServedTimer = metrics.NewRegisteredTimer("les/server/requestServed", nil)
requestServedMeter = metrics.NewRegisteredMeter("les/server/totalRequestServed", nil)
requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/totalRequestEstimated", nil)
relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/relativeCost", nil, metrics.NewExpDecaySample(1028, 0.015))
recentServedGauge = metrics.NewRegisteredGauge("les/server/recentRequestServed", nil)
recentEstimatedGauge = metrics.NewRegisteredGauge("les/server/recentRequestEstimated", nil)
sqServedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/served", nil)
sqQueuedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/queued", nil)
clientConnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/connected", nil)
clientRejectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/rejected", nil)
clientKickedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/kicked", nil)
// clientDisconnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/disconnected", nil)
clientFreezeMeter = metrics.NewRegisteredMeter("les/server/clientEvent/freeze", nil)
clientErrorMeter = metrics.NewRegisteredMeter("les/server/clientEvent/error", nil)
)
// meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of
......
......@@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/les/csvlogger"
"github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
......@@ -40,15 +39,6 @@ import (
const bufLimitRatio = 6000 // fixed bufLimit/MRR ratio
const (
logFileName = "" // csv log file name (disabled if empty)
logClientPoolMetrics = true // log client pool metrics
logClientPoolEvents = false // detailed client pool event logging
logRequestServing = true // log request serving metrics and events
logBlockProcEvents = true // log block processing events
logProtocolHandler = true // log protocol handler events
)
type LesServer struct {
lesCommons
......@@ -62,26 +52,15 @@ type LesServer struct {
privateKey *ecdsa.PrivateKey
quitSync chan struct{}
onlyAnnounce bool
csvLogger *csvlogger.Logger
logTotalCap *csvlogger.Channel
thcNormal, thcBlockProcessing int // serving thread count for normal operation and block processing mode
maxPeers int
minCapacity, freeClientCap uint64
freeClientPool *freeClientPool
priorityClientPool *priorityClientPool
}
func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
var csvLogger *csvlogger.Logger
if logFileName != "" {
csvLogger = csvlogger.NewLogger(logFileName, time.Second*10, "event, peerId")
}
requestLogger := csvLogger
if !logRequestServing {
requestLogger = nil
}
lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions))
for i, pv := range AdvertiseProtocolVersions {
lesTopics[i] = lesTopic(e.BlockChain().Genesis().Hash(), pv)
......@@ -99,10 +78,8 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
quitSync: quitSync,
lesTopics: lesTopics,
onlyAnnounce: config.OnlyAnnounce,
csvLogger: csvLogger,
logTotalCap: requestLogger.NewChannel("totalCapacity", 0.01),
}
srv.costTracker, srv.minCapacity = newCostTracker(e.ChainDb(), config, requestLogger)
srv.costTracker, srv.minCapacity = newCostTracker(e.ChainDb(), config)
logger := log.New()
srv.thcNormal = config.LightServ * 4 / 100
......@@ -131,10 +108,7 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
return nil, err
}
srv.protocolManager = pm
if logProtocolHandler {
pm.logger = csvLogger
}
pm.servingQueue = newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100, requestLogger)
pm.servingQueue = newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100)
pm.server = srv
return srv, nil
......@@ -142,12 +116,6 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
func (s *LesServer) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "les",
Version: "1.0",
Service: NewPrivateLightServerAPI(s),
Public: false,
},
{
Namespace: "les",
Version: "1.0",
......@@ -163,11 +131,10 @@ func (s *LesServer) APIs() []rpc.API {
func (s *LesServer) startEventLoop() {
s.protocolManager.wg.Add(1)
blockProcLogger := s.csvLogger
if !logBlockProcEvents {
blockProcLogger = nil
}
var processing, procLast bool
var (
processing, procLast bool
procStarted time.Time
)
blockProcFeed := make(chan bool, 100)
s.protocolManager.blockchain.(*core.BlockChain).SubscribeBlockProcessingEvent(blockProcFeed)
totalRechargeCh := make(chan uint64, 100)
......@@ -176,13 +143,13 @@ func (s *LesServer) startEventLoop() {
updateRecharge := func() {
if processing {
if !procLast {
blockProcLogger.Event("block processing started")
procStarted = time.Now()
}
s.protocolManager.servingQueue.setThreads(s.thcBlockProcessing)
s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge, totalRecharge}})
} else {
if procLast {
blockProcLogger.Event("block processing finished")
blockProcessingTimer.UpdateSince(procStarted)
}
s.protocolManager.servingQueue.setThreads(s.thcNormal)
s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge / 16, totalRecharge / 2}, {totalRecharge / 2, totalRecharge / 2}, {totalRecharge, totalRecharge}})
......@@ -191,7 +158,7 @@ func (s *LesServer) startEventLoop() {
}
updateRecharge()
totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh)
s.priorityClientPool.setLimits(s.maxPeers, totalCapacity)
s.freeClientPool.setLimits(s.maxPeers, totalCapacity)
var maxFreePeers uint64
go func() {
......@@ -202,13 +169,13 @@ func (s *LesServer) startEventLoop() {
case totalRecharge = <-totalRechargeCh:
updateRecharge()
case totalCapacity = <-totalCapacityCh:
s.logTotalCap.Update(float64(totalCapacity))
totalCapacityGauge.Update(int64(totalCapacity))
newFreePeers := totalCapacity / s.freeClientCap
if newFreePeers < maxFreePeers && newFreePeers < uint64(s.maxPeers) {
log.Warn("Reduced total capacity", "maxFreePeers", newFreePeers)
}
maxFreePeers = newFreePeers
s.priorityClientPool.setLimits(s.maxPeers, totalCapacity)
s.freeClientPool.setLimits(s.maxPeers, totalCapacity)
case <-s.protocolManager.quitSync:
s.protocolManager.wg.Done()
return
......@@ -243,19 +210,9 @@ func (s *LesServer) Start(srvr *p2p.Server) {
maxCapacity = totalRecharge
}
s.fcManager.SetCapacityLimits(s.freeClientCap, maxCapacity, s.freeClientCap*2)
poolMetricsLogger := s.csvLogger
if !logClientPoolMetrics {
poolMetricsLogger = nil
}
poolEventLogger := s.csvLogger
if !logClientPoolEvents {
poolEventLogger = nil
}
s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) }, poolMetricsLogger, poolEventLogger)
s.priorityClientPool = newPriorityClientPool(s.freeClientCap, s.protocolManager.peers, s.freeClientPool, poolMetricsLogger, poolEventLogger)
s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) })
s.protocolManager.peers.notify(s.freeClientPool)
s.protocolManager.peers.notify(s.priorityClientPool)
s.csvLogger.Start()
s.startEventLoop()
s.protocolManager.Start(s.config.LightPeers)
if srvr.DiscV5 != nil {
......@@ -296,7 +253,6 @@ func (s *LesServer) Stop() {
s.freeClientPool.stop()
s.costTracker.stop()
s.protocolManager.Stop()
s.csvLogger.Stop()
}
// todo(rjl493456442) separate client and server implementation.
......
......@@ -17,14 +17,12 @@
package les
import (
"fmt"
"sort"
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/les/csvlogger"
)
// servingQueue allows running tasks in a limited number of threads and puts the
......@@ -44,10 +42,6 @@ type servingQueue struct {
queue *prque.Prque // priority queue for waiting or suspended tasks
best *servingTask // the highest priority task (not included in the queue)
suspendBias int64 // priority bias against suspending an already running task
logger *csvlogger.Logger
logRecentTime *csvlogger.Channel
logQueuedTime *csvlogger.Channel
}
// servingTask represents a request serving task. Tasks can be implemented to
......@@ -127,7 +121,7 @@ func (t *servingTask) waitOrStop() bool {
}
// newServingQueue returns a new servingQueue
func newServingQueue(suspendBias int64, utilTarget float64, logger *csvlogger.Logger) *servingQueue {
func newServingQueue(suspendBias int64, utilTarget float64) *servingQueue {
sq := &servingQueue{
queue: prque.New(nil),
suspendBias: suspendBias,
......@@ -140,9 +134,6 @@ func newServingQueue(suspendBias int64, utilTarget float64, logger *csvlogger.Lo
burstDropLimit: uint64(utilTarget * bufLimitRatio * 1000000),
burstDecRate: utilTarget,
lastUpdate: mclock.Now(),
logger: logger,
logRecentTime: logger.NewMinMaxChannel("recentTime", false),
logQueuedTime: logger.NewMinMaxChannel("queuedTime", false),
}
sq.wg.Add(2)
go sq.queueLoop()
......@@ -246,16 +237,13 @@ func (sq *servingQueue) freezePeers() {
}
sort.Sort(peerList)
drop := true
sq.logger.Event("freezing peers")
for _, tasks := range peerList {
if drop {
tasks.peer.freezeClient()
tasks.peer.fcClient.Freeze()
sq.queuedTime -= tasks.sumTime
if sq.logQueuedTime != nil {
sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
}
sq.logger.Event(fmt.Sprintf("frozen peer sumTime=%d, %v", tasks.sumTime, tasks.peer.id))
sqQueuedGauge.Update(int64(sq.queuedTime))
clientFreezeMeter.Mark(1)
drop = sq.recentTime+sq.queuedTime > sq.burstDropLimit
for _, task := range tasks.list {
task.tokenCh <- nil
......@@ -299,10 +287,8 @@ func (sq *servingQueue) addTask(task *servingTask) {
}
sq.updateRecentTime()
sq.queuedTime += task.expTime
if sq.logQueuedTime != nil {
sq.logRecentTime.Update(float64(sq.recentTime) / 1000)
sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
}
sqServedGauge.Update(int64(sq.recentTime))
sqQueuedGauge.Update(int64(sq.queuedTime))
if sq.recentTime+sq.queuedTime > sq.burstLimit {
sq.freezePeers()
}
......@@ -322,10 +308,8 @@ func (sq *servingQueue) queueLoop() {
sq.updateRecentTime()
sq.queuedTime -= expTime
sq.recentTime += expTime
if sq.logQueuedTime != nil {
sq.logRecentTime.Update(float64(sq.recentTime) / 1000)
sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000)
}
sqServedGauge.Update(int64(sq.recentTime))
sqQueuedGauge.Update(int64(sq.queuedTime))
if sq.queue.Size() == 0 {
sq.best = nil
} else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment