Commit c2003ed6 authored by Felföldi Zsolt's avatar Felföldi Zsolt Committed by Felix Lange

les, les/flowcontrol: improved request serving and flow control (#18230)

This change

- implements concurrent LES request serving even for a single peer.
- replaces the request cost estimation method with a cost table based on
  benchmarks which gives much more consistent results. Until now the
  allowed number of light peers was just a guess which probably contributed
  a lot to the fluctuating quality of available service. Everything related
  to request cost is implemented in a single object, the 'cost tracker'. It
  uses a fixed cost table with a global 'correction factor'. Benchmark code
  is included and can be run at any time to adapt costs to low-level
  implementation changes.
- reimplements flowcontrol.ClientManager in a cleaner and more efficient
  way, with added capabilities: There is now control over bandwidth, which
  allows using the flow control parameters for client prioritization.
  Target utilization over 100 percent is now supported to model concurrent
  request processing. Total serving bandwidth is reduced during block
  processing to prevent database contention.
- implements an RPC API for the LES servers allowing server operators to
  assign priority bandwidth to certain clients and change prioritized
  status even while the client is connected. The new API is meant for
  cases where server operators charge for LES using an off-protocol mechanism.
- adds a unit test for the new client manager.
- adds an end-to-end test using the network simulator that tests bandwidth
  control functions through the new API.
parent c2b33a11
...@@ -93,6 +93,8 @@ var ( ...@@ -93,6 +93,8 @@ var (
utils.ExitWhenSyncedFlag, utils.ExitWhenSyncedFlag,
utils.GCModeFlag, utils.GCModeFlag,
utils.LightServFlag, utils.LightServFlag,
utils.LightBandwidthInFlag,
utils.LightBandwidthOutFlag,
utils.LightPeersFlag, utils.LightPeersFlag,
utils.LightKDFFlag, utils.LightKDFFlag,
utils.WhitelistFlag, utils.WhitelistFlag,
......
...@@ -81,6 +81,8 @@ var AppHelpFlagGroups = []flagGroup{ ...@@ -81,6 +81,8 @@ var AppHelpFlagGroups = []flagGroup{
utils.EthStatsURLFlag, utils.EthStatsURLFlag,
utils.IdentityFlag, utils.IdentityFlag,
utils.LightServFlag, utils.LightServFlag,
utils.LightBandwidthInFlag,
utils.LightBandwidthOutFlag,
utils.LightPeersFlag, utils.LightPeersFlag,
utils.LightKDFFlag, utils.LightKDFFlag,
utils.WhitelistFlag, utils.WhitelistFlag,
......
...@@ -199,9 +199,19 @@ var ( ...@@ -199,9 +199,19 @@ var (
} }
LightServFlag = cli.IntFlag{ LightServFlag = cli.IntFlag{
Name: "lightserv", Name: "lightserv",
Usage: "Maximum percentage of time allowed for serving LES requests (0-90)", Usage: "Maximum percentage of time allowed for serving LES requests (multi-threaded processing allows values over 100)",
Value: 0, Value: 0,
} }
LightBandwidthInFlag = cli.IntFlag{
Name: "lightbwin",
Usage: "Incoming bandwidth limit for light server (1000 bytes/sec, 0 = unlimited)",
Value: 1000,
}
LightBandwidthOutFlag = cli.IntFlag{
Name: "lightbwout",
Usage: "Outgoing bandwidth limit for light server (1000 bytes/sec, 0 = unlimited)",
Value: 5000,
}
LightPeersFlag = cli.IntFlag{ LightPeersFlag = cli.IntFlag{
Name: "lightpeers", Name: "lightpeers",
Usage: "Maximum number of LES client peers", Usage: "Maximum number of LES client peers",
...@@ -1305,6 +1315,8 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { ...@@ -1305,6 +1315,8 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
if ctx.GlobalIsSet(LightServFlag.Name) { if ctx.GlobalIsSet(LightServFlag.Name) {
cfg.LightServ = ctx.GlobalInt(LightServFlag.Name) cfg.LightServ = ctx.GlobalInt(LightServFlag.Name)
} }
cfg.LightBandwidthIn = ctx.GlobalInt(LightBandwidthInFlag.Name)
cfg.LightBandwidthOut = ctx.GlobalInt(LightBandwidthOutFlag.Name)
if ctx.GlobalIsSet(LightPeersFlag.Name) { if ctx.GlobalIsSet(LightPeersFlag.Name) {
cfg.LightPeers = ctx.GlobalInt(LightPeersFlag.Name) cfg.LightPeers = ctx.GlobalInt(LightPeersFlag.Name)
} }
......
...@@ -111,6 +111,7 @@ type BlockChain struct { ...@@ -111,6 +111,7 @@ type BlockChain struct {
chainSideFeed event.Feed chainSideFeed event.Feed
chainHeadFeed event.Feed chainHeadFeed event.Feed
logsFeed event.Feed logsFeed event.Feed
blockProcFeed event.Feed
scope event.SubscriptionScope scope event.SubscriptionScope
genesisBlock *types.Block genesisBlock *types.Block
...@@ -1090,6 +1091,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { ...@@ -1090,6 +1091,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
if len(chain) == 0 { if len(chain) == 0 {
return 0, nil return 0, nil
} }
bc.blockProcFeed.Send(true)
defer bc.blockProcFeed.Send(false)
// Remove already known canon-blocks // Remove already known canon-blocks
var ( var (
block, prev *types.Block block, prev *types.Block
...@@ -1725,3 +1730,9 @@ func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Su ...@@ -1725,3 +1730,9 @@ func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Su
func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return bc.scope.Track(bc.logsFeed.Subscribe(ch)) return bc.scope.Track(bc.logsFeed.Subscribe(ch))
} }
// SubscribeBlockProcessingEvent registers a subscription of bool where true means
// block processing has started while false means it has stopped.
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
}
...@@ -54,6 +54,7 @@ import ( ...@@ -54,6 +54,7 @@ import (
type LesServer interface { type LesServer interface {
Start(srvr *p2p.Server) Start(srvr *p2p.Server)
Stop() Stop()
APIs() []rpc.API
Protocols() []p2p.Protocol Protocols() []p2p.Protocol
SetBloomBitsIndexer(bbIndexer *core.ChainIndexer) SetBloomBitsIndexer(bbIndexer *core.ChainIndexer)
} }
...@@ -267,6 +268,10 @@ func CreateConsensusEngine(ctx *node.ServiceContext, chainConfig *params.ChainCo ...@@ -267,6 +268,10 @@ func CreateConsensusEngine(ctx *node.ServiceContext, chainConfig *params.ChainCo
func (s *Ethereum) APIs() []rpc.API { func (s *Ethereum) APIs() []rpc.API {
apis := ethapi.GetAPIs(s.APIBackend) apis := ethapi.GetAPIs(s.APIBackend)
// Append any APIs exposed explicitly by the les server
if s.lesServer != nil {
apis = append(apis, s.lesServer.APIs()...)
}
// Append any APIs exposed explicitly by the consensus engine // Append any APIs exposed explicitly by the consensus engine
apis = append(apis, s.engine.APIs(s.BlockChain())...) apis = append(apis, s.engine.APIs(s.BlockChain())...)
......
...@@ -98,9 +98,11 @@ type Config struct { ...@@ -98,9 +98,11 @@ type Config struct {
Whitelist map[uint64]common.Hash `toml:"-"` Whitelist map[uint64]common.Hash `toml:"-"`
// Light client options // Light client options
LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests
LightPeers int `toml:",omitempty"` // Maximum number of LES client peers LightBandwidthIn int `toml:",omitempty"` // Incoming bandwidth limit for light servers
OnlyAnnounce bool // Maximum number of LES client peers LightBandwidthOut int `toml:",omitempty"` // Outgoing bandwidth limit for light servers
LightPeers int `toml:",omitempty"` // Maximum number of LES client peers
OnlyAnnounce bool // Maximum number of LES client peers
// Ultra Light client options // Ultra Light client options
ULC *ULCConfig `toml:",omitempty"` ULC *ULCConfig `toml:",omitempty"`
......
...@@ -24,6 +24,8 @@ func (c Config) MarshalTOML() (interface{}, error) { ...@@ -24,6 +24,8 @@ func (c Config) MarshalTOML() (interface{}, error) {
SyncMode downloader.SyncMode SyncMode downloader.SyncMode
NoPruning bool NoPruning bool
LightServ int `toml:",omitempty"` LightServ int `toml:",omitempty"`
LightBandwidthIn int `toml:",omitempty"`
LightBandwidthOut int `toml:",omitempty"`
LightPeers int `toml:",omitempty"` LightPeers int `toml:",omitempty"`
OnlyAnnounce bool OnlyAnnounce bool
ULC *ULCConfig `toml:",omitempty"` ULC *ULCConfig `toml:",omitempty"`
...@@ -55,6 +57,8 @@ func (c Config) MarshalTOML() (interface{}, error) { ...@@ -55,6 +57,8 @@ func (c Config) MarshalTOML() (interface{}, error) {
enc.SyncMode = c.SyncMode enc.SyncMode = c.SyncMode
enc.NoPruning = c.NoPruning enc.NoPruning = c.NoPruning
enc.LightServ = c.LightServ enc.LightServ = c.LightServ
enc.LightBandwidthIn = c.LightBandwidthIn
enc.LightBandwidthOut = c.LightBandwidthOut
enc.LightPeers = c.LightPeers enc.LightPeers = c.LightPeers
enc.OnlyAnnounce = c.OnlyAnnounce enc.OnlyAnnounce = c.OnlyAnnounce
enc.ULC = c.ULC enc.ULC = c.ULC
...@@ -91,6 +95,8 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { ...@@ -91,6 +95,8 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
SyncMode *downloader.SyncMode SyncMode *downloader.SyncMode
NoPruning *bool NoPruning *bool
LightServ *int `toml:",omitempty"` LightServ *int `toml:",omitempty"`
LightBandwidthIn *int `toml:",omitempty"`
LightBandwidthOut *int `toml:",omitempty"`
LightPeers *int `toml:",omitempty"` LightPeers *int `toml:",omitempty"`
OnlyAnnounce *bool OnlyAnnounce *bool
ULC *ULCConfig `toml:",omitempty"` ULC *ULCConfig `toml:",omitempty"`
...@@ -135,6 +141,12 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { ...@@ -135,6 +141,12 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
if dec.LightServ != nil { if dec.LightServ != nil {
c.LightServ = *dec.LightServ c.LightServ = *dec.LightServ
} }
if dec.LightBandwidthIn != nil {
c.LightBandwidthIn = *dec.LightBandwidthIn
}
if dec.LightBandwidthOut != nil {
c.LightBandwidthOut = *dec.LightBandwidthOut
}
if dec.LightPeers != nil { if dec.LightPeers != nil {
c.LightPeers = *dec.LightPeers c.LightPeers = *dec.LightPeers
} }
......
This diff is collapsed.
This diff is collapsed.
...@@ -25,6 +25,7 @@ import ( ...@@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/bloombits"
...@@ -100,7 +101,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { ...@@ -100,7 +101,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
chainConfig: chainConfig, chainConfig: chainConfig,
eventMux: ctx.EventMux, eventMux: ctx.EventMux,
peers: peers, peers: peers,
reqDist: newRequestDistributor(peers, quitSync), reqDist: newRequestDistributor(peers, quitSync, &mclock.System{}),
accountManager: ctx.AccountManager, accountManager: ctx.AccountManager,
engine: eth.CreateConsensusEngine(ctx, chainConfig, &config.Ethash, nil, false, chainDb), engine: eth.CreateConsensusEngine(ctx, chainConfig, &config.Ethash, nil, false, chainDb),
shutdownChan: make(chan bool), shutdownChan: make(chan bool),
......
This diff is collapsed.
This diff is collapsed.
...@@ -14,20 +14,21 @@ ...@@ -14,20 +14,21 @@
// You should have received a copy of the GNU Lesser General Public License // You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package light implements on-demand retrieval capable state and chain objects
// for the Ethereum Light Client.
package les package les
import ( import (
"container/list" "container/list"
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/common/mclock"
) )
// requestDistributor implements a mechanism that distributes requests to // requestDistributor implements a mechanism that distributes requests to
// suitable peers, obeying flow control rules and prioritizing them in creation // suitable peers, obeying flow control rules and prioritizing them in creation
// order (even when a resend is necessary). // order (even when a resend is necessary).
type requestDistributor struct { type requestDistributor struct {
clock mclock.Clock
reqQueue *list.List reqQueue *list.List
lastReqOrder uint64 lastReqOrder uint64
peers map[distPeer]struct{} peers map[distPeer]struct{}
...@@ -67,8 +68,9 @@ type distReq struct { ...@@ -67,8 +68,9 @@ type distReq struct {
} }
// newRequestDistributor creates a new request distributor // newRequestDistributor creates a new request distributor
func newRequestDistributor(peers *peerSet, stopChn chan struct{}) *requestDistributor { func newRequestDistributor(peers *peerSet, stopChn chan struct{}, clock mclock.Clock) *requestDistributor {
d := &requestDistributor{ d := &requestDistributor{
clock: clock,
reqQueue: list.New(), reqQueue: list.New(),
loopChn: make(chan struct{}, 2), loopChn: make(chan struct{}, 2),
stopChn: stopChn, stopChn: stopChn,
...@@ -148,7 +150,7 @@ func (d *requestDistributor) loop() { ...@@ -148,7 +150,7 @@ func (d *requestDistributor) loop() {
wait = distMaxWait wait = distMaxWait
} }
go func() { go func() {
time.Sleep(wait) d.clock.Sleep(wait)
d.loopChn <- struct{}{} d.loopChn <- struct{}{}
}() }()
break loop break loop
......
...@@ -14,8 +14,6 @@ ...@@ -14,8 +14,6 @@
// You should have received a copy of the GNU Lesser General Public License // You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package light implements on-demand retrieval capable state and chain objects
// for the Ethereum Light Client.
package les package les
import ( import (
...@@ -23,6 +21,8 @@ import ( ...@@ -23,6 +21,8 @@ import (
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/ethereum/go-ethereum/common/mclock"
) )
type testDistReq struct { type testDistReq struct {
...@@ -121,7 +121,7 @@ func testRequestDistributor(t *testing.T, resend bool) { ...@@ -121,7 +121,7 @@ func testRequestDistributor(t *testing.T, resend bool) {
stop := make(chan struct{}) stop := make(chan struct{})
defer close(stop) defer close(stop)
dist := newRequestDistributor(nil, stop) dist := newRequestDistributor(nil, stop, &mclock.System{})
var peers [testDistPeerCount]*testDistPeer var peers [testDistPeerCount]*testDistPeer
for i := range peers { for i := range peers {
peers[i] = &testDistPeer{} peers[i] = &testDistPeer{}
......
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
// You should have received a copy of the GNU Lesser General Public License // You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package les implements the Light Ethereum Subprotocol.
package les package les
import ( import (
...@@ -559,7 +558,7 @@ func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bes ...@@ -559,7 +558,7 @@ func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bes
f.lock.Unlock() f.lock.Unlock()
cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))
p.fcServer.QueueRequest(reqID, cost) p.fcServer.QueuedRequest(reqID, cost)
f.reqMu.Lock() f.reqMu.Lock()
f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()} f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()}
f.reqMu.Unlock() f.reqMu.Unlock()
......
This diff is collapsed.
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package flowcontrol
import (
"fmt"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
)
// logger collects events in string format and discards events older than the
// "keep" parameter
type logger struct {
events map[uint64]logEvent
writePtr, delPtr uint64
keep time.Duration
}
// logEvent describes a single event
type logEvent struct {
time mclock.AbsTime
event string
}
// newLogger creates a new logger
func newLogger(keep time.Duration) *logger {
return &logger{
events: make(map[uint64]logEvent),
keep: keep,
}
}
// add adds a new event and discards old events if possible
func (l *logger) add(now mclock.AbsTime, event string) {
keepAfter := now - mclock.AbsTime(l.keep)
for l.delPtr < l.writePtr && l.events[l.delPtr].time <= keepAfter {
delete(l.events, l.delPtr)
l.delPtr++
}
l.events[l.writePtr] = logEvent{now, event}
l.writePtr++
}
// dump prints all stored events
func (l *logger) dump(now mclock.AbsTime) {
for i := l.delPtr; i < l.writePtr; i++ {
e := l.events[i]
fmt.Println(time.Duration(e.time-now), e.event)
}
}
This diff is collapsed.
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package flowcontrol
import (
"math/rand"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
)
type testNode struct {
node *ClientNode
bufLimit, capacity uint64
waitUntil mclock.AbsTime
index, totalCost uint64
}
const (
testMaxCost = 1000000
testLength = 100000
)
// testConstantTotalCapacity simulates multiple request sender nodes and verifies
// whether the total amount of served requests matches the expected value based on
// the total capacity and the duration of the test.
// Some nodes are sending requests occasionally so that their buffer should regularly
// reach the maximum while other nodes (the "max capacity nodes") are sending at the
// maximum permitted rate. The max capacity nodes are changed multiple times during
// a single test.
func TestConstantTotalCapacity(t *testing.T) {
testConstantTotalCapacity(t, 10, 1, 0)
testConstantTotalCapacity(t, 10, 1, 1)
testConstantTotalCapacity(t, 30, 1, 0)
testConstantTotalCapacity(t, 30, 2, 3)
testConstantTotalCapacity(t, 100, 1, 0)
testConstantTotalCapacity(t, 100, 3, 5)
testConstantTotalCapacity(t, 100, 5, 10)
}
func testConstantTotalCapacity(t *testing.T, nodeCount, maxCapacityNodes, randomSend int) {
clock := &mclock.Simulated{}
nodes := make([]*testNode, nodeCount)
var totalCapacity uint64
for i := range nodes {
nodes[i] = &testNode{capacity: uint64(50000 + rand.Intn(100000))}
totalCapacity += nodes[i].capacity
}
m := NewClientManager(PieceWiseLinear{{0, totalCapacity}}, clock)
for _, n := range nodes {
n.bufLimit = n.capacity * 6000 //uint64(2000+rand.Intn(10000))
n.node = NewClientNode(m, ServerParams{BufLimit: n.bufLimit, MinRecharge: n.capacity})
}
maxNodes := make([]int, maxCapacityNodes)
for i := range maxNodes {
// we don't care if some indexes are selected multiple times
// in that case we have fewer max nodes
maxNodes[i] = rand.Intn(nodeCount)
}
for i := 0; i < testLength; i++ {
now := clock.Now()
for _, idx := range maxNodes {
for nodes[idx].send(t, now) {
}
}
if rand.Intn(testLength) < maxCapacityNodes*3 {
maxNodes[rand.Intn(maxCapacityNodes)] = rand.Intn(nodeCount)
}
sendCount := randomSend
for sendCount > 0 {
if nodes[rand.Intn(nodeCount)].send(t, now) {
sendCount--
}
}
clock.Run(time.Millisecond)
}
var totalCost uint64
for _, n := range nodes {
totalCost += n.totalCost
}
ratio := float64(totalCost) / float64(totalCapacity) / testLength
if ratio < 0.98 || ratio > 1.02 {
t.Errorf("totalCost/totalCapacity/testLength ratio incorrect (expected: 1, got: %f)", ratio)
}
}
func (n *testNode) send(t *testing.T, now mclock.AbsTime) bool {
if now < n.waitUntil {
return false
}
n.index++
if ok, _, _ := n.node.AcceptRequest(0, n.index, testMaxCost); !ok {
t.Fatalf("Rejected request after expected waiting time has passed")
}
rcost := uint64(rand.Int63n(testMaxCost))
bv := n.node.RequestProcessed(0, n.index, testMaxCost, rcost)
if bv < testMaxCost {
n.waitUntil = now + mclock.AbsTime((testMaxCost-bv)*1001000/n.capacity)
}
//n.waitUntil = now + mclock.AbsTime(float64(testMaxCost)*1001000/float64(n.capacity)*(1-float64(bv)/float64(n.bufLimit)))
n.totalCost += rcost
return true
}
...@@ -14,12 +14,12 @@ ...@@ -14,12 +14,12 @@
// You should have received a copy of the GNU Lesser General Public License // You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package les implements the Light Ethereum Subprotocol.
package les package les
import ( import (
"io" "io"
"math" "math"
"net"
"sync" "sync"
"time" "time"
...@@ -44,12 +44,14 @@ import ( ...@@ -44,12 +44,14 @@ import (
// value for the client. Currently the LES protocol manager uses IP addresses // value for the client. Currently the LES protocol manager uses IP addresses
// (without port address) to identify clients. // (without port address) to identify clients.
type freeClientPool struct { type freeClientPool struct {
db ethdb.Database db ethdb.Database
lock sync.Mutex lock sync.Mutex
clock mclock.Clock clock mclock.Clock
closed bool closed bool
removePeer func(string)
connectedLimit, totalLimit int connectedLimit, totalLimit int
freeClientCap uint64
addressMap map[string]*freeClientPoolEntry addressMap map[string]*freeClientPoolEntry
connPool, disconnPool *prque.Prque connPool, disconnPool *prque.Prque
...@@ -64,15 +66,16 @@ const ( ...@@ -64,15 +66,16 @@ const (
) )
// newFreeClientPool creates a new free client pool // newFreeClientPool creates a new free client pool
func newFreeClientPool(db ethdb.Database, connectedLimit, totalLimit int, clock mclock.Clock) *freeClientPool { func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string)) *freeClientPool {
pool := &freeClientPool{ pool := &freeClientPool{
db: db, db: db,
clock: clock, clock: clock,
addressMap: make(map[string]*freeClientPoolEntry), addressMap: make(map[string]*freeClientPoolEntry),
connPool: prque.New(poolSetIndex), connPool: prque.New(poolSetIndex),
disconnPool: prque.New(poolSetIndex), disconnPool: prque.New(poolSetIndex),
connectedLimit: connectedLimit, freeClientCap: freeClientCap,
totalLimit: totalLimit, totalLimit: totalLimit,
removePeer: removePeer,
} }
pool.loadFromDb() pool.loadFromDb()
return pool return pool
...@@ -85,22 +88,34 @@ func (f *freeClientPool) stop() { ...@@ -85,22 +88,34 @@ func (f *freeClientPool) stop() {
f.lock.Unlock() f.lock.Unlock()
} }
// registerPeer implements clientPool
func (f *freeClientPool) registerPeer(p *peer) {
if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok {
if !f.connect(addr.IP.String(), p.id) {
f.removePeer(p.id)
}
}
}
// connect should be called after a successful handshake. If the connection was // connect should be called after a successful handshake. If the connection was
// rejected, there is no need to call disconnect. // rejected, there is no need to call disconnect.
// func (f *freeClientPool) connect(address, id string) bool {
// Note: the disconnectFn callback should not block.
func (f *freeClientPool) connect(address string, disconnectFn func()) bool {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
if f.closed { if f.closed {
return false return false
} }
if f.connectedLimit == 0 {
log.Debug("Client rejected", "address", address)
return false
}
e := f.addressMap[address] e := f.addressMap[address]
now := f.clock.Now() now := f.clock.Now()
var recentUsage int64 var recentUsage int64
if e == nil { if e == nil {
e = &freeClientPoolEntry{address: address, index: -1} e = &freeClientPoolEntry{address: address, index: -1, id: id}
f.addressMap[address] = e f.addressMap[address] = e
} else { } else {
if e.connected { if e.connected {
...@@ -115,12 +130,7 @@ func (f *freeClientPool) connect(address string, disconnectFn func()) bool { ...@@ -115,12 +130,7 @@ func (f *freeClientPool) connect(address string, disconnectFn func()) bool {
i := f.connPool.PopItem().(*freeClientPoolEntry) i := f.connPool.PopItem().(*freeClientPoolEntry)
if e.linUsage+int64(connectedBias)-i.linUsage < 0 { if e.linUsage+int64(connectedBias)-i.linUsage < 0 {
// kick it out and accept the new client // kick it out and accept the new client
f.connPool.Remove(i.index) f.dropClient(i, now)
f.calcLogUsage(i, now)
i.connected = false
f.disconnPool.Push(i, -i.logUsage)
log.Debug("Client kicked out", "address", i.address)
i.disconnectFn()
} else { } else {
// keep the old client and reject the new one // keep the old client and reject the new one
f.connPool.Push(i, i.linUsage) f.connPool.Push(i, i.linUsage)
...@@ -130,7 +140,7 @@ func (f *freeClientPool) connect(address string, disconnectFn func()) bool { ...@@ -130,7 +140,7 @@ func (f *freeClientPool) connect(address string, disconnectFn func()) bool {
} }
f.disconnPool.Remove(e.index) f.disconnPool.Remove(e.index)
e.connected = true e.connected = true
e.disconnectFn = disconnectFn e.id = id
f.connPool.Push(e, e.linUsage) f.connPool.Push(e, e.linUsage)
if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit { if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit {
f.disconnPool.Pop() f.disconnPool.Pop()
...@@ -139,6 +149,13 @@ func (f *freeClientPool) connect(address string, disconnectFn func()) bool { ...@@ -139,6 +149,13 @@ func (f *freeClientPool) connect(address string, disconnectFn func()) bool {
return true return true
} }
// unregisterPeer implements clientPool
func (f *freeClientPool) unregisterPeer(p *peer) {
if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok {
f.disconnect(addr.IP.String())
}
}
// disconnect should be called when a connection is terminated. If the disconnection // disconnect should be called when a connection is terminated. If the disconnection
// was initiated by the pool itself using disconnectFn then calling disconnect is // was initiated by the pool itself using disconnectFn then calling disconnect is
// not necessary but permitted. // not necessary but permitted.
...@@ -163,6 +180,34 @@ func (f *freeClientPool) disconnect(address string) { ...@@ -163,6 +180,34 @@ func (f *freeClientPool) disconnect(address string) {
log.Debug("Client disconnected", "address", address) log.Debug("Client disconnected", "address", address)
} }
// setConnLimit sets the maximum number of free client slots and also drops
// some peers if necessary
func (f *freeClientPool) setLimits(count int, totalCap uint64) {
f.lock.Lock()
defer f.lock.Unlock()
f.connectedLimit = int(totalCap / f.freeClientCap)
if count < f.connectedLimit {
f.connectedLimit = count
}
now := mclock.Now()
for f.connPool.Size() > f.connectedLimit {
i := f.connPool.PopItem().(*freeClientPoolEntry)
f.dropClient(i, now)
}
}
// dropClient disconnects a client and also moves it from the connected to the
// disconnected pool
func (f *freeClientPool) dropClient(i *freeClientPoolEntry, now mclock.AbsTime) {
f.connPool.Remove(i.index)
f.calcLogUsage(i, now)
i.connected = false
f.disconnPool.Push(i, -i.logUsage)
log.Debug("Client kicked out", "address", i.address)
f.removePeer(i.id)
}
// logOffset calculates the time-dependent offset for the logarithmic // logOffset calculates the time-dependent offset for the logarithmic
// representation of recent usage // representation of recent usage
func (f *freeClientPool) logOffset(now mclock.AbsTime) int64 { func (f *freeClientPool) logOffset(now mclock.AbsTime) int64 {
...@@ -245,7 +290,7 @@ func (f *freeClientPool) saveToDb() { ...@@ -245,7 +290,7 @@ func (f *freeClientPool) saveToDb() {
// even though they are close to each other at any time they may wrap around int64 // even though they are close to each other at any time they may wrap around int64
// limits over time. Comparison should be performed accordingly. // limits over time. Comparison should be performed accordingly.
type freeClientPoolEntry struct { type freeClientPoolEntry struct {
address string address, id string
connected bool connected bool
disconnectFn func() disconnectFn func()
linUsage, logUsage int64 linUsage, logUsage int64
......
...@@ -14,13 +14,12 @@ ...@@ -14,13 +14,12 @@
// You should have received a copy of the GNU Lesser General Public License // You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package light implements on-demand retrieval capable state and chain objects
// for the Ethereum Light Client.
package les package les
import ( import (
"fmt" "fmt"
"math/rand" "math/rand"
"strconv"
"testing" "testing"
"time" "time"
...@@ -44,32 +43,38 @@ const testFreeClientPoolTicks = 500000 ...@@ -44,32 +43,38 @@ const testFreeClientPoolTicks = 500000
func testFreeClientPool(t *testing.T, connLimit, clientCount int) { func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
var ( var (
clock mclock.Simulated clock mclock.Simulated
db = ethdb.NewMemDatabase() db = ethdb.NewMemDatabase()
pool = newFreeClientPool(db, connLimit, 10000, &clock) connected = make([]bool, clientCount)
connected = make([]bool, clientCount) connTicks = make([]int, clientCount)
connTicks = make([]int, clientCount) disconnCh = make(chan int, clientCount)
disconnCh = make(chan int, clientCount) peerAddress = func(i int) string {
) return fmt.Sprintf("addr #%d", i)
peerId := func(i int) string { }
return fmt.Sprintf("test peer #%d", i) peerId = func(i int) string {
} return fmt.Sprintf("id #%d", i)
disconnFn := func(i int) func() { }
return func() { disconnFn = func(id string) {
i, err := strconv.Atoi(id[4:])
if err != nil {
panic(err)
}
disconnCh <- i disconnCh <- i
} }
} pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn)
)
pool.setLimits(connLimit, uint64(connLimit))
// pool should accept new peers up to its connected limit // pool should accept new peers up to its connected limit
for i := 0; i < connLimit; i++ { for i := 0; i < connLimit; i++ {
if pool.connect(peerId(i), disconnFn(i)) { if pool.connect(peerAddress(i), peerId(i)) {
connected[i] = true connected[i] = true
} else { } else {
t.Fatalf("Test peer #%d rejected", i) t.Fatalf("Test peer #%d rejected", i)
} }
} }
// since all accepted peers are new and should not be kicked out, the next one should be rejected // since all accepted peers are new and should not be kicked out, the next one should be rejected
if pool.connect(peerId(connLimit), disconnFn(connLimit)) { if pool.connect(peerAddress(connLimit), peerId(connLimit)) {
connected[connLimit] = true connected[connLimit] = true
t.Fatalf("Peer accepted over connected limit") t.Fatalf("Peer accepted over connected limit")
} }
...@@ -80,11 +85,11 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) { ...@@ -80,11 +85,11 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
i := rand.Intn(clientCount) i := rand.Intn(clientCount)
if connected[i] { if connected[i] {
pool.disconnect(peerId(i)) pool.disconnect(peerAddress(i))
connected[i] = false connected[i] = false
connTicks[i] += tickCounter connTicks[i] += tickCounter
} else { } else {
if pool.connect(peerId(i), disconnFn(i)) { if pool.connect(peerAddress(i), peerId(i)) {
connected[i] = true connected[i] = true
connTicks[i] -= tickCounter connTicks[i] -= tickCounter
} }
...@@ -93,7 +98,7 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) { ...@@ -93,7 +98,7 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
for { for {
select { select {
case i := <-disconnCh: case i := <-disconnCh:
pool.disconnect(peerId(i)) pool.disconnect(peerAddress(i))
if connected[i] { if connected[i] {
connTicks[i] += tickCounter connTicks[i] += tickCounter
connected[i] = false connected[i] = false
...@@ -119,20 +124,21 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) { ...@@ -119,20 +124,21 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
} }
// a previously unknown peer should be accepted now // a previously unknown peer should be accepted now
if !pool.connect("newPeer", func() {}) { if !pool.connect("newAddr", "newId") {
t.Fatalf("Previously unknown peer rejected") t.Fatalf("Previously unknown peer rejected")
} }
// close and restart pool // close and restart pool
pool.stop() pool.stop()
pool = newFreeClientPool(db, connLimit, 10000, &clock) pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn)
pool.setLimits(connLimit, uint64(connLimit))
// try connecting all known peers (connLimit should be filled up) // try connecting all known peers (connLimit should be filled up)
for i := 0; i < clientCount; i++ { for i := 0; i < clientCount; i++ {
pool.connect(peerId(i), func() {}) pool.connect(peerAddress(i), peerId(i))
} }
// expect pool to remember known nodes and kick out one of them to accept a new one // expect pool to remember known nodes and kick out one of them to accept a new one
if !pool.connect("newPeer2", func() {}) { if !pool.connect("newAddr2", "newId2") {
t.Errorf("Previously unknown peer rejected after restarting pool") t.Errorf("Previously unknown peer rejected after restarting pool")
} }
pool.stop() pool.stop()
......
This diff is collapsed.
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -133,16 +134,6 @@ func testIndexers(db ethdb.Database, odr light.OdrBackend, iConfig *light.Indexe ...@@ -133,16 +134,6 @@ func testIndexers(db ethdb.Database, odr light.OdrBackend, iConfig *light.Indexe
return chtIndexer, bloomIndexer, bloomTrieIndexer return chtIndexer, bloomIndexer, bloomTrieIndexer
} }
func testRCL() RequestCostList {
cl := make(RequestCostList, len(reqList))
for i, code := range reqList {
cl[i].MsgCode = code
cl[i].BaseCost = 0
cl[i].ReqCost = 0
}
return cl
}
// newTestProtocolManager creates a new protocol manager for testing purposes, // newTestProtocolManager creates a new protocol manager for testing purposes,
// with the given number of blocks already known, potential notification // with the given number of blocks already known, potential notification
// channels for different events and relative chain indexers array. // channels for different events and relative chain indexers array.
...@@ -183,14 +174,14 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor ...@@ -183,14 +174,14 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
if !lightSync { if !lightSync {
srv := &LesServer{lesCommons: lesCommons{protocolManager: pm}} srv := &LesServer{lesCommons: lesCommons{protocolManager: pm}}
pm.server = srv pm.server = srv
pm.servingQueue.setThreads(4)
srv.defParams = &flowcontrol.ServerParams{ srv.defParams = flowcontrol.ServerParams{
BufLimit: testBufLimit, BufLimit: testBufLimit,
MinRecharge: 1, MinRecharge: 1,
} }
srv.fcManager = flowcontrol.NewClientManager(50, 10, 1000000000) srv.fcManager = flowcontrol.NewClientManager(nil, &mclock.System{})
srv.fcCostStats = newCostStats(nil)
} }
pm.Start(1000) pm.Start(1000)
return pm, nil return pm, nil
...@@ -304,7 +295,7 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu ...@@ -304,7 +295,7 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu
expList = expList.add("txRelay", nil) expList = expList.add("txRelay", nil)
expList = expList.add("flowControl/BL", testBufLimit) expList = expList.add("flowControl/BL", testBufLimit)
expList = expList.add("flowControl/MRR", uint64(1)) expList = expList.add("flowControl/MRR", uint64(1))
expList = expList.add("flowControl/MRC", testRCL()) expList = expList.add("flowControl/MRC", testCostList())
if err := p2p.ExpectMsg(p.app, StatusMsg, expList); err != nil { if err := p2p.ExpectMsg(p.app, StatusMsg, expList); err != nil {
t.Fatalf("status recv: %v", err) t.Fatalf("status recv: %v", err)
...@@ -313,7 +304,7 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu ...@@ -313,7 +304,7 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu
t.Fatalf("status send: %v", err) t.Fatalf("status send: %v", err)
} }
p.fcServerParams = &flowcontrol.ServerParams{ p.fcParams = flowcontrol.ServerParams{
BufLimit: testBufLimit, BufLimit: testBufLimit,
MinRecharge: 1, MinRecharge: 1,
} }
...@@ -375,7 +366,7 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, waitIndexers fun ...@@ -375,7 +366,7 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, waitIndexers fun
db, ldb := ethdb.NewMemDatabase(), ethdb.NewMemDatabase() db, ldb := ethdb.NewMemDatabase(), ethdb.NewMemDatabase()
peers, lPeers := newPeerSet(), newPeerSet() peers, lPeers := newPeerSet(), newPeerSet()
dist := newRequestDistributor(lPeers, make(chan struct{})) dist := newRequestDistributor(lPeers, make(chan struct{}), &mclock.System{})
rm := newRetrieveManager(lPeers, dist, nil) rm := newRetrieveManager(lPeers, dist, nil)
odr := NewLesOdr(ldb, light.TestClientIndexerConfig, rm) odr := NewLesOdr(ldb, light.TestClientIndexerConfig, rm)
......
...@@ -117,7 +117,7 @@ func (odr *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err erro ...@@ -117,7 +117,7 @@ func (odr *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err erro
request: func(dp distPeer) func() { request: func(dp distPeer) func() {
p := dp.(*peer) p := dp.(*peer)
cost := lreq.GetCost(p) cost := lreq.GetCost(p)
p.fcServer.QueueRequest(reqID, cost) p.fcServer.QueuedRequest(reqID, cost)
return func() { lreq.Request(reqID, p) } return func() { lreq.Request(reqID, p) }
}, },
} }
......
...@@ -14,8 +14,6 @@ ...@@ -14,8 +14,6 @@
// You should have received a copy of the GNU Lesser General Public License // You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package light implements on-demand retrieval capable state and chain objects
// for the Ethereum Light Client.
package les package les
import ( import (
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
// You should have received a copy of the GNU Lesser General Public License // You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package les implements the Light Ethereum Subprotocol.
package les package les
import ( import (
......
...@@ -14,8 +14,6 @@ ...@@ -14,8 +14,6 @@
// You should have received a copy of the GNU Lesser General Public License // You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package light implements on-demand retrieval capable state and chain objects
// for the Ethereum Light Client.
package les package les
import ( import (
......
This diff is collapsed.
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
// You should have received a copy of the GNU Lesser General Public License // You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package les implements the Light Ethereum Subprotocol.
package les package les
import ( import (
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment