Unverified Commit 4996fce2 authored by Felföldi Zsolt's avatar Felföldi Zsolt Committed by GitHub

les, les/lespay/server: refactor client pool (#21236)

* les, les/lespay/server: refactor client pool

* les: use ns.Operation and sub calls where needed

* les: fixed tests

* les: removed active/inactive logic from peerSet

* les: removed active/inactive peer logic

* les: fixed linter warnings

* les: fixed more linter errors and added missing metrics

* les: addressed comments

* cmd/geth: fixed TestPriorityClient

* les: simplified clientPool state machine

* les/lespay/server: do not use goroutine for balance callbacks

* internal/web3ext: fix addBalance required parameters

* les: removed freeCapacity, always connect at minCapacity initially

* les: only allow capacity change with priority status
Co-authored-by: 's avatarrjl493456442 <garyrong0905@gmail.com>
parent f7112cc1
...@@ -152,7 +152,7 @@ func TestPriorityClient(t *testing.T) { ...@@ -152,7 +152,7 @@ func TestPriorityClient(t *testing.T) {
defer prioCli.killAndWait() defer prioCli.killAndWait()
// 3_000_000_000 once we move to Go 1.13 // 3_000_000_000 once we move to Go 1.13
tokens := 3000000000 tokens := 3000000000
lightServer.callRPC(nil, "les_addBalance", prioCli.getNodeInfo().ID, tokens, "foobar") lightServer.callRPC(nil, "les_addBalance", prioCli.getNodeInfo().ID, tokens)
prioCli.addPeer(lightServer) prioCli.addPeer(lightServer)
// Check if priority client is actually syncing and the regular client got kicked out // Check if priority client is actually syncing and the regular client got kicked out
......
...@@ -36,14 +36,15 @@ type LazyQueue struct { ...@@ -36,14 +36,15 @@ type LazyQueue struct {
// Items are stored in one of two internal queues ordered by estimated max // Items are stored in one of two internal queues ordered by estimated max
// priority until the next and the next-after-next refresh. Update and Refresh // priority until the next and the next-after-next refresh. Update and Refresh
// always places items in queue[1]. // always places items in queue[1].
queue [2]*sstack queue [2]*sstack
popQueue *sstack popQueue *sstack
period time.Duration period time.Duration
maxUntil mclock.AbsTime maxUntil mclock.AbsTime
indexOffset int indexOffset int
setIndex SetIndexCallback setIndex SetIndexCallback
priority PriorityCallback priority PriorityCallback
maxPriority MaxPriorityCallback maxPriority MaxPriorityCallback
lastRefresh1, lastRefresh2 mclock.AbsTime
} }
type ( type (
...@@ -54,14 +55,17 @@ type ( ...@@ -54,14 +55,17 @@ type (
// NewLazyQueue creates a new lazy queue // NewLazyQueue creates a new lazy queue
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue { func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
q := &LazyQueue{ q := &LazyQueue{
popQueue: newSstack(nil), popQueue: newSstack(nil),
setIndex: setIndex, setIndex: setIndex,
priority: priority, priority: priority,
maxPriority: maxPriority, maxPriority: maxPriority,
clock: clock, clock: clock,
period: refreshPeriod} period: refreshPeriod,
lastRefresh1: clock.Now(),
lastRefresh2: clock.Now(),
}
q.Reset() q.Reset()
q.Refresh() q.refresh(clock.Now())
return q return q
} }
...@@ -71,9 +75,19 @@ func (q *LazyQueue) Reset() { ...@@ -71,9 +75,19 @@ func (q *LazyQueue) Reset() {
q.queue[1] = newSstack(q.setIndex1) q.queue[1] = newSstack(q.setIndex1)
} }
// Refresh should be called at least with the frequency specified by the refreshPeriod parameter // Refresh performs queue re-evaluation if necessary
func (q *LazyQueue) Refresh() { func (q *LazyQueue) Refresh() {
q.maxUntil = q.clock.Now() + mclock.AbsTime(q.period) now := q.clock.Now()
for time.Duration(now-q.lastRefresh2) >= q.period*2 {
q.refresh(now)
q.lastRefresh2 = q.lastRefresh1
q.lastRefresh1 = now
}
}
// refresh re-evaluates items in the older queue and swaps the two queues
func (q *LazyQueue) refresh(now mclock.AbsTime) {
q.maxUntil = now + mclock.AbsTime(q.period)
for q.queue[0].Len() != 0 { for q.queue[0].Len() != 0 {
q.Push(heap.Pop(q.queue[0]).(*item).value) q.Push(heap.Pop(q.queue[0]).(*item).value)
} }
...@@ -139,6 +153,7 @@ func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) boo ...@@ -139,6 +153,7 @@ func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) boo
} }
return return
} }
nextIndex = q.peekIndex() // re-check because callback is allowed to push items back
} }
} }
} }
......
...@@ -844,7 +844,7 @@ web3._extend({ ...@@ -844,7 +844,7 @@ web3._extend({
new web3._extend.Method({ new web3._extend.Method({
name: 'addBalance', name: 'addBalance',
call: 'les_addBalance', call: 'les_addBalance',
params: 3 params: 2
}), }),
], ],
properties: properties:
......
...@@ -19,11 +19,11 @@ package les ...@@ -19,11 +19,11 @@ package les
import ( import (
"errors" "errors"
"fmt" "fmt"
"math"
"time" "time"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/mclock"
lps "github.com/ethereum/go-ethereum/les/lespay/server"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
) )
...@@ -31,16 +31,13 @@ var ( ...@@ -31,16 +31,13 @@ var (
errNoCheckpoint = errors.New("no local checkpoint provided") errNoCheckpoint = errors.New("no local checkpoint provided")
errNotActivated = errors.New("checkpoint registrar is not activated") errNotActivated = errors.New("checkpoint registrar is not activated")
errUnknownBenchmarkType = errors.New("unknown benchmark type") errUnknownBenchmarkType = errors.New("unknown benchmark type")
errBalanceOverflow = errors.New("balance overflow")
errNoPriority = errors.New("priority too low to raise capacity") errNoPriority = errors.New("priority too low to raise capacity")
) )
const maxBalance = math.MaxInt64
// PrivateLightServerAPI provides an API to access the LES light server. // PrivateLightServerAPI provides an API to access the LES light server.
type PrivateLightServerAPI struct { type PrivateLightServerAPI struct {
server *LesServer server *LesServer
defaultPosFactors, defaultNegFactors priceFactors defaultPosFactors, defaultNegFactors lps.PriceFactors
} }
// NewPrivateLightServerAPI creates a new LES light server API. // NewPrivateLightServerAPI creates a new LES light server API.
...@@ -57,7 +54,6 @@ func (api *PrivateLightServerAPI) ServerInfo() map[string]interface{} { ...@@ -57,7 +54,6 @@ func (api *PrivateLightServerAPI) ServerInfo() map[string]interface{} {
res := make(map[string]interface{}) res := make(map[string]interface{})
res["minimumCapacity"] = api.server.minCapacity res["minimumCapacity"] = api.server.minCapacity
res["maximumCapacity"] = api.server.maxCapacity res["maximumCapacity"] = api.server.maxCapacity
res["freeClientCapacity"] = api.server.freeCapacity
res["totalCapacity"], res["totalConnectedCapacity"], res["priorityConnectedCapacity"] = api.server.clientPool.capacityInfo() res["totalCapacity"], res["totalConnectedCapacity"], res["priorityConnectedCapacity"] = api.server.clientPool.capacityInfo()
return res return res
} }
...@@ -65,9 +61,8 @@ func (api *PrivateLightServerAPI) ServerInfo() map[string]interface{} { ...@@ -65,9 +61,8 @@ func (api *PrivateLightServerAPI) ServerInfo() map[string]interface{} {
// ClientInfo returns information about clients listed in the ids list or matching the given tags // ClientInfo returns information about clients listed in the ids list or matching the given tags
func (api *PrivateLightServerAPI) ClientInfo(ids []enode.ID) map[enode.ID]map[string]interface{} { func (api *PrivateLightServerAPI) ClientInfo(ids []enode.ID) map[enode.ID]map[string]interface{} {
res := make(map[enode.ID]map[string]interface{}) res := make(map[enode.ID]map[string]interface{})
api.server.clientPool.forClients(ids, func(client *clientInfo, id enode.ID) error { api.server.clientPool.forClients(ids, func(client *clientInfo) {
res[id] = api.clientInfo(client, id) res[client.node.ID()] = api.clientInfo(client)
return nil
}) })
return res return res
} }
...@@ -80,48 +75,40 @@ func (api *PrivateLightServerAPI) ClientInfo(ids []enode.ID) map[enode.ID]map[st ...@@ -80,48 +75,40 @@ func (api *PrivateLightServerAPI) ClientInfo(ids []enode.ID) map[enode.ID]map[st
// assigned to it. // assigned to it.
func (api *PrivateLightServerAPI) PriorityClientInfo(start, stop enode.ID, maxCount int) map[enode.ID]map[string]interface{} { func (api *PrivateLightServerAPI) PriorityClientInfo(start, stop enode.ID, maxCount int) map[enode.ID]map[string]interface{} {
res := make(map[enode.ID]map[string]interface{}) res := make(map[enode.ID]map[string]interface{})
ids := api.server.clientPool.ndb.getPosBalanceIDs(start, stop, maxCount+1) ids := api.server.clientPool.bt.GetPosBalanceIDs(start, stop, maxCount+1)
if len(ids) > maxCount { if len(ids) > maxCount {
res[ids[maxCount]] = make(map[string]interface{}) res[ids[maxCount]] = make(map[string]interface{})
ids = ids[:maxCount] ids = ids[:maxCount]
} }
if len(ids) != 0 { if len(ids) != 0 {
api.server.clientPool.forClients(ids, func(client *clientInfo, id enode.ID) error { api.server.clientPool.forClients(ids, func(client *clientInfo) {
res[id] = api.clientInfo(client, id) res[client.node.ID()] = api.clientInfo(client)
return nil
}) })
} }
return res return res
} }
// clientInfo creates a client info data structure // clientInfo creates a client info data structure
func (api *PrivateLightServerAPI) clientInfo(c *clientInfo, id enode.ID) map[string]interface{} { func (api *PrivateLightServerAPI) clientInfo(c *clientInfo) map[string]interface{} {
info := make(map[string]interface{}) info := make(map[string]interface{})
if c != nil { pb, nb := c.balance.GetBalance()
now := mclock.Now() info["isConnected"] = c.connected
info["isConnected"] = true info["pricing/balance"] = pb
info["connectionTime"] = float64(now-c.connectedAt) / float64(time.Second) info["priority"] = pb != 0
info["capacity"] = c.capacity // cb := api.server.clientPool.ndb.getCurrencyBalance(id)
pb, nb := c.balanceTracker.getBalance(now) // info["pricing/currency"] = cb.amount
info["pricing/balance"], info["pricing/negBalance"] = pb, nb if c.connected {
info["pricing/balanceMeta"] = c.balanceMetaInfo info["connectionTime"] = float64(mclock.Now()-c.connectedAt) / float64(time.Second)
info["priority"] = pb != 0 info["capacity"], _ = api.server.clientPool.ns.GetField(c.node, priorityPoolSetup.CapacityField).(uint64)
} else { info["pricing/negBalance"] = nb
info["isConnected"] = false
pb := api.server.clientPool.ndb.getOrNewPB(id)
info["pricing/balance"], info["pricing/balanceMeta"] = pb.value, pb.meta
info["priority"] = pb.value != 0
} }
return info return info
} }
// setParams either sets the given parameters for a single connected client (if specified) // setParams either sets the given parameters for a single connected client (if specified)
// or the default parameters applicable to clients connected in the future // or the default parameters applicable to clients connected in the future
func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, client *clientInfo, posFactors, negFactors *priceFactors) (updateFactors bool, err error) { func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, client *clientInfo, posFactors, negFactors *lps.PriceFactors) (updateFactors bool, err error) {
defParams := client == nil defParams := client == nil
if !defParams {
posFactors, negFactors = &client.posFactors, &client.negFactors
}
for name, value := range params { for name, value := range params {
errValue := func() error { errValue := func() error {
return fmt.Errorf("invalid value for parameter '%s'", name) return fmt.Errorf("invalid value for parameter '%s'", name)
...@@ -137,20 +124,20 @@ func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, clien ...@@ -137,20 +124,20 @@ func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, clien
switch { switch {
case name == "pricing/timeFactor": case name == "pricing/timeFactor":
setFactor(&posFactors.timeFactor) setFactor(&posFactors.TimeFactor)
case name == "pricing/capacityFactor": case name == "pricing/capacityFactor":
setFactor(&posFactors.capacityFactor) setFactor(&posFactors.CapacityFactor)
case name == "pricing/requestCostFactor": case name == "pricing/requestCostFactor":
setFactor(&posFactors.requestFactor) setFactor(&posFactors.RequestFactor)
case name == "pricing/negative/timeFactor": case name == "pricing/negative/timeFactor":
setFactor(&negFactors.timeFactor) setFactor(&negFactors.TimeFactor)
case name == "pricing/negative/capacityFactor": case name == "pricing/negative/capacityFactor":
setFactor(&negFactors.capacityFactor) setFactor(&negFactors.CapacityFactor)
case name == "pricing/negative/requestCostFactor": case name == "pricing/negative/requestCostFactor":
setFactor(&negFactors.requestFactor) setFactor(&negFactors.RequestFactor)
case !defParams && name == "capacity": case !defParams && name == "capacity":
if capacity, ok := value.(float64); ok && uint64(capacity) >= api.server.minCapacity { if capacity, ok := value.(float64); ok && uint64(capacity) >= api.server.minCapacity {
err = api.server.clientPool.setCapacity(client, uint64(capacity)) _, err = api.server.clientPool.setCapacity(client.node, client.address, uint64(capacity), 0, true)
// Don't have to call factor update explicitly. It's already done // Don't have to call factor update explicitly. It's already done
// in setCapacity function. // in setCapacity function.
} else { } else {
...@@ -170,27 +157,25 @@ func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, clien ...@@ -170,27 +157,25 @@ func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, clien
return return
} }
// AddBalance updates the balance of a client (either overwrites it or adds to it).
// It also updates the balance meta info string.
func (api *PrivateLightServerAPI) AddBalance(id enode.ID, value int64, meta string) ([2]uint64, error) {
oldBalance, newBalance, err := api.server.clientPool.addBalance(id, value, meta)
return [2]uint64{oldBalance, newBalance}, err
}
// SetClientParams sets client parameters for all clients listed in the ids list // SetClientParams sets client parameters for all clients listed in the ids list
// or all connected clients if the list is empty // or all connected clients if the list is empty
func (api *PrivateLightServerAPI) SetClientParams(ids []enode.ID, params map[string]interface{}) error { func (api *PrivateLightServerAPI) SetClientParams(ids []enode.ID, params map[string]interface{}) error {
return api.server.clientPool.forClients(ids, func(client *clientInfo, id enode.ID) error { var err error
if client != nil { api.server.clientPool.forClients(ids, func(client *clientInfo) {
update, err := api.setParams(params, client, nil, nil) if client.connected {
posFactors, negFactors := client.balance.GetPriceFactors()
update, e := api.setParams(params, client, &posFactors, &negFactors)
if update { if update {
client.updatePriceFactors() client.balance.SetPriceFactors(posFactors, negFactors)
}
if e != nil {
err = e
} }
return err
} else { } else {
return fmt.Errorf("client %064x is not connected", id[:]) err = fmt.Errorf("client %064x is not connected", client.node.ID())
} }
}) })
return err
} }
// SetDefaultParams sets the default parameters applicable to clients connected in the future // SetDefaultParams sets the default parameters applicable to clients connected in the future
...@@ -214,6 +199,15 @@ func (api *PrivateLightServerAPI) SetConnectedBias(bias time.Duration) error { ...@@ -214,6 +199,15 @@ func (api *PrivateLightServerAPI) SetConnectedBias(bias time.Duration) error {
return nil return nil
} }
// AddBalance adds the given amount to the balance of a client if possible and returns
// the balance before and after the operation
func (api *PrivateLightServerAPI) AddBalance(id enode.ID, amount int64) (balance [2]uint64, err error) {
api.server.clientPool.forClients([]enode.ID{id}, func(c *clientInfo) {
balance[0], balance[1], err = c.balance.AddBalance(amount)
})
return
}
// Benchmark runs a request performance benchmark with a given set of measurement setups // Benchmark runs a request performance benchmark with a given set of measurement setups
// in multiple passes specified by passCount. The measurement time for each setup in each // in multiple passes specified by passCount. The measurement time for each setup in each
// pass is specified in milliseconds by length. // pass is specified in milliseconds by length.
...@@ -304,13 +298,15 @@ func NewPrivateDebugAPI(server *LesServer) *PrivateDebugAPI { ...@@ -304,13 +298,15 @@ func NewPrivateDebugAPI(server *LesServer) *PrivateDebugAPI {
// FreezeClient forces a temporary client freeze which normally happens when the server is overloaded // FreezeClient forces a temporary client freeze which normally happens when the server is overloaded
func (api *PrivateDebugAPI) FreezeClient(id enode.ID) error { func (api *PrivateDebugAPI) FreezeClient(id enode.ID) error {
return api.server.clientPool.forClients([]enode.ID{id}, func(c *clientInfo, id enode.ID) error { var err error
if c == nil { api.server.clientPool.forClients([]enode.ID{id}, func(c *clientInfo) {
return fmt.Errorf("client %064x is not connected", id[:]) if c.connected {
c.peer.freeze()
} else {
err = fmt.Errorf("client %064x is not connected", id[:])
} }
c.peer.freezeClient()
return nil
}) })
return err
} }
// PrivateLightAPI provides an API to access the LES light server or light client. // PrivateLightAPI provides an API to access the LES light server or light client.
......
...@@ -107,7 +107,7 @@ func testCapacityAPI(t *testing.T, clientCount int) { ...@@ -107,7 +107,7 @@ func testCapacityAPI(t *testing.T, clientCount int) {
t.Fatalf("Failed to obtain rpc client: %v", err) t.Fatalf("Failed to obtain rpc client: %v", err)
} }
headNum, headHash := getHead(ctx, t, serverRpcClient) headNum, headHash := getHead(ctx, t, serverRpcClient)
minCap, freeCap, totalCap := getCapacityInfo(ctx, t, serverRpcClient) minCap, totalCap := getCapacityInfo(ctx, t, serverRpcClient)
testCap := totalCap * 3 / 4 testCap := totalCap * 3 / 4
t.Logf("Server testCap: %d minCap: %d head number: %d head hash: %064x\n", testCap, minCap, headNum, headHash) t.Logf("Server testCap: %d minCap: %d head number: %d head hash: %064x\n", testCap, minCap, headNum, headHash)
reqMinCap := uint64(float64(testCap) * minRelCap / (minRelCap + float64(len(clients)-1))) reqMinCap := uint64(float64(testCap) * minRelCap / (minRelCap + float64(len(clients)-1)))
...@@ -202,7 +202,7 @@ func testCapacityAPI(t *testing.T, clientCount int) { ...@@ -202,7 +202,7 @@ func testCapacityAPI(t *testing.T, clientCount int) {
weights := make([]float64, len(clients)) weights := make([]float64, len(clients))
for c := 0; c < 5; c++ { for c := 0; c < 5; c++ {
setCapacity(ctx, t, serverRpcClient, clients[freeIdx].ID(), freeCap) setCapacity(ctx, t, serverRpcClient, clients[freeIdx].ID(), minCap)
freeIdx = rand.Intn(len(clients)) freeIdx = rand.Intn(len(clients))
var sum float64 var sum float64
for i := range clients { for i := range clients {
...@@ -214,7 +214,7 @@ func testCapacityAPI(t *testing.T, clientCount int) { ...@@ -214,7 +214,7 @@ func testCapacityAPI(t *testing.T, clientCount int) {
sum += weights[i] sum += weights[i]
} }
for i, client := range clients { for i, client := range clients {
weights[i] *= float64(testCap-freeCap-100) / sum weights[i] *= float64(testCap-minCap-100) / sum
capacity := uint64(weights[i]) capacity := uint64(weights[i])
if i != freeIdx && capacity < getCapacity(ctx, t, serverRpcClient, client.ID()) { if i != freeIdx && capacity < getCapacity(ctx, t, serverRpcClient, client.ID()) {
setCapacity(ctx, t, serverRpcClient, client.ID(), capacity) setCapacity(ctx, t, serverRpcClient, client.ID(), capacity)
...@@ -227,7 +227,7 @@ func testCapacityAPI(t *testing.T, clientCount int) { ...@@ -227,7 +227,7 @@ func testCapacityAPI(t *testing.T, clientCount int) {
setCapacity(ctx, t, serverRpcClient, client.ID(), capacity) setCapacity(ctx, t, serverRpcClient, client.ID(), capacity)
} }
} }
weights[freeIdx] = float64(freeCap) weights[freeIdx] = float64(minCap)
for i := range clients { for i := range clients {
weights[i] /= float64(testCap) weights[i] /= float64(testCap)
} }
...@@ -247,7 +247,7 @@ func testCapacityAPI(t *testing.T, clientCount int) { ...@@ -247,7 +247,7 @@ func testCapacityAPI(t *testing.T, clientCount int) {
default: default:
} }
_, _, totalCap = getCapacityInfo(ctx, t, serverRpcClient) _, totalCap = getCapacityInfo(ctx, t, serverRpcClient)
if totalCap < testCap { if totalCap < testCap {
t.Log("Total capacity underrun") t.Log("Total capacity underrun")
close(stop) close(stop)
...@@ -370,7 +370,7 @@ func getCapacity(ctx context.Context, t *testing.T, server *rpc.Client, clientID ...@@ -370,7 +370,7 @@ func getCapacity(ctx context.Context, t *testing.T, server *rpc.Client, clientID
return uint64(vv) return uint64(vv)
} }
func getCapacityInfo(ctx context.Context, t *testing.T, server *rpc.Client) (minCap, freeCap, totalCap uint64) { func getCapacityInfo(ctx context.Context, t *testing.T, server *rpc.Client) (minCap, totalCap uint64) {
var res map[string]interface{} var res map[string]interface{}
if err := server.CallContext(ctx, &res, "les_serverInfo"); err != nil { if err := server.CallContext(ctx, &res, "les_serverInfo"); err != nil {
t.Fatalf("Failed to query server info: %v", err) t.Fatalf("Failed to query server info: %v", err)
...@@ -387,7 +387,6 @@ func getCapacityInfo(ctx context.Context, t *testing.T, server *rpc.Client) (min ...@@ -387,7 +387,6 @@ func getCapacityInfo(ctx context.Context, t *testing.T, server *rpc.Client) (min
return uint64(vv) return uint64(vv)
} }
minCap = decode("minimumCapacity") minCap = decode("minimumCapacity")
freeCap = decode("freeClientCapacity")
totalCap = decode("totalCapacity") totalCap = decode("totalCapacity")
return return
} }
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package server
import (
"bytes"
"encoding/binary"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
lru "github.com/hashicorp/golang-lru"
)
const (
balanceCacheLimit = 8192 // the maximum number of cached items in service token balance queue
// nodeDBVersion is the version identifier of the node data in db
//
// Changelog:
// * Replace `lastTotal` with `meta` in positive balance: version 0=>1
nodeDBVersion = 1
// dbCleanupCycle is the cycle of db for useless data cleanup
dbCleanupCycle = time.Hour
)
var (
positiveBalancePrefix = []byte("pb:") // dbVersion(uint16 big endian) + positiveBalancePrefix + id -> balance
negativeBalancePrefix = []byte("nb:") // dbVersion(uint16 big endian) + negativeBalancePrefix + ip -> balance
expirationKey = []byte("expiration:") // dbVersion(uint16 big endian) + expirationKey -> posExp, negExp
)
type nodeDB struct {
db ethdb.KeyValueStore
cache *lru.Cache
auxbuf []byte // 37-byte auxiliary buffer for key encoding
verbuf [2]byte // 2-byte auxiliary buffer for db version
evictCallBack func(mclock.AbsTime, bool, utils.ExpiredValue) bool // Callback to determine whether the balance can be evicted.
clock mclock.Clock
closeCh chan struct{}
cleanupHook func() // Test hook used for testing
}
func newNodeDB(db ethdb.KeyValueStore, clock mclock.Clock) *nodeDB {
cache, _ := lru.New(balanceCacheLimit)
ndb := &nodeDB{
db: db,
cache: cache,
auxbuf: make([]byte, 37),
clock: clock,
closeCh: make(chan struct{}),
}
binary.BigEndian.PutUint16(ndb.verbuf[:], uint16(nodeDBVersion))
go ndb.expirer()
return ndb
}
func (db *nodeDB) close() {
close(db.closeCh)
}
func (db *nodeDB) getPrefix(neg bool) []byte {
prefix := positiveBalancePrefix
if neg {
prefix = negativeBalancePrefix
}
return append(db.verbuf[:], prefix...)
}
func (db *nodeDB) key(id []byte, neg bool) []byte {
prefix := positiveBalancePrefix
if neg {
prefix = negativeBalancePrefix
}
if len(prefix)+len(db.verbuf)+len(id) > len(db.auxbuf) {
db.auxbuf = append(db.auxbuf, make([]byte, len(prefix)+len(db.verbuf)+len(id)-len(db.auxbuf))...)
}
copy(db.auxbuf[:len(db.verbuf)], db.verbuf[:])
copy(db.auxbuf[len(db.verbuf):len(db.verbuf)+len(prefix)], prefix)
copy(db.auxbuf[len(prefix)+len(db.verbuf):len(prefix)+len(db.verbuf)+len(id)], id)
return db.auxbuf[:len(prefix)+len(db.verbuf)+len(id)]
}
func (db *nodeDB) getExpiration() (utils.Fixed64, utils.Fixed64) {
blob, err := db.db.Get(append(db.verbuf[:], expirationKey...))
if err != nil || len(blob) != 16 {
return 0, 0
}
return utils.Fixed64(binary.BigEndian.Uint64(blob[:8])), utils.Fixed64(binary.BigEndian.Uint64(blob[8:16]))
}
func (db *nodeDB) setExpiration(pos, neg utils.Fixed64) {
var buff [16]byte
binary.BigEndian.PutUint64(buff[:8], uint64(pos))
binary.BigEndian.PutUint64(buff[8:16], uint64(neg))
db.db.Put(append(db.verbuf[:], expirationKey...), buff[:16])
}
func (db *nodeDB) getOrNewBalance(id []byte, neg bool) utils.ExpiredValue {
key := db.key(id, neg)
item, exist := db.cache.Get(string(key))
if exist {
return item.(utils.ExpiredValue)
}
var b utils.ExpiredValue
enc, err := db.db.Get(key)
if err != nil || len(enc) == 0 {
return b
}
if err := rlp.DecodeBytes(enc, &b); err != nil {
log.Crit("Failed to decode positive balance", "err", err)
}
db.cache.Add(string(key), b)
return b
}
func (db *nodeDB) setBalance(id []byte, neg bool, b utils.ExpiredValue) {
key := db.key(id, neg)
enc, err := rlp.EncodeToBytes(&(b))
if err != nil {
log.Crit("Failed to encode positive balance", "err", err)
}
db.db.Put(key, enc)
db.cache.Add(string(key), b)
}
func (db *nodeDB) delBalance(id []byte, neg bool) {
key := db.key(id, neg)
db.db.Delete(key)
db.cache.Remove(string(key))
}
// getPosBalanceIDs returns a lexicographically ordered list of IDs of accounts
// with a positive balance
func (db *nodeDB) getPosBalanceIDs(start, stop enode.ID, maxCount int) (result []enode.ID) {
if maxCount <= 0 {
return
}
prefix := db.getPrefix(false)
keylen := len(prefix) + len(enode.ID{})
it := db.db.NewIterator(prefix, start.Bytes())
defer it.Release()
for it.Next() {
var id enode.ID
if len(it.Key()) != keylen {
return
}
copy(id[:], it.Key()[keylen-len(id):])
if bytes.Compare(id.Bytes(), stop.Bytes()) >= 0 {
return
}
result = append(result, id)
if len(result) == maxCount {
return
}
}
return
}
// forEachBalance iterates all balances and passes values to callback.
func (db *nodeDB) forEachBalance(neg bool, callback func(id enode.ID, balance utils.ExpiredValue) bool) {
prefix := db.getPrefix(neg)
keylen := len(prefix) + len(enode.ID{})
it := db.db.NewIterator(prefix, nil)
defer it.Release()
for it.Next() {
var id enode.ID
if len(it.Key()) != keylen {
return
}
copy(id[:], it.Key()[keylen-len(id):])
var b utils.ExpiredValue
if err := rlp.DecodeBytes(it.Value(), &b); err != nil {
continue
}
if !callback(id, b) {
return
}
}
}
func (db *nodeDB) expirer() {
for {
select {
case <-db.clock.After(dbCleanupCycle):
db.expireNodes()
case <-db.closeCh:
return
}
}
}
// expireNodes iterates the whole node db and checks whether the
// token balances can be deleted.
func (db *nodeDB) expireNodes() {
var (
visited int
deleted int
start = time.Now()
)
for _, neg := range []bool{false, true} {
iter := db.db.NewIterator(db.getPrefix(neg), nil)
for iter.Next() {
visited++
var balance utils.ExpiredValue
if err := rlp.DecodeBytes(iter.Value(), &balance); err != nil {
log.Crit("Failed to decode negative balance", "err", err)
}
if db.evictCallBack != nil && db.evictCallBack(db.clock.Now(), neg, balance) {
deleted++
db.db.Delete(iter.Key())
}
}
}
// Invoke testing hook if it's not nil.
if db.cleanupHook != nil {
db.cleanupHook()
}
log.Debug("Expire nodes", "visited", visited, "deleted", deleted, "elapsed", common.PrettyDuration(time.Since(start)))
}
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package server
import (
"reflect"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/p2p/enode"
)
func expval(v uint64) utils.ExpiredValue {
return utils.ExpiredValue{Base: v}
}
func TestNodeDB(t *testing.T) {
ndb := newNodeDB(rawdb.NewMemoryDatabase(), mclock.System{})
defer ndb.close()
var cases = []struct {
id enode.ID
ip string
balance utils.ExpiredValue
positive bool
}{
{enode.ID{0x00, 0x01, 0x02}, "", expval(100), true},
{enode.ID{0x00, 0x01, 0x02}, "", expval(200), true},
{enode.ID{}, "127.0.0.1", expval(100), false},
{enode.ID{}, "127.0.0.1", expval(200), false},
}
for _, c := range cases {
if c.positive {
ndb.setBalance(c.id.Bytes(), false, c.balance)
if pb := ndb.getOrNewBalance(c.id.Bytes(), false); !reflect.DeepEqual(pb, c.balance) {
t.Fatalf("Positive balance mismatch, want %v, got %v", c.balance, pb)
}
} else {
ndb.setBalance([]byte(c.ip), true, c.balance)
if nb := ndb.getOrNewBalance([]byte(c.ip), true); !reflect.DeepEqual(nb, c.balance) {
t.Fatalf("Negative balance mismatch, want %v, got %v", c.balance, nb)
}
}
}
for _, c := range cases {
if c.positive {
ndb.delBalance(c.id.Bytes(), false)
if pb := ndb.getOrNewBalance(c.id.Bytes(), false); !reflect.DeepEqual(pb, utils.ExpiredValue{}) {
t.Fatalf("Positive balance mismatch, want %v, got %v", utils.ExpiredValue{}, pb)
}
} else {
ndb.delBalance([]byte(c.ip), true)
if nb := ndb.getOrNewBalance([]byte(c.ip), true); !reflect.DeepEqual(nb, utils.ExpiredValue{}) {
t.Fatalf("Negative balance mismatch, want %v, got %v", utils.ExpiredValue{}, nb)
}
}
}
posExp, negExp := utils.Fixed64(1000), utils.Fixed64(2000)
ndb.setExpiration(posExp, negExp)
if pos, neg := ndb.getExpiration(); pos != posExp || neg != negExp {
t.Fatalf("Expiration mismatch, want %v / %v, got %v / %v", posExp, negExp, pos, neg)
}
/* curBalance := currencyBalance{typ: "ETH", amount: 10000}
ndb.setCurrencyBalance(enode.ID{0x01, 0x02}, curBalance)
if got := ndb.getCurrencyBalance(enode.ID{0x01, 0x02}); !reflect.DeepEqual(got, curBalance) {
t.Fatalf("Currency balance mismatch, want %v, got %v", curBalance, got)
}*/
}
func TestNodeDBExpiration(t *testing.T) {
var (
iterated int
done = make(chan struct{}, 1)
)
callback := func(now mclock.AbsTime, neg bool, b utils.ExpiredValue) bool {
iterated += 1
return true
}
clock := &mclock.Simulated{}
ndb := newNodeDB(rawdb.NewMemoryDatabase(), clock)
defer ndb.close()
ndb.evictCallBack = callback
ndb.cleanupHook = func() { done <- struct{}{} }
var cases = []struct {
id []byte
neg bool
balance utils.ExpiredValue
}{
{[]byte{0x01, 0x02}, false, expval(1)},
{[]byte{0x03, 0x04}, false, expval(1)},
{[]byte{0x05, 0x06}, false, expval(1)},
{[]byte{0x07, 0x08}, false, expval(1)},
{[]byte("127.0.0.1"), true, expval(1)},
{[]byte("127.0.0.2"), true, expval(1)},
{[]byte("127.0.0.3"), true, expval(1)},
{[]byte("127.0.0.4"), true, expval(1)},
}
for _, c := range cases {
ndb.setBalance(c.id, c.neg, c.balance)
}
clock.WaitForTimers(1)
clock.Run(time.Hour + time.Minute)
select {
case <-done:
case <-time.NewTimer(time.Second).C:
t.Fatalf("timeout")
}
if iterated != 8 {
t.Fatalf("Failed to evict useless balances, want %v, got %d", 8, iterated)
}
for _, c := range cases {
ndb.setBalance(c.id, c.neg, c.balance)
}
clock.WaitForTimers(1)
clock.Run(time.Hour + time.Minute)
select {
case <-done:
case <-time.NewTimer(time.Second).C:
t.Fatalf("timeout")
}
if iterated != 16 {
t.Fatalf("Failed to evict useless balances, want %v, got %d", 16, iterated)
}
}
This diff is collapsed.
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package server
import (
"math/rand"
"reflect"
"testing"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/nodestate"
)
var (
testSetup = &nodestate.Setup{}
ppTestClientFlag = testSetup.NewFlag("ppTestClientFlag")
ppTestClientField = testSetup.NewField("ppTestClient", reflect.TypeOf(&ppTestClient{}))
ppUpdateFlag = testSetup.NewFlag("ppUpdateFlag")
ppTestSetup = NewPriorityPoolSetup(testSetup)
)
func init() {
ppTestSetup.Connect(ppTestClientField, ppUpdateFlag)
}
const (
testCapacityStepDiv = 100
testCapacityToleranceDiv = 10
)
type ppTestClient struct {
node *enode.Node
balance, cap uint64
}
func (c *ppTestClient) Priority(now mclock.AbsTime, cap uint64) int64 {
return int64(c.balance / cap)
}
func (c *ppTestClient) EstMinPriority(until mclock.AbsTime, cap uint64, update bool) int64 {
return int64(c.balance / cap)
}
func TestPriorityPool(t *testing.T) {
clock := &mclock.Simulated{}
ns := nodestate.NewNodeStateMachine(nil, nil, clock, testSetup)
ns.SubscribeField(ppTestSetup.CapacityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
if n := ns.GetField(node, ppTestSetup.priorityField); n != nil {
c := n.(*ppTestClient)
c.cap = newValue.(uint64)
}
})
pp := NewPriorityPool(ns, ppTestSetup, clock, 100, 0, testCapacityStepDiv)
ns.Start()
pp.SetLimits(100, 1000000)
clients := make([]*ppTestClient, 100)
raise := func(c *ppTestClient) {
for {
var ok bool
ns.Operation(func() {
_, ok = pp.RequestCapacity(c.node, c.cap+c.cap/testCapacityStepDiv, 0, true)
})
if !ok {
return
}
}
}
var sumBalance uint64
check := func(c *ppTestClient) {
expCap := 1000000 * c.balance / sumBalance
capTol := expCap / testCapacityToleranceDiv
if c.cap < expCap-capTol || c.cap > expCap+capTol {
t.Errorf("Wrong node capacity (expected %d, got %d)", expCap, c.cap)
}
}
for i := range clients {
c := &ppTestClient{
node: enode.SignNull(&enr.Record{}, enode.ID{byte(i)}),
balance: 1000000000,
cap: 1000,
}
sumBalance += c.balance
clients[i] = c
ns.SetState(c.node, ppTestClientFlag, nodestate.Flags{}, 0)
ns.SetField(c.node, ppTestSetup.priorityField, c)
ns.SetState(c.node, ppTestSetup.InactiveFlag, nodestate.Flags{}, 0)
raise(c)
check(c)
}
for count := 0; count < 100; count++ {
c := clients[rand.Intn(len(clients))]
oldBalance := c.balance
c.balance = uint64(rand.Int63n(1000000000) + 1000000000)
sumBalance += c.balance - oldBalance
pp.ns.SetState(c.node, ppUpdateFlag, nodestate.Flags{}, 0)
pp.ns.SetState(c.node, nodestate.Flags{}, ppUpdateFlag, 0)
if c.balance > oldBalance {
raise(c)
} else {
for _, c := range clients {
raise(c)
}
}
for _, c := range clients {
check(c)
}
}
ns.Stop()
}
...@@ -99,8 +99,8 @@ var ( ...@@ -99,8 +99,8 @@ var (
sqQueuedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/queued", nil) sqQueuedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/queued", nil)
clientConnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/connected", nil) clientConnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/connected", nil)
clientRejectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/rejected", nil) clientActivatedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/activated", nil)
clientKickedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/kicked", nil) clientDeactivatedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/deactivated", nil)
clientDisconnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/disconnected", nil) clientDisconnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/disconnected", nil)
clientFreezeMeter = metrics.NewRegisteredMeter("les/server/clientEvent/freeze", nil) clientFreezeMeter = metrics.NewRegisteredMeter("les/server/clientEvent/freeze", nil)
clientErrorMeter = metrics.NewRegisteredMeter("les/server/clientEvent/error", nil) clientErrorMeter = metrics.NewRegisteredMeter("les/server/clientEvent/error", nil)
......
...@@ -33,6 +33,7 @@ import ( ...@@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/les/flowcontrol" "github.com/ethereum/go-ethereum/les/flowcontrol"
lpc "github.com/ethereum/go-ethereum/les/lespay/client" lpc "github.com/ethereum/go-ethereum/les/lespay/client"
lps "github.com/ethereum/go-ethereum/les/lespay/server"
"github.com/ethereum/go-ethereum/les/utils" "github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
...@@ -463,7 +464,7 @@ func (p *serverPeer) requestTxStatus(reqID uint64, txHashes []common.Hash) error ...@@ -463,7 +464,7 @@ func (p *serverPeer) requestTxStatus(reqID uint64, txHashes []common.Hash) error
return p.sendRequest(GetTxStatusMsg, reqID, txHashes, len(txHashes)) return p.sendRequest(GetTxStatusMsg, reqID, txHashes, len(txHashes))
} }
// SendTxStatus creates a reply with a batch of transactions to be added to the remote transaction pool. // sendTxs creates a reply with a batch of transactions to be added to the remote transaction pool.
func (p *serverPeer) sendTxs(reqID uint64, amount int, txs rlp.RawValue) error { func (p *serverPeer) sendTxs(reqID uint64, amount int, txs rlp.RawValue) error {
p.Log().Debug("Sending batch of transactions", "amount", amount, "size", len(txs)) p.Log().Debug("Sending batch of transactions", "amount", amount, "size", len(txs))
sizeFactor := (len(txs) + txSizeCostLimit/2) / txSizeCostLimit sizeFactor := (len(txs) + txSizeCostLimit/2) / txSizeCostLimit
...@@ -719,6 +720,8 @@ type clientPeer struct { ...@@ -719,6 +720,8 @@ type clientPeer struct {
responseLock sync.Mutex responseLock sync.Mutex
responseCount uint64 // Counter to generate an unique id for request processing. responseCount uint64 // Counter to generate an unique id for request processing.
balance *lps.NodeBalance
// invalidLock is used for protecting invalidCount. // invalidLock is used for protecting invalidCount.
invalidLock sync.RWMutex invalidLock sync.RWMutex
invalidCount utils.LinearExpiredValue // Counter the invalid request the client peer has made. invalidCount utils.LinearExpiredValue // Counter the invalid request the client peer has made.
...@@ -876,18 +879,25 @@ func (p *clientPeer) sendAnnounce(request announceData) error { ...@@ -876,18 +879,25 @@ func (p *clientPeer) sendAnnounce(request announceData) error {
return p2p.Send(p.rw, AnnounceMsg, request) return p2p.Send(p.rw, AnnounceMsg, request)
} }
// allowInactive implements clientPoolPeer
func (p *clientPeer) allowInactive() bool {
return false
}
// updateCapacity updates the request serving capacity assigned to a given client // updateCapacity updates the request serving capacity assigned to a given client
// and also sends an announcement about the updated flow control parameters // and also sends an announcement about the updated flow control parameters
func (p *clientPeer) updateCapacity(cap uint64) { func (p *clientPeer) updateCapacity(cap uint64) {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
p.fcParams = flowcontrol.ServerParams{MinRecharge: cap, BufLimit: cap * bufLimitRatio} if cap != p.fcParams.MinRecharge {
p.fcClient.UpdateParams(p.fcParams) p.fcParams = flowcontrol.ServerParams{MinRecharge: cap, BufLimit: cap * bufLimitRatio}
var kvList keyValueList p.fcClient.UpdateParams(p.fcParams)
kvList = kvList.add("flowControl/MRR", cap) var kvList keyValueList
kvList = kvList.add("flowControl/BL", cap*bufLimitRatio) kvList = kvList.add("flowControl/MRR", cap)
p.queueSend(func() { p.sendAnnounce(announceData{Update: kvList}) }) kvList = kvList.add("flowControl/BL", cap*bufLimitRatio)
p.queueSend(func() { p.sendAnnounce(announceData{Update: kvList}) })
}
} }
// freezeClient temporarily puts the client in a frozen state which means all // freezeClient temporarily puts the client in a frozen state which means all
...@@ -974,7 +984,7 @@ func (p *clientPeer) Handshake(td *big.Int, head common.Hash, headNum uint64, ge ...@@ -974,7 +984,7 @@ func (p *clientPeer) Handshake(td *big.Int, head common.Hash, headNum uint64, ge
// set default announceType on server side // set default announceType on server side
p.announceType = announceTypeSimple p.announceType = announceTypeSimple
} }
p.fcClient = flowcontrol.NewClientNode(server.fcManager, server.defParams) p.fcClient = flowcontrol.NewClientNode(server.fcManager, p.fcParams)
} }
return nil return nil
}) })
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment