Unverified commit 2d89fe08 authored by Felföldi Zsolt, committed by GitHub

les: move client pool to les/vflux/server (#22495)

* les: move client pool to les/vflux/server

* les/vflux/server: un-expose NodeBalance, remove unused fn, fix bugs

* tests/fuzzers/vflux: add ClientPool fuzzer

* les/vflux/server: fixed balance tests

* les: rebase fix

* les/vflux/server: fixed more bugs

* les/vflux/server: unexported NodeStateMachine fields and flags

* les/vflux/server: unexport all internal components and functions

* les/vflux/server: fixed priorityPool test

* les/vflux/server: polish balance

* les/vflux/server: fixed mutex locking error

* les/vflux/server: priorityPool bug fixed

* common/prque: make Prque wrap-around priority handling optional

* les/vflux/server: rename funcs, small optimizations

* les/vflux/server: fixed timeUntil

* les/vflux/server: separated balance.posValue and negValue

* les/vflux/server: polish setup

* les/vflux/server: enforce capacity curve monotonicity

* les/vflux/server: simplified requestCapacity

* les/vflux/server: requestCapacity with target range, no iterations in SetCapacity

* les/vflux/server: minor changes

* les/vflux/server: moved default factors to balanceTracker

* les/vflux/server: set inactiveFlag in priorityPool

* les/vflux/server: moved related metrics to vfs package

* les/vflux/client: make priorityPool temp state logic cleaner

* les/vflux/server: changed log.Crit to log.Error

* add vflux fuzzer to oss-fuzz
Co-authored-by: rjl493456442 <garyrong0905@gmail.com>
parent e275b1a2
......@@ -55,7 +55,7 @@ type (
// NewLazyQueue creates a new lazy queue
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
q := &LazyQueue{
popQueue: newSstack(nil),
popQueue: newSstack(nil, false),
setIndex: setIndex,
priority: priority,
maxPriority: maxPriority,
......@@ -71,8 +71,8 @@ func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPrior
// Reset clears the contents of the queue
func (q *LazyQueue) Reset() {
q.queue[0] = newSstack(q.setIndex0)
q.queue[1] = newSstack(q.setIndex1)
q.queue[0] = newSstack(q.setIndex0, false)
q.queue[1] = newSstack(q.setIndex1, false)
}
// Refresh performs queue re-evaluation if necessary
......
......@@ -28,7 +28,12 @@ type Prque struct {
// New creates a new priority queue.
func New(setIndex SetIndexCallback) *Prque {
return &Prque{newSstack(setIndex)}
return &Prque{newSstack(setIndex, false)}
}
// NewWrapAround creates a new priority queue with wrap-around priority handling.
func NewWrapAround(setIndex SetIndexCallback) *Prque {
return &Prque{newSstack(setIndex, true)}
}
// Pushes a value with a given priority into the queue, expanding if necessary.
......
......@@ -31,22 +31,24 @@ type SetIndexCallback func(data interface{}, index int)
// the stack (heap) functionality and the Len, Less and Swap methods for the
// sortability requirements of the heaps.
type sstack struct {
setIndex SetIndexCallback
size int
capacity int
offset int
setIndex SetIndexCallback
size int
capacity int
offset int
wrapAround bool
blocks [][]*item
active []*item
}
// Creates a new, empty stack.
func newSstack(setIndex SetIndexCallback) *sstack {
func newSstack(setIndex SetIndexCallback, wrapAround bool) *sstack {
result := new(sstack)
result.setIndex = setIndex
result.active = make([]*item, blockSize)
result.blocks = [][]*item{result.active}
result.capacity = blockSize
result.wrapAround = wrapAround
return result
}
......@@ -94,7 +96,11 @@ func (s *sstack) Len() int {
// Compares the priority of two elements of the stack (higher is first).
// Required by sort.Interface.
func (s *sstack) Less(i, j int) bool {
return (s.blocks[i/blockSize][i%blockSize].priority - s.blocks[j/blockSize][j%blockSize].priority) > 0
a, b := s.blocks[i/blockSize][i%blockSize].priority, s.blocks[j/blockSize][j%blockSize].priority
if s.wrapAround {
return a-b > 0
}
return a > b
}
// Swaps two elements in the stack. Required by sort.Interface.
......@@ -110,5 +116,5 @@ func (s *sstack) Swap(i, j int) {
// Resets the stack, effectively clearing its contents.
func (s *sstack) Reset() {
*s = *newSstack(s.setIndex)
*s = *newSstack(s.setIndex, false)
}
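The only behavioural change in sstack is the Less method above: in wrap-around mode two priorities are ordered by the sign of their signed difference rather than by magnitude, so queues whose priorities come from an ever-growing counter (the flow-control ClientNode queue and the serving queue switch to NewWrapAround further down in this diff) keep ordering correctly after the counter overflows int64. A standalone sketch of the difference, not part of the diff:

package main

import (
    "fmt"
    "math"
)

// cmpPlain is the default ordering: the numerically larger priority wins.
func cmpPlain(a, b int64) bool { return a > b }

// cmpWrapAround treats the int64 space as circular: a ranks above b if the
// signed distance a-b is positive, which stays true even after the
// underlying counter has wrapped past math.MaxInt64.
func cmpWrapAround(a, b int64) bool { return a-b > 0 }

func main() {
    older := int64(math.MaxInt64 - 5) // produced just before the counter wrapped
    newer := int64(math.MinInt64 + 5) // produced just after the counter wrapped

    fmt.Println(cmpPlain(newer, older))      // false: magnitude comparison is fooled by the overflow
    fmt.Println(cmpWrapAround(newer, older)) // true: difference-based comparison still orders correctly
}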
......@@ -21,7 +21,7 @@ func TestSstack(t *testing.T) {
for i := 0; i < size; i++ {
data[i] = &item{rand.Int(), rand.Int63()}
}
stack := newSstack(nil)
stack := newSstack(nil, false)
for rep := 0; rep < 2; rep++ {
// Push all the data into the stack, pop out every second
secs := []*item{}
......@@ -55,7 +55,7 @@ func TestSstackSort(t *testing.T) {
data[i] = &item{rand.Int(), int64(i)}
}
// Push all the data into the stack
stack := newSstack(nil)
stack := newSstack(nil, false)
for _, val := range data {
stack.Push(val)
}
......@@ -76,7 +76,7 @@ func TestSstackReset(t *testing.T) {
for i := 0; i < size; i++ {
data[i] = &item{rand.Int(), rand.Int63()}
}
stack := newSstack(nil)
stack := newSstack(nil, false)
for rep := 0; rep < 2; rep++ {
// Push all the data into the stack, pop out every second
secs := []*item{}
......
......@@ -31,7 +31,6 @@ var (
errNoCheckpoint = errors.New("no local checkpoint provided")
errNotActivated = errors.New("checkpoint registrar is not activated")
errUnknownBenchmarkType = errors.New("unknown benchmark type")
errNoPriority = errors.New("priority too low to raise capacity")
)
// PrivateLightServerAPI provides an API to access the LES light server.
......@@ -44,8 +43,8 @@ type PrivateLightServerAPI struct {
func NewPrivateLightServerAPI(server *LesServer) *PrivateLightServerAPI {
return &PrivateLightServerAPI{
server: server,
defaultPosFactors: server.clientPool.defaultPosFactors,
defaultNegFactors: server.clientPool.defaultNegFactors,
defaultPosFactors: defaultPosFactors,
defaultNegFactors: defaultNegFactors,
}
}
......@@ -66,7 +65,9 @@ func (api *PrivateLightServerAPI) ServerInfo() map[string]interface{} {
res := make(map[string]interface{})
res["minimumCapacity"] = api.server.minCapacity
res["maximumCapacity"] = api.server.maxCapacity
res["totalCapacity"], res["totalConnectedCapacity"], res["priorityConnectedCapacity"] = api.server.clientPool.capacityInfo()
_, res["totalCapacity"] = api.server.clientPool.Limits()
_, res["totalConnectedCapacity"] = api.server.clientPool.Active()
res["priorityConnectedCapacity"] = 0 //TODO connect when token sale module is added
return res
}
......@@ -80,9 +81,18 @@ func (api *PrivateLightServerAPI) ClientInfo(nodes []string) map[enode.ID]map[st
}
res := make(map[enode.ID]map[string]interface{})
api.server.clientPool.forClients(ids, func(client *clientInfo) {
res[client.node.ID()] = api.clientInfo(client)
})
if len(ids) == 0 {
ids = api.server.peers.ids()
}
for _, id := range ids {
if peer := api.server.peers.peer(id); peer != nil {
res[id] = api.clientInfo(peer, peer.balance)
} else {
api.server.clientPool.BalanceOperation(id, "", func(balance vfs.AtomicBalanceOperator) {
res[id] = api.clientInfo(nil, balance)
})
}
}
return res
}
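The loop above replaces the old forClients callback with an explicit two-way lookup: a connected node exposes its live balance through the peer object, while a disconnected node's stored balance is reached through a short-lived BalanceOperation. A hedged in-package sketch of that pattern as a hypothetical helper, using only identifiers that appear in this diff (peers.peer, peer.balance, ClientPool.BalanceOperation, vfs.AtomicBalanceOperator, GetBalance); the empty string passed to BalanceOperation mirrors the calls above:

// getBalance is a hypothetical helper (not part of this change) returning the
// positive and negative balance of a node whether or not it is connected.
func (api *PrivateLightServerAPI) getBalance(id enode.ID) (pos, neg uint64) {
    if peer := api.server.peers.peer(id); peer != nil {
        // Connected: the live balance tracker is attached to the peer.
        return peer.balance.GetBalance()
    }
    // Not connected: operate on the balance stored in the pool's database.
    api.server.clientPool.BalanceOperation(id, "", func(b vfs.AtomicBalanceOperator) {
        pos, neg = b.GetBalance()
    })
    return pos, neg
}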
......@@ -94,31 +104,35 @@ func (api *PrivateLightServerAPI) ClientInfo(nodes []string) map[enode.ID]map[st
// assigned to it.
func (api *PrivateLightServerAPI) PriorityClientInfo(start, stop enode.ID, maxCount int) map[enode.ID]map[string]interface{} {
res := make(map[enode.ID]map[string]interface{})
ids := api.server.clientPool.bt.GetPosBalanceIDs(start, stop, maxCount+1)
ids := api.server.clientPool.GetPosBalanceIDs(start, stop, maxCount+1)
if len(ids) > maxCount {
res[ids[maxCount]] = make(map[string]interface{})
ids = ids[:maxCount]
}
if len(ids) != 0 {
api.server.clientPool.forClients(ids, func(client *clientInfo) {
res[client.node.ID()] = api.clientInfo(client)
})
for _, id := range ids {
if peer := api.server.peers.peer(id); peer != nil {
res[id] = api.clientInfo(peer, peer.balance)
} else {
api.server.clientPool.BalanceOperation(id, "", func(balance vfs.AtomicBalanceOperator) {
res[id] = api.clientInfo(nil, balance)
})
}
}
return res
}
// clientInfo creates a client info data structure
func (api *PrivateLightServerAPI) clientInfo(c *clientInfo) map[string]interface{} {
func (api *PrivateLightServerAPI) clientInfo(peer *clientPeer, balance vfs.ReadOnlyBalance) map[string]interface{} {
info := make(map[string]interface{})
pb, nb := c.balance.GetBalance()
info["isConnected"] = c.connected
pb, nb := balance.GetBalance()
info["isConnected"] = peer != nil
info["pricing/balance"] = pb
info["priority"] = pb != 0
// cb := api.server.clientPool.ndb.getCurrencyBalance(id)
// info["pricing/currency"] = cb.amount
if c.connected {
info["connectionTime"] = float64(mclock.Now()-c.connectedAt) / float64(time.Second)
info["capacity"], _ = api.server.clientPool.ns.GetField(c.node, priorityPoolSetup.CapacityField).(uint64)
if peer != nil {
info["connectionTime"] = float64(mclock.Now()-peer.connectedAt) / float64(time.Second)
info["capacity"] = peer.getCapacity()
info["pricing/negBalance"] = nb
}
return info
......@@ -126,7 +140,7 @@ func (api *PrivateLightServerAPI) clientInfo(c *clientInfo) map[string]interface
// setParams either sets the given parameters for a single connected client (if specified)
// or the default parameters applicable to clients connected in the future
func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, client *clientInfo, posFactors, negFactors *vfs.PriceFactors) (updateFactors bool, err error) {
func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, client *clientPeer, posFactors, negFactors *vfs.PriceFactors) (updateFactors bool, err error) {
defParams := client == nil
for name, value := range params {
errValue := func() error {
......@@ -156,9 +170,8 @@ func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, clien
setFactor(&negFactors.RequestFactor)
case !defParams && name == "capacity":
if capacity, ok := value.(float64); ok && uint64(capacity) >= api.server.minCapacity {
_, err = api.server.clientPool.setCapacity(client.node, client.address, uint64(capacity), 0, true)
// Don't have to call factor update explicitly. It's already done
// in setCapacity function.
_, err = api.server.clientPool.SetCapacity(client.Node(), uint64(capacity), 0, false)
// time factor recalculation is performed automatically by the balance tracker
} else {
err = errValue()
}
......@@ -179,31 +192,25 @@ func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, clien
// SetClientParams sets client parameters for all clients listed in the ids list
// or all connected clients if the list is empty
func (api *PrivateLightServerAPI) SetClientParams(nodes []string, params map[string]interface{}) error {
var (
ids []enode.ID
err error
)
var err error
for _, node := range nodes {
if id, err := parseNode(node); err != nil {
var id enode.ID
if id, err = parseNode(node); err != nil {
return err
} else {
ids = append(ids, id)
}
}
api.server.clientPool.forClients(ids, func(client *clientInfo) {
if client.connected {
posFactors, negFactors := client.balance.GetPriceFactors()
update, e := api.setParams(params, client, &posFactors, &negFactors)
if peer := api.server.peers.peer(id); peer != nil {
posFactors, negFactors := peer.balance.GetPriceFactors()
update, e := api.setParams(params, peer, &posFactors, &negFactors)
if update {
client.balance.SetPriceFactors(posFactors, negFactors)
peer.balance.SetPriceFactors(posFactors, negFactors)
}
if e != nil {
err = e
}
} else {
err = fmt.Errorf("client %064x is not connected", client.node.ID())
err = fmt.Errorf("client %064x is not connected", id)
}
})
}
return err
}
......@@ -211,7 +218,7 @@ func (api *PrivateLightServerAPI) SetClientParams(nodes []string, params map[str
func (api *PrivateLightServerAPI) SetDefaultParams(params map[string]interface{}) error {
update, err := api.setParams(params, nil, &api.defaultPosFactors, &api.defaultNegFactors)
if update {
api.server.clientPool.setDefaultFactors(api.defaultPosFactors, api.defaultNegFactors)
api.server.clientPool.SetDefaultFactors(api.defaultPosFactors, api.defaultNegFactors)
}
return err
}
......@@ -224,7 +231,7 @@ func (api *PrivateLightServerAPI) SetConnectedBias(bias time.Duration) error {
if bias < time.Duration(0) {
return fmt.Errorf("bias illegal: %v less than 0", bias)
}
api.server.clientPool.setConnectedBias(bias)
api.server.clientPool.SetConnectedBias(bias)
return nil
}
......@@ -235,8 +242,8 @@ func (api *PrivateLightServerAPI) AddBalance(node string, amount int64) (balance
if id, err = parseNode(node); err != nil {
return
}
api.server.clientPool.forClients([]enode.ID{id}, func(c *clientInfo) {
balance[0], balance[1], err = c.balance.AddBalance(amount)
api.server.clientPool.BalanceOperation(id, "", func(nb vfs.AtomicBalanceOperator) {
balance[0], balance[1], err = nb.AddBalance(amount)
})
return
}
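AddBalance now funnels the write through the same BalanceOperation entry point, so the read-modify-write happens atomically inside the pool regardless of whether the client is connected; judging by the assignment above, the two returned values are the balance before and after the change. A minimal hedged sketch of a caller (hypothetical helper name):

// addTokens credits a node and reports the balance change. Sketch only; the
// meaning of the two return values is inferred from the AddBalance call above.
func addTokens(pool *vfs.ClientPool, id enode.ID, amount int64) (before, after uint64, err error) {
    pool.BalanceOperation(id, "", func(b vfs.AtomicBalanceOperator) {
        before, after, err = b.AddBalance(amount)
    })
    return before, after, err
}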
......@@ -338,14 +345,12 @@ func (api *PrivateDebugAPI) FreezeClient(node string) error {
if id, err = parseNode(node); err != nil {
return err
}
api.server.clientPool.forClients([]enode.ID{id}, func(c *clientInfo) {
if c.connected {
c.peer.freeze()
} else {
err = fmt.Errorf("client %064x is not connected", id[:])
}
})
return err
if peer := api.server.peers.peer(id); peer != nil {
peer.freeze()
return nil
} else {
return fmt.Errorf("client %064x is not connected", id[:])
}
}
// PrivateLightAPI provides an API to access the LES light server or light client.
......
......@@ -108,7 +108,7 @@ type ClientManager struct {
func NewClientManager(curve PieceWiseLinear, clock mclock.Clock) *ClientManager {
cm := &ClientManager{
clock: clock,
rcQueue: prque.New(func(a interface{}, i int) { a.(*ClientNode).queueIndex = i }),
rcQueue: prque.NewWrapAround(func(a interface{}, i int) { a.(*ClientNode).queueIndex = i }),
capLastUpdate: clock.Now(),
stop: make(chan chan struct{}),
}
......
......@@ -73,12 +73,9 @@ var (
serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil)
clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil)
totalCapacityGauge = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
totalRechargeGauge = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
capacityQueryZeroMeter = metrics.NewRegisteredMeter("les/server/capQueryZero", nil)
capacityQueryNonZeroMeter = metrics.NewRegisteredMeter("les/server/capQueryNonZero", nil)
totalCapacityGauge = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
totalRechargeGauge = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
requestServedMeter = metrics.NewRegisteredMeter("les/server/req/avgServedTime", nil)
requestServedTimer = metrics.NewRegisteredTimer("les/server/req/servedTime", nil)
......@@ -100,12 +97,8 @@ var (
sqServedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/served", nil)
sqQueuedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/queued", nil)
clientConnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/connected", nil)
clientActivatedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/activated", nil)
clientDeactivatedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/deactivated", nil)
clientDisconnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/disconnected", nil)
clientFreezeMeter = metrics.NewRegisteredMeter("les/server/clientEvent/freeze", nil)
clientErrorMeter = metrics.NewRegisteredMeter("les/server/clientEvent/error", nil)
clientFreezeMeter = metrics.NewRegisteredMeter("les/server/clientEvent/freeze", nil)
clientErrorMeter = metrics.NewRegisteredMeter("les/server/clientEvent/error", nil)
requestRTT = metrics.NewRegisteredTimer("les/client/req/rtt", nil)
requestSendDelay = metrics.NewRegisteredTimer("les/client/req/sendDelay", nil)
......
......@@ -17,6 +17,7 @@
package les
import (
"crypto/ecdsa"
"errors"
"fmt"
"math/big"
......@@ -37,6 +38,7 @@ import (
vfs "github.com/ethereum/go-ethereum/les/vflux/server"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)
......@@ -762,15 +764,22 @@ type clientPeer struct {
responseLock sync.Mutex
responseCount uint64 // Counter to generate a unique id for request processing.
balance *vfs.NodeBalance
balance vfs.ConnectedBalance
// invalidLock is used for protecting invalidCount.
invalidLock sync.RWMutex
invalidCount utils.LinearExpiredValue // Counts the invalid requests the client peer has made.
server bool
errCh chan error
fcClient *flowcontrol.ClientNode // Server side mirror token bucket.
capacity uint64
// lastAnnounce is the last broadcast created by the server; may be newer than the last head
// sent to the specific client (stored in headInfo) if capacity is zero. In this case the
// latest head is sent when the client gains non-zero capacity.
lastAnnounce announceData
connectedAt mclock.AbsTime
server bool
errCh chan error
fcClient *flowcontrol.ClientNode // Server side mirror token bucket.
}
func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *clientPeer {
......@@ -789,9 +798,9 @@ func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWrite
}
}
// freeClientId returns a string identifier for the peer. Multiple peers with
// FreeClientId returns a string identifier for the peer. Multiple peers with
// the same identifier can not be connected in free mode simultaneously.
func (p *clientPeer) freeClientId() string {
func (p *clientPeer) FreeClientId() string {
if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok {
if addr.IP.IsLoopback() {
// using peer id instead of loopback ip address allows multiple free
......@@ -921,25 +930,69 @@ func (p *clientPeer) sendAnnounce(request announceData) error {
return p2p.Send(p.rw, AnnounceMsg, request)
}
// allowInactive implements clientPoolPeer
func (p *clientPeer) allowInactive() bool {
return false
// InactiveAllowance implements vfs.clientPeer
func (p *clientPeer) InactiveAllowance() time.Duration {
return 0 // will return more than zero for les/5 clients
}
// getCapacity returns the current capacity of the peer
func (p *clientPeer) getCapacity() uint64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.capacity
}
// updateCapacity updates the request serving capacity assigned to a given client
// and also sends an announcement about the updated flow control parameters
func (p *clientPeer) updateCapacity(cap uint64) {
// UpdateCapacity updates the request serving capacity assigned to a given client
// and also sends an announcement about the updated flow control parameters.
// Note: UpdateCapacity implements vfs.clientPeer and should not block. The requested
// parameter is true if the callback was initiated by ClientPool.SetCapacity on the given peer.
func (p *clientPeer) UpdateCapacity(newCap uint64, requested bool) {
p.lock.Lock()
defer p.lock.Unlock()
if cap != p.fcParams.MinRecharge {
p.fcParams = flowcontrol.ServerParams{MinRecharge: cap, BufLimit: cap * bufLimitRatio}
if newCap != p.fcParams.MinRecharge {
p.fcParams = flowcontrol.ServerParams{MinRecharge: newCap, BufLimit: newCap * bufLimitRatio}
p.fcClient.UpdateParams(p.fcParams)
var kvList keyValueList
kvList = kvList.add("flowControl/MRR", cap)
kvList = kvList.add("flowControl/BL", cap*bufLimitRatio)
kvList = kvList.add("flowControl/MRR", newCap)
kvList = kvList.add("flowControl/BL", newCap*bufLimitRatio)
p.queueSend(func() { p.sendAnnounce(announceData{Update: kvList}) })
}
if p.capacity == 0 && newCap != 0 {
p.sendLastAnnounce()
}
p.capacity = newCap
}
// announceOrStore sends the given head announcement to the client if the client is
// active (capacity != 0) and the same announcement hasn't been sent before. If the
// client is inactive the announcement is stored and sent later if the client is
// activated again.
func (p *clientPeer) announceOrStore(announce announceData) {
p.lock.Lock()
defer p.lock.Unlock()
p.lastAnnounce = announce
if p.capacity != 0 {
p.sendLastAnnounce()
}
}
// sendLastAnnounce sends the last head announcement to the client if it hasn't been sent before
func (p *clientPeer) sendLastAnnounce() {
if p.lastAnnounce.Td == nil {
return
}
if p.headInfo.Td == nil || p.lastAnnounce.Td.Cmp(p.headInfo.Td) > 0 {
if !p.queueSend(func() { p.sendAnnounce(p.lastAnnounce) }) {
p.Log().Debug("Dropped announcement because queue is full", "number", p.lastAnnounce.Number, "hash", p.lastAnnounce.Hash)
} else {
p.Log().Debug("Sent announcement", "number", p.lastAnnounce.Number, "hash", p.lastAnnounce.Hash)
}
p.headInfo = blockInfo{Hash: p.lastAnnounce.Hash, Number: p.lastAnnounce.Number, Td: p.lastAnnounce.Td}
}
}
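UpdateCapacity and announceOrStore together implement the deferred-announcement behaviour described by the lastAnnounce comment: while a client's capacity is zero the newest head is only remembered, and the zero-to-non-zero capacity transition flushes it. A condensed sketch of the gating logic with hypothetical method names (the real methods above additionally handle flow-control parameters and locking):

// Sketch: how head announcements are gated on capacity.
func (p *clientPeer) onNewHead(announce announceData) {
    p.lastAnnounce = announce
    if p.capacity != 0 {
        p.sendLastAnnounce() // active client: deliver immediately
    }
    // inactive client: keep it in lastAnnounce until capacity is restored
}

func (p *clientPeer) onCapacityChange(newCap uint64) {
    if p.capacity == 0 && newCap != 0 {
        p.sendLastAnnounce() // reactivated: deliver the stored head first
    }
    p.capacity = newCap
}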
// freezeClient temporarily puts the client in a frozen state which means all
......@@ -1064,6 +1117,11 @@ func (p *clientPeer) getInvalid() uint64 {
return p.invalidCount.Value(mclock.Now())
}
// Disconnect implements vfs.clientPeer
func (p *clientPeer) Disconnect() {
p.Peer.Disconnect(p2p.DiscRequested)
}
// serverPeerSubscriber is an interface to notify services about added or
// removed server peers
type serverPeerSubscriber interface {
......@@ -1221,3 +1279,181 @@ func (ps *serverPeerSet) close() {
}
ps.closed = true
}
// clientPeerSet represents the set of active client peers currently
// participating in the Light Ethereum sub-protocol.
type clientPeerSet struct {
peers map[enode.ID]*clientPeer
lock sync.RWMutex
closed bool
privateKey *ecdsa.PrivateKey
lastAnnounce, signedAnnounce announceData
}
// newClientPeerSet creates a new peer set to track the client peers.
func newClientPeerSet() *clientPeerSet {
return &clientPeerSet{peers: make(map[enode.ID]*clientPeer)}
}
// register adds a new peer into the peer set, or returns an error if the
// peer is already known.
func (ps *clientPeerSet) register(peer *clientPeer) error {
ps.lock.Lock()
defer ps.lock.Unlock()
if ps.closed {
return errClosed
}
if _, exist := ps.peers[peer.ID()]; exist {
return errAlreadyRegistered
}
ps.peers[peer.ID()] = peer
ps.announceOrStore(peer)
return nil
}
// unregister removes a remote peer from the peer set, disabling any further
// actions to/from that particular entity. It also initiates disconnection
// at the networking layer.
func (ps *clientPeerSet) unregister(id enode.ID) error {
ps.lock.Lock()
defer ps.lock.Unlock()
p, ok := ps.peers[id]
if !ok {
return errNotRegistered
}
delete(ps.peers, id)
p.Peer.Disconnect(p2p.DiscRequested)
return nil
}
// ids returns a list of all registered peer IDs
func (ps *clientPeerSet) ids() []enode.ID {
ps.lock.RLock()
defer ps.lock.RUnlock()
var ids []enode.ID
for id := range ps.peers {
ids = append(ids, id)
}
return ids
}
// peer retrieves the registered peer with the given id.
func (ps *clientPeerSet) peer(id enode.ID) *clientPeer {
ps.lock.RLock()
defer ps.lock.RUnlock()
return ps.peers[id]
}
// len returns the current number of peers in the set.
func (ps *clientPeerSet) len() int {
ps.lock.RLock()
defer ps.lock.RUnlock()
return len(ps.peers)
}
// setSignerKey sets the signer key for signed announcements. Should be called before
// starting the protocol handler.
func (ps *clientPeerSet) setSignerKey(privateKey *ecdsa.PrivateKey) {
ps.privateKey = privateKey
}
// broadcast sends the given announcements to all active peers
func (ps *clientPeerSet) broadcast(announce announceData) {
ps.lock.Lock()
defer ps.lock.Unlock()
ps.lastAnnounce = announce
for _, peer := range ps.peers {
ps.announceOrStore(peer)
}
}
// announceOrStore sends the requested type of announcement to the given peer or stores
// it for later if the peer is inactive (capacity == 0).
func (ps *clientPeerSet) announceOrStore(p *clientPeer) {
if ps.lastAnnounce.Td == nil {
return
}
switch p.announceType {
case announceTypeSimple:
p.announceOrStore(ps.lastAnnounce)
case announceTypeSigned:
if ps.signedAnnounce.Hash != ps.lastAnnounce.Hash {
ps.signedAnnounce = ps.lastAnnounce
ps.signedAnnounce.sign(ps.privateKey)
}
p.announceOrStore(ps.signedAnnounce)
}
}
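The broadcast path now lives in clientPeerSet rather than in the removed nodestate-based broadcaster: the set remembers the last announcement, signs it at most once per head (signedAnnounce caches the signed copy), and delegates the per-peer store-or-send decision to clientPeer.announceOrStore. A hedged usage sketch from the handler side; the wrapper name is hypothetical and the announceData field types are taken from the broadcastLoop call further down in this diff:

// announceNewHead shows the fan-out entry point: one broadcast per new chain
// head, per-peer buffering and signing handled inside the set.
func announceNewHead(ps *clientPeerSet, hash common.Hash, number uint64, td *big.Int, reorg uint64) {
    ps.broadcast(announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg})
}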
// close disconnects all peers. No new peers can be registered
// after close has returned.
func (ps *clientPeerSet) close() {
ps.lock.Lock()
defer ps.lock.Unlock()
for _, p := range ps.peers {
p.Peer.Disconnect(p2p.DiscQuitting)
}
ps.closed = true
}
// serverSet is a special set that contains all connected LES servers.
// LES servers are also found by the discovery protocol because they run
// the LES protocol themselves. Although they are useless to us as a server,
// we can't drop them because they may still be useful to other protocols
// (e.g. ETH) running on top of the same devp2p connection.
type serverSet struct {
lock sync.Mutex
set map[string]*clientPeer
closed bool
}
func newServerSet() *serverSet {
return &serverSet{set: make(map[string]*clientPeer)}
}
func (s *serverSet) register(peer *clientPeer) error {
s.lock.Lock()
defer s.lock.Unlock()
if s.closed {
return errClosed
}
if _, exist := s.set[peer.id]; exist {
return errAlreadyRegistered
}
s.set[peer.id] = peer
return nil
}
func (s *serverSet) unregister(peer *clientPeer) error {
s.lock.Lock()
defer s.lock.Unlock()
if s.closed {
return errClosed
}
if _, exist := s.set[peer.id]; !exist {
return errNotRegistered
}
delete(s.set, peer.id)
peer.Peer.Disconnect(p2p.DiscQuitting)
return nil
}
func (s *serverSet) close() {
s.lock.Lock()
defer s.lock.Unlock()
for _, p := range s.set {
p.Peer.Disconnect(p2p.DiscQuitting)
}
s.closed = true
}
......@@ -18,7 +18,6 @@ package les
import (
"crypto/ecdsa"
"reflect"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
......@@ -26,7 +25,6 @@ import (
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/les/vflux"
vfs "github.com/ethereum/go-ethereum/les/vflux/server"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
......@@ -34,24 +32,16 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/nodestate"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
)
var (
serverSetup = &nodestate.Setup{}
clientPeerField = serverSetup.NewField("clientPeer", reflect.TypeOf(&clientPeer{}))
clientInfoField = serverSetup.NewField("clientInfo", reflect.TypeOf(&clientInfo{}))
connAddressField = serverSetup.NewField("connAddr", reflect.TypeOf(""))
balanceTrackerSetup = vfs.NewBalanceTrackerSetup(serverSetup)
priorityPoolSetup = vfs.NewPriorityPoolSetup(serverSetup)
defaultPosFactors = vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}
defaultNegFactors = vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}
)
func init() {
balanceTrackerSetup.Connect(connAddressField, priorityPoolSetup.CapacityField)
priorityPoolSetup.Connect(balanceTrackerSetup.BalanceField, balanceTrackerSetup.UpdateFlag) // NodeBalance implements nodePriority
}
const defaultConnectedBias = time.Minute * 3
type ethBackend interface {
ArchiveMode() bool
......@@ -65,10 +55,10 @@ type ethBackend interface {
type LesServer struct {
lesCommons
ns *nodestate.NodeStateMachine
archiveMode bool // Flag whether the ethereum node runs in archive mode.
handler *serverHandler
broadcaster *broadcaster
peers *clientPeerSet
serverset *serverSet
vfluxServer *vfs.Server
privateKey *ecdsa.PrivateKey
......@@ -77,7 +67,7 @@ type LesServer struct {
costTracker *costTracker
defParams flowcontrol.ServerParams
servingQueue *servingQueue
clientPool *clientPool
clientPool *vfs.ClientPool
minCapacity, maxCapacity uint64
threadsIdle int // Request serving threads count when system is idle.
......@@ -91,7 +81,6 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
if err != nil {
return nil, err
}
ns := nodestate.NewNodeStateMachine(nil, nil, mclock.System{}, serverSetup)
// Calculate the number of threads used to service the light client
// requests based on the user-specified value.
threads := config.LightServ * 4 / 100
......@@ -111,9 +100,9 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency, true),
closeCh: make(chan struct{}),
},
ns: ns,
archiveMode: e.ArchiveMode(),
broadcaster: newBroadcaster(ns),
peers: newClientPeerSet(),
serverset: newServerSet(),
vfluxServer: vfs.NewServer(time.Millisecond * 10),
fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}),
servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100),
......@@ -121,7 +110,6 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
threadsIdle: threads,
p2pSrv: node.Server(),
}
srv.vfluxServer.Register(srv)
issync := e.Synced
if config.LightNoSyncServe {
issync = func() bool { return true }
......@@ -149,8 +137,10 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
srv.maxCapacity = totalRecharge
}
srv.fcManager.SetCapacityLimits(srv.minCapacity, srv.maxCapacity, srv.minCapacity*2)
srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient, issync)
srv.clientPool.setDefaultFactors(vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}, vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1})
srv.clientPool = vfs.NewClientPool(lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, issync)
srv.clientPool.Start()
srv.clientPool.SetDefaultFactors(defaultPosFactors, defaultNegFactors)
srv.vfluxServer.Register(srv.clientPool, "les", "Ethereum light client service")
checkpoint := srv.latestLocalCheckpoint()
if !checkpoint.Empty() {
......@@ -162,14 +152,6 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
node.RegisterProtocols(srv.Protocols())
node.RegisterAPIs(srv.APIs())
node.RegisterLifecycle(srv)
// disconnect all peers at nsm shutdown
ns.SubscribeField(clientPeerField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
if state.Equals(serverSetup.OfflineFlag()) && oldValue != nil {
oldValue.(*clientPeer).Peer.Disconnect(p2p.DiscRequested)
}
})
ns.Start()
return srv, nil
}
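With the pool moved to les/vflux/server, the LES server no longer owns a NodeStateMachine; it only constructs, configures and registers the pool. A condensed in-package sketch of the calls it now makes over the pool's lifetime, all taken from this diff; the wrapper function is hypothetical and the database parameter type is an assumption, since the diff only shows the lesDb value being passed:

// wireClientPool is a hypothetical summary, not part of the change.
func wireClientPool(db ethdb.KeyValueStore /* assumed type */, minCap uint64, issync func() bool,
    vfluxSrv *vfs.Server, lightPeers int, totalCapacity uint64) *vfs.ClientPool {

    pool := vfs.NewClientPool(db, minCap, defaultConnectedBias, mclock.System{}, issync)
    pool.Start()                                                    // start background processing
    pool.SetDefaultFactors(defaultPosFactors, defaultNegFactors)    // pricing applied to new clients
    vfluxSrv.Register(pool, "les", "Ethereum light client service") // serve vflux capacity queries
    pool.SetLimits(uint64(lightPeers), totalCapacity)               // active-client and capacity limits

    // per connection: pool.Register(peer) on connect, pool.Unregister(peer) on disconnect
    // on shutdown: pool.Stop()
    return pool
}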
......@@ -198,7 +180,7 @@ func (s *LesServer) APIs() []rpc.API {
func (s *LesServer) Protocols() []p2p.Protocol {
ps := s.makeProtocols(ServerProtocolVersions, s.handler.runPeer, func(id enode.ID) interface{} {
if p := s.getClient(id); p != nil {
if p := s.peers.peer(id); p != nil {
return p.Info()
}
return nil
......@@ -215,7 +197,7 @@ func (s *LesServer) Protocols() []p2p.Protocol {
// Start starts the LES server
func (s *LesServer) Start() error {
s.privateKey = s.p2pSrv.PrivateKey
s.broadcaster.setSignerKey(s.privateKey)
s.peers.setSignerKey(s.privateKey)
s.handler.start()
s.wg.Add(1)
go s.capacityManagement()
......@@ -229,8 +211,9 @@ func (s *LesServer) Start() error {
func (s *LesServer) Stop() error {
close(s.closeCh)
s.clientPool.stop()
s.ns.Stop()
s.clientPool.Stop()
s.serverset.close()
s.peers.close()
s.fcManager.Stop()
s.costTracker.stop()
s.handler.stop()
......@@ -261,7 +244,7 @@ func (s *LesServer) capacityManagement() {
totalCapacityCh := make(chan uint64, 100)
totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh)
s.clientPool.setLimits(s.config.LightPeers, totalCapacity)
s.clientPool.SetLimits(uint64(s.config.LightPeers), totalCapacity)
var (
busy bool
......@@ -298,39 +281,9 @@ func (s *LesServer) capacityManagement() {
log.Warn("Reduced free peer connections", "from", freePeers, "to", newFreePeers)
}
freePeers = newFreePeers
s.clientPool.setLimits(s.config.LightPeers, totalCapacity)
s.clientPool.SetLimits(uint64(s.config.LightPeers), totalCapacity)
case <-s.closeCh:
return
}
}
}
func (s *LesServer) getClient(id enode.ID) *clientPeer {
if node := s.ns.GetNode(id); node != nil {
if p, ok := s.ns.GetField(node, clientPeerField).(*clientPeer); ok {
return p
}
}
return nil
}
func (s *LesServer) dropClient(id enode.ID) {
if p := s.getClient(id); p != nil {
p.Peer.Disconnect(p2p.DiscRequested)
}
}
// ServiceInfo implements vfs.Service
func (s *LesServer) ServiceInfo() (string, string) {
return "les", "Ethereum light client service"
}
// Handle implements vfs.Service
func (s *LesServer) Handle(id enode.ID, address string, name string, data []byte) []byte {
switch name {
case vflux.CapacityQueryName:
return s.clientPool.serveCapQuery(id, address, data)
default:
return nil
}
}
......@@ -17,7 +17,6 @@
package les
import (
"crypto/ecdsa"
"errors"
"sync"
"sync/atomic"
......@@ -31,13 +30,10 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
vfs "github.com/ethereum/go-ethereum/les/vflux/server"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/nodestate"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
......@@ -59,7 +55,6 @@ const (
var (
errTooManyInvalidRequest = errors.New("too many invalid requests made")
errFullClientPool = errors.New("client pool is full")
)
// serverHandler is responsible for serving light client and process
......@@ -128,32 +123,18 @@ func (h *serverHandler) handle(p *clientPeer) error {
p.Log().Debug("Light Ethereum handshake failed", "err", err)
return err
}
// Reject the duplicated peer, otherwise register it to peerset.
var registered bool
if err := h.server.ns.Operation(func() {
if h.server.ns.GetField(p.Node(), clientPeerField) != nil {
registered = true
} else {
h.server.ns.SetFieldSub(p.Node(), clientPeerField, p)
}
}); err != nil {
return err
}
if registered {
return errAlreadyRegistered
}
defer func() {
h.server.ns.SetField(p.Node(), clientPeerField, nil)
if p.fcClient != nil { // is nil when connecting another server
p.fcClient.Disconnect()
}
}()
if p.server {
if err := h.server.serverset.register(p); err != nil {
return err
}
// connected to another server, no messages expected, just wait for disconnection
_, err := p.rw.ReadMsg()
h.server.serverset.unregister(p)
return err
}
defer p.fcClient.Disconnect() // set by handshake if it's not another server
// Reject light clients if server is not synced.
//
// Put this checking here, so that "non-synced" les-server peers are still allowed
......@@ -162,30 +143,31 @@ func (h *serverHandler) handle(p *clientPeer) error {
p.Log().Debug("Light server not synced, rejecting peer")
return p2p.DiscRequested
}
// Disconnect the inbound peer if it's rejected by clientPool
if cap, err := h.server.clientPool.connect(p); cap != p.fcParams.MinRecharge || err != nil {
p.Log().Debug("Light Ethereum peer rejected", "err", errFullClientPool)
return errFullClientPool
if err := h.server.peers.register(p); err != nil {
return err
}
p.balance, _ = h.server.ns.GetField(p.Node(), h.server.clientPool.BalanceField).(*vfs.NodeBalance)
if p.balance == nil {
if p.balance = h.server.clientPool.Register(p); p.balance == nil {
h.server.peers.unregister(p.ID())
p.Log().Debug("Client pool already closed")
return p2p.DiscRequested
}
activeCount, _ := h.server.clientPool.pp.Active()
activeCount, _ := h.server.clientPool.Active()
clientConnectionGauge.Update(int64(activeCount))
p.connectedAt = mclock.Now()
var wg sync.WaitGroup // Wait group used to track all in-flight task routines.
connectedAt := mclock.Now()
defer func() {
wg.Wait() // Ensure all background task routines have exited.
h.server.clientPool.disconnect(p)
h.server.clientPool.Unregister(p)
h.server.peers.unregister(p.ID())
p.balance = nil
activeCount, _ := h.server.clientPool.pp.Active()
activeCount, _ := h.server.clientPool.Active()
clientConnectionGauge.Update(int64(activeCount))
connectionTimer.Update(time.Duration(mclock.Now() - connectedAt))
connectionTimer.Update(time.Duration(mclock.Now() - p.connectedAt))
}()
// Mark the peer starts to be served.
// Mark the peer as being served.
atomic.StoreUint32(&p.serving, 1)
defer atomic.StoreUint32(&p.serving, 0)
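Peer admission is now explicit instead of being driven by node state flags: the peer is added to the local clientPeerSet first, then registered with the ClientPool (which hands back the balance tracker, or nil if the pool is closed), and the deferred cleanup undoes both in reverse order. A condensed sketch of that ordering, using only the calls shown above (hypothetical wrapper name):

// serveClient sketches the connect/cleanup ordering of serverHandler.handle.
func serveClient(h *serverHandler, p *clientPeer) error {
    if err := h.server.peers.register(p); err != nil {
        return err
    }
    if p.balance = h.server.clientPool.Register(p); p.balance == nil {
        h.server.peers.unregister(p.ID()) // roll back the peer set entry
        return p2p.DiscRequested          // pool already closed
    }
    defer func() {
        h.server.clientPool.Unregister(p) // detach from the pool first...
        h.server.peers.unregister(p.ID()) // ...then from the peer set
        p.balance = nil
    }()
    // ... message handling loop ...
    return nil
}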
......@@ -448,78 +430,9 @@ func (h *serverHandler) broadcastLoop() {
}
lastHead, lastTd = header, td
log.Debug("Announcing block to peers", "number", number, "hash", hash, "td", td, "reorg", reorg)
h.server.broadcaster.broadcast(announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg})
h.server.peers.broadcast(announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg})
case <-h.closeCh:
return
}
}
}
// broadcaster sends new header announcements to active client peers
type broadcaster struct {
ns *nodestate.NodeStateMachine
privateKey *ecdsa.PrivateKey
lastAnnounce, signedAnnounce announceData
}
// newBroadcaster creates a new broadcaster
func newBroadcaster(ns *nodestate.NodeStateMachine) *broadcaster {
b := &broadcaster{ns: ns}
ns.SubscribeState(priorityPoolSetup.ActiveFlag, func(node *enode.Node, oldState, newState nodestate.Flags) {
if newState.Equals(priorityPoolSetup.ActiveFlag) {
// send last announcement to activated peers
b.sendTo(node)
}
})
return b
}
// setSignerKey sets the signer key for signed announcements. Should be called before
// starting the protocol handler.
func (b *broadcaster) setSignerKey(privateKey *ecdsa.PrivateKey) {
b.privateKey = privateKey
}
// broadcast sends the given announcements to all active peers
func (b *broadcaster) broadcast(announce announceData) {
b.ns.Operation(func() {
// iterate in an Operation to ensure that the active set does not change while iterating
b.lastAnnounce = announce
b.ns.ForEach(priorityPoolSetup.ActiveFlag, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
b.sendTo(node)
})
})
}
// sendTo sends the most recent announcement to the given node unless the same or higher Td
// announcement has already been sent.
func (b *broadcaster) sendTo(node *enode.Node) {
if b.lastAnnounce.Td == nil {
return
}
if p, _ := b.ns.GetField(node, clientPeerField).(*clientPeer); p != nil {
if p.headInfo.Td == nil || b.lastAnnounce.Td.Cmp(p.headInfo.Td) > 0 {
announce := b.lastAnnounce
switch p.announceType {
case announceTypeSimple:
if !p.queueSend(func() { p.sendAnnounce(announce) }) {
log.Debug("Drop announcement because queue is full", "number", announce.Number, "hash", announce.Hash)
} else {
log.Debug("Sent announcement", "number", announce.Number, "hash", announce.Hash)
}
case announceTypeSigned:
if b.signedAnnounce.Hash != b.lastAnnounce.Hash {
b.signedAnnounce = b.lastAnnounce
b.signedAnnounce.sign(b.privateKey)
}
announce := b.signedAnnounce
if !p.queueSend(func() { p.sendAnnounce(announce) }) {
log.Debug("Drop announcement because queue is full", "number", announce.Number, "hash", announce.Hash)
} else {
log.Debug("Sent announcement", "number", announce.Number, "hash", announce.Hash)
}
}
p.headInfo = blockInfo{b.lastAnnounce.Hash, b.lastAnnounce.Number, b.lastAnnounce.Td}
}
}
}
......@@ -123,7 +123,7 @@ func (t *servingTask) waitOrStop() bool {
// newServingQueue returns a new servingQueue
func newServingQueue(suspendBias int64, utilTarget float64) *servingQueue {
sq := &servingQueue{
queue: prque.New(nil),
queue: prque.NewWrapAround(nil),
suspendBias: suspendBias,
queueAddCh: make(chan *servingTask, 100),
queueBestCh: make(chan *servingTask),
......@@ -279,7 +279,7 @@ func (sq *servingQueue) updateRecentTime() {
func (sq *servingQueue) addTask(task *servingTask) {
if sq.best == nil {
sq.best = task
} else if task.priority > sq.best.priority {
} else if task.priority-sq.best.priority > 0 {
sq.queue.Push(sq.best, sq.best.priority)
sq.best = task
} else {
......
......@@ -45,10 +45,10 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/les/checkpointoracle"
"github.com/ethereum/go-ethereum/les/flowcontrol"
vfs "github.com/ethereum/go-ethereum/les/vflux/server"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/nodestate"
"github.com/ethereum/go-ethereum/params"
)
......@@ -284,7 +284,6 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
}
oracle = checkpointoracle.New(checkpointConfig, getLocal)
}
ns := nodestate.NewNodeStateMachine(nil, nil, mclock.System{}, serverSetup)
server := &LesServer{
lesCommons: lesCommons{
genesis: genesis.Hash(),
......@@ -296,8 +295,7 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
oracle: oracle,
closeCh: make(chan struct{}),
},
ns: ns,
broadcaster: newBroadcaster(ns),
peers: newClientPeerSet(),
servingQueue: newServingQueue(int64(time.Millisecond*10), 1),
defParams: flowcontrol.ServerParams{
BufLimit: testBufLimit,
......@@ -307,14 +305,14 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
}
server.costTracker, server.minCapacity = newCostTracker(db, server.config)
server.costTracker.testCostList = testCostList(0) // Disable flow control mechanism.
server.clientPool = newClientPool(ns, db, testBufRecharge, defaultConnectedBias, clock, func(id enode.ID) {}, alwaysTrueFn)
server.clientPool.setLimits(10000, 10000) // Assign enough capacity for clientpool
server.clientPool = vfs.NewClientPool(db, testBufRecharge, defaultConnectedBias, clock, alwaysTrueFn)
server.clientPool.Start()
server.clientPool.SetLimits(10000, 10000) // Assign enough capacity for clientpool
server.handler = newServerHandler(server, simulation.Blockchain(), db, txpool, func() bool { return true })
if server.oracle != nil {
server.oracle.Start(simulation)
}
server.servingQueue.setThreads(4)
ns.Start()
server.handler.start()
return server.handler, simulation
}
......
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package server
import (
"github.com/ethereum/go-ethereum/metrics"
)
var (
totalConnectedGauge = metrics.NewRegisteredGauge("vflux/server/totalConnected", nil)
clientConnectedMeter = metrics.NewRegisteredMeter("vflux/server/clientEvent/connected", nil)
clientActivatedMeter = metrics.NewRegisteredMeter("vflux/server/clientEvent/activated", nil)
clientDeactivatedMeter = metrics.NewRegisteredMeter("vflux/server/clientEvent/deactivated", nil)
clientDisconnectedMeter = metrics.NewRegisteredMeter("vflux/server/clientEvent/disconnected", nil)
capacityQueryZeroMeter = metrics.NewRegisteredMeter("vflux/server/capQueryZero", nil)
capacityQueryNonZeroMeter = metrics.NewRegisteredMeter("vflux/server/capQueryNonZero", nil)
)
......@@ -28,18 +28,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/nodestate"
)
var (
testSetup = &nodestate.Setup{}
ppTestClientFlag = testSetup.NewFlag("ppTestClientFlag")
ppTestClientField = testSetup.NewField("ppTestClient", reflect.TypeOf(&ppTestClient{}))
ppUpdateFlag = testSetup.NewFlag("ppUpdateFlag")
ppTestSetup = NewPriorityPoolSetup(testSetup)
)
func init() {
ppTestSetup.Connect(ppTestClientField, ppUpdateFlag)
}
const (
testCapacityStepDiv = 100
testCapacityToleranceDiv = 10
......@@ -51,25 +39,27 @@ type ppTestClient struct {
balance, cap uint64
}
func (c *ppTestClient) Priority(cap uint64) int64 {
func (c *ppTestClient) priority(cap uint64) int64 {
return int64(c.balance / cap)
}
func (c *ppTestClient) EstimatePriority(cap uint64, addBalance int64, future, bias time.Duration, update bool) int64 {
func (c *ppTestClient) estimatePriority(cap uint64, addBalance int64, future, bias time.Duration, update bool) int64 {
return int64(c.balance / cap)
}
func TestPriorityPool(t *testing.T) {
clock := &mclock.Simulated{}
ns := nodestate.NewNodeStateMachine(nil, nil, clock, testSetup)
setup := newServerSetup()
setup.balanceField = setup.setup.NewField("ppTestClient", reflect.TypeOf(&ppTestClient{}))
ns := nodestate.NewNodeStateMachine(nil, nil, clock, setup.setup)
ns.SubscribeField(ppTestSetup.CapacityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
if n := ns.GetField(node, ppTestSetup.priorityField); n != nil {
ns.SubscribeField(setup.capacityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
if n := ns.GetField(node, setup.balanceField); n != nil {
c := n.(*ppTestClient)
c.cap = newValue.(uint64)
}
})
pp := NewPriorityPool(ns, ppTestSetup, clock, testMinCap, 0, testCapacityStepDiv)
pp := newPriorityPool(ns, setup, clock, testMinCap, 0, testCapacityStepDiv, testCapacityStepDiv)
ns.Start()
pp.SetLimits(100, 1000000)
clients := make([]*ppTestClient, 100)
......@@ -77,7 +67,8 @@ func TestPriorityPool(t *testing.T) {
for {
var ok bool
ns.Operation(func() {
_, ok = pp.RequestCapacity(c.node, c.cap+c.cap/testCapacityStepDiv, 0, true)
newCap := c.cap + c.cap/testCapacityStepDiv
ok = pp.requestCapacity(c.node, newCap, newCap, 0) == newCap
})
if !ok {
return
......@@ -101,9 +92,8 @@ func TestPriorityPool(t *testing.T) {
}
sumBalance += c.balance
clients[i] = c
ns.SetState(c.node, ppTestClientFlag, nodestate.Flags{}, 0)
ns.SetField(c.node, ppTestSetup.priorityField, c)
ns.SetState(c.node, ppTestSetup.InactiveFlag, nodestate.Flags{}, 0)
ns.SetField(c.node, setup.balanceField, c)
ns.SetState(c.node, setup.inactiveFlag, nodestate.Flags{}, 0)
raise(c)
check(c)
}
......@@ -113,8 +103,8 @@ func TestPriorityPool(t *testing.T) {
oldBalance := c.balance
c.balance = uint64(rand.Int63n(100000000000) + 100000000000)
sumBalance += c.balance - oldBalance
pp.ns.SetState(c.node, ppUpdateFlag, nodestate.Flags{}, 0)
pp.ns.SetState(c.node, nodestate.Flags{}, ppUpdateFlag, 0)
pp.ns.SetState(c.node, setup.updateFlag, nodestate.Flags{}, 0)
pp.ns.SetState(c.node, nodestate.Flags{}, setup.updateFlag, 0)
if c.balance > oldBalance {
raise(c)
} else {
......@@ -129,32 +119,28 @@ func TestPriorityPool(t *testing.T) {
if count%10 == 0 {
// test available capacity calculation with capacity curve
c = clients[rand.Intn(len(clients))]
curve := pp.GetCapacityCurve().Exclude(c.node.ID())
curve := pp.getCapacityCurve().exclude(c.node.ID())
add := uint64(rand.Int63n(10000000000000))
c.balance += add
sumBalance += add
expCap := curve.MaxCapacity(func(cap uint64) int64 {
expCap := curve.maxCapacity(func(cap uint64) int64 {
return int64(c.balance / cap)
})
//fmt.Println(expCap, c.balance, sumBalance)
/*for i, cp := range curve.points {
fmt.Println("cp", i, cp, "ex", curve.getPoint(i))
}*/
var ok bool
expFail := expCap + 1
expFail := expCap + 10
if expFail < testMinCap {
expFail = testMinCap
}
ns.Operation(func() {
_, ok = pp.RequestCapacity(c.node, expFail, 0, true)
ok = pp.requestCapacity(c.node, expFail, expFail, 0) == expFail
})
if ok {
t.Errorf("Request for more than expected available capacity succeeded")
}
if expCap >= testMinCap {
ns.Operation(func() {
_, ok = pp.RequestCapacity(c.node, expCap, 0, true)
ok = pp.requestCapacity(c.node, expCap, expCap, 0) == expCap
})
if !ok {
t.Errorf("Request for expected available capacity failed")
......@@ -162,8 +148,8 @@ func TestPriorityPool(t *testing.T) {
}
c.balance -= add
sumBalance -= add
pp.ns.SetState(c.node, ppUpdateFlag, nodestate.Flags{}, 0)
pp.ns.SetState(c.node, nodestate.Flags{}, ppUpdateFlag, 0)
pp.ns.SetState(c.node, setup.updateFlag, nodestate.Flags{}, 0)
pp.ns.SetState(c.node, nodestate.Flags{}, setup.updateFlag, 0)
for _, c := range clients {
raise(c)
}
......@@ -175,8 +161,11 @@ func TestPriorityPool(t *testing.T) {
func TestCapacityCurve(t *testing.T) {
clock := &mclock.Simulated{}
ns := nodestate.NewNodeStateMachine(nil, nil, clock, testSetup)
pp := NewPriorityPool(ns, ppTestSetup, clock, 400000, 0, 2)
setup := newServerSetup()
setup.balanceField = setup.setup.NewField("ppTestClient", reflect.TypeOf(&ppTestClient{}))
ns := nodestate.NewNodeStateMachine(nil, nil, clock, setup.setup)
pp := newPriorityPool(ns, setup, clock, 400000, 0, 2, 2)
ns.Start()
pp.SetLimits(10, 10000000)
clients := make([]*ppTestClient, 10)
......@@ -188,17 +177,16 @@ func TestCapacityCurve(t *testing.T) {
cap: 1000000,
}
clients[i] = c
ns.SetState(c.node, ppTestClientFlag, nodestate.Flags{}, 0)
ns.SetField(c.node, ppTestSetup.priorityField, c)
ns.SetState(c.node, ppTestSetup.InactiveFlag, nodestate.Flags{}, 0)
ns.SetField(c.node, setup.balanceField, c)
ns.SetState(c.node, setup.inactiveFlag, nodestate.Flags{}, 0)
ns.Operation(func() {
pp.RequestCapacity(c.node, c.cap, 0, true)
pp.requestCapacity(c.node, c.cap, c.cap, 0)
})
}
curve := pp.GetCapacityCurve()
curve := pp.getCapacityCurve()
check := func(balance, expCap uint64) {
cap := curve.MaxCapacity(func(cap uint64) int64 {
cap := curve.maxCapacity(func(cap uint64) int64 {
return int64(balance / cap)
})
var fail bool
......@@ -226,7 +214,7 @@ func TestCapacityCurve(t *testing.T) {
check(1000000000000, 2500000)
pp.SetLimits(11, 10000000)
curve = pp.GetCapacityCurve()
curve = pp.getCapacityCurve()
check(0, 0)
check(10000000000, 100000)
......
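The tests above exercise the new requestCapacity signature: instead of a single requested value plus an iteration flag, the caller passes a minimum and maximum target and receives the capacity actually granted. A hedged in-package sketch of the calling convention, with the return-value semantics inferred from how the tests above check it (helper name is hypothetical):

// tryRaise attempts to raise a node's capacity to exactly target units.
func tryRaise(ns *nodestate.NodeStateMachine, pp *priorityPool, node *enode.Node, target uint64) bool {
    var granted uint64
    ns.Operation(func() {
        // minTarget == maxTarget: either the full target is granted or nothing changes
        granted = pp.requestCapacity(node, target, target, 0)
    })
    return granted == target
}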
......@@ -40,7 +40,6 @@ type (
// Service is a service registered at the Server and identified by a string id
Service interface {
ServiceInfo() (id, desc string) // only called during registration
Handle(id enode.ID, address string, name string, data []byte) []byte // never called concurrently
}
......@@ -60,9 +59,8 @@ func NewServer(delayPerRequest time.Duration) *Server {
}
// Register registers a Service
func (s *Server) Register(b Service) {
srv := &serviceEntry{backend: b}
srv.id, srv.desc = b.ServiceInfo()
func (s *Server) Register(b Service, id, desc string) {
srv := &serviceEntry{backend: b, id: id, desc: desc}
if strings.Contains(srv.id, ":") {
// srv.id + ":" will be used as a service database prefix
log.Error("Service ID contains ':'", "id", srv.id)
......
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package server
import (
"reflect"
"github.com/ethereum/go-ethereum/p2p/nodestate"
)
type peerWrapper struct{ clientPeer } // the NodeStateMachine type system needs this wrapper
// serverSetup is a wrapper of the node state machine setup, which contains
// all the created flags and fields used in the vflux server side.
type serverSetup struct {
setup *nodestate.Setup
clientField nodestate.Field // Field contains the client peer handler
// Flags and fields controlled by balance tracker. BalanceTracker
// is responsible for setting/deleting these flags or fields.
priorityFlag nodestate.Flags // Flag is set if the node has a positive balance
updateFlag nodestate.Flags // Flag is set whenever the node balance is changed (priority changed)
balanceField nodestate.Field // Field contains the client balance for priority calculation
// Flags and fields controlled by priority queue. Priority queue
// is responsible for setting/deleting these flags or fields.
activeFlag nodestate.Flags // Flag is set if the node is active
inactiveFlag nodestate.Flags // Flag is set if the node is inactive
capacityField nodestate.Field // Field contains the capacity of the node
queueField nodestate.Field // Field contains the information in the priority queue
}
// newServerSetup initializes the setup for state machine and returns the flags/fields group.
func newServerSetup() *serverSetup {
setup := &serverSetup{setup: &nodestate.Setup{}}
setup.clientField = setup.setup.NewField("client", reflect.TypeOf(peerWrapper{}))
setup.priorityFlag = setup.setup.NewFlag("priority")
setup.updateFlag = setup.setup.NewFlag("update")
setup.balanceField = setup.setup.NewField("balance", reflect.TypeOf(&nodeBalance{}))
setup.activeFlag = setup.setup.NewFlag("active")
setup.inactiveFlag = setup.setup.NewFlag("inactive")
setup.capacityField = setup.setup.NewField("capacity", reflect.TypeOf(uint64(0)))
setup.queueField = setup.setup.NewField("queue", reflect.TypeOf(&ppNodeInfo{}))
return setup
}
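The flags and fields above replace the previously exported BalanceTrackerSetup/PriorityPoolSetup: everything is created by a single newServerSetup call and the components wire themselves together internally. The priorityPool test earlier in this diff shows the intended in-package usage; a minimal hedged sketch of reacting to capacity changes through the shared setup (helper name is hypothetical):

// watchCapacity observes activation and capacity changes by subscribing to the
// capacity field maintained by the priority pool, as the priorityPool test does.
func watchCapacity(ns *nodestate.NodeStateMachine, setup *serverSetup, onActive func(node *enode.Node, cap uint64)) {
    ns.SubscribeField(setup.capacityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
        if c, ok := newValue.(uint64); ok && c != 0 {
            onActive(node, c) // node is active with capacity c
        }
    })
}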
......@@ -102,6 +102,7 @@ compile_fuzzer tests/fuzzers/stacktrie Fuzz fuzzStackTrie
compile_fuzzer tests/fuzzers/difficulty Fuzz fuzzDifficulty
compile_fuzzer tests/fuzzers/abi Fuzz fuzzAbi
compile_fuzzer tests/fuzzers/les Fuzz fuzzLes
compile_fuzzer tests/fuzzers/vflux FuzzClientPool fuzzClientPool
compile_fuzzer tests/fuzzers/bls12381 FuzzG1Add fuzz_g1_add
compile_fuzzer tests/fuzzers/bls12381 FuzzG1Mul fuzz_g1_mul
......
......@@ -858,6 +858,23 @@ func (ns *NodeStateMachine) GetField(n *enode.Node, field Field) interface{} {
return nil
}
// GetState retrieves the current state of the given node. Note that when used in a
// subscription callback the result can be out of sync with the state change represented
// by the callback parameters, so extra safety checks might be necessary.
func (ns *NodeStateMachine) GetState(n *enode.Node) Flags {
ns.lock.Lock()
defer ns.lock.Unlock()
ns.checkStarted()
if ns.closed {
return Flags{}
}
if _, node := ns.updateEnode(n); node != nil {
return Flags{mask: node.state, setup: ns.setup}
}
return Flags{}
}
// SetField sets the given field of the given node and blocks until the operation is finished
func (ns *NodeStateMachine) SetField(n *enode.Node, field Field, value interface{}) error {
ns.lock.Lock()
......
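GetState gives callers outside the state machine's callbacks a snapshot of a node's flags; as the comment notes, inside a subscription callback that snapshot may already be newer than the transition the callback reports. A small hedged sketch (helper name hypothetical; HasAll is assumed to be the flag-containment check of the nodestate package):

// isActive reads the node's current flags and checks them against a flag
// created on the same Setup the state machine was built with.
func isActive(ns *nodestate.NodeStateMachine, node *enode.Node, activeFlag nodestate.Flags) bool {
    return ns.GetState(node).HasAll(activeFlag)
}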
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package main
import (
"fmt"
"io/ioutil"
"os"
"github.com/ethereum/go-ethereum/tests/fuzzers/vflux"
)
func main() {
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: debug <file>\n")
fmt.Fprintf(os.Stderr, "Example\n")
fmt.Fprintf(os.Stderr, " $ debug ../crashers/4bbef6857c733a87ecf6fd8b9e7238f65eb9862a\n")
os.Exit(1)
}
crasher := os.Args[1]
data, err := ioutil.ReadFile(crasher)
if err != nil {
fmt.Fprintf(os.Stderr, "error loading crasher %v: %v", crasher, err)
os.Exit(1)
}
vflux.FuzzClientPool(data)
}