Unverified Commit dc109cce authored by Felföldi Zsolt's avatar Felföldi Zsolt Committed by GitHub

les: move server pool to les/vflux/client (#22377)

* les: move serverPool to les/vflux/client

* les: add metrics

* les: moved ValueTracker inside ServerPool

* les: protect against node registration before server pool is started

* les/vflux/client: fixed tests

* les: make peer registration safe
parent de9465f9
......@@ -33,7 +33,7 @@ var Modules = map[string]string{
"swarmfs": SwarmfsJs,
"txpool": TxpoolJs,
"les": LESJs,
"lespay": LESPayJs,
"vflux": VfluxJs,
}
const ChequebookJs = `
......@@ -877,24 +877,24 @@ web3._extend({
});
`
const LESPayJs = `
const VfluxJs = `
web3._extend({
property: 'lespay',
property: 'vflux',
methods:
[
new web3._extend.Method({
name: 'distribution',
call: 'lespay_distribution',
call: 'vflux_distribution',
params: 2
}),
new web3._extend.Method({
name: 'timeout',
call: 'lespay_timeout',
call: 'vflux_timeout',
params: 2
}),
new web3._extend.Method({
name: 'value',
call: 'lespay_value',
call: 'vflux_value',
params: 2
}),
],
......@@ -902,7 +902,7 @@ web3._extend({
[
new web3._extend.Property({
name: 'requestStats',
getter: 'lespay_requestStats'
getter: 'vflux_requestStats'
}),
]
});
......
......@@ -57,8 +57,7 @@ type LightEthereum struct {
handler *clientHandler
txPool *light.TxPool
blockchain *light.LightChain
serverPool *serverPool
valueTracker *vfc.ValueTracker
serverPool *vfc.ServerPool
dialCandidates enode.Iterator
pruner *pruner
......@@ -109,17 +108,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
engine: ethconfig.CreateConsensusEngine(stack, chainConfig, &config.Ethash, nil, false, chainDb),
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
valueTracker: vfc.NewValueTracker(lesDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
p2pServer: stack.Server(),
p2pConfig: &stack.Config().P2P,
}
peers.subscribe((*vtSubscription)(leth.valueTracker))
leth.serverPool = newServerPool(lesDb, []byte("serverpool:"), leth.valueTracker, time.Second, nil, &mclock.System{}, config.UltraLightServers)
peers.subscribe(leth.serverPool)
leth.dialCandidates = leth.serverPool.dialIterator
leth.serverPool, leth.dialCandidates = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, nil, &mclock.System{}, config.UltraLightServers, requestList)
leth.serverPool.AddMetrics(suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge, sessionValueMeter, serverDialedMeter)
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.getTimeout)
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.GetTimeout)
leth.relay = newLesTxRelay(peers, leth.retriever)
leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.peers, leth.retriever)
......@@ -193,23 +189,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
return leth, nil
}
// vtSubscription implements serverPeerSubscriber
type vtSubscription vfc.ValueTracker
// registerPeer implements serverPeerSubscriber
func (v *vtSubscription) registerPeer(p *serverPeer) {
vt := (*vfc.ValueTracker)(v)
p.setValueTracker(vt, vt.Register(p.ID()))
p.updateVtParams()
}
// unregisterPeer implements serverPeerSubscriber
func (v *vtSubscription) unregisterPeer(p *serverPeer) {
vt := (*vfc.ValueTracker)(v)
vt.Unregister(p.ID())
p.setValueTracker(nil, nil)
}
type LightDummyAPI struct{}
// Etherbase is the address that mining rewards will be send to
......@@ -266,7 +245,7 @@ func (s *LightEthereum) APIs() []rpc.API {
}, {
Namespace: "vflux",
Version: "1.0",
Service: vfc.NewPrivateClientAPI(s.valueTracker),
Service: s.serverPool.API(),
Public: false,
},
}...)
......@@ -302,8 +281,8 @@ func (s *LightEthereum) Start() error {
if err != nil {
return err
}
s.serverPool.addSource(discovery)
s.serverPool.start()
s.serverPool.AddSource(discovery)
s.serverPool.Start()
// Start bloom request workers.
s.wg.Add(bloomServiceThreads)
s.startBloomHandlers(params.BloomBitsBlocksClient)
......@@ -316,8 +295,7 @@ func (s *LightEthereum) Start() error {
// Ethereum protocol.
func (s *LightEthereum) Stop() error {
close(s.closeCh)
s.serverPool.stop()
s.valueTracker.Stop()
s.serverPool.Stop()
s.peers.close()
s.reqDist.close()
s.odr.Stop()
......
......@@ -114,11 +114,25 @@ func (h *clientHandler) handle(p *serverPeer) error {
p.Log().Debug("Light Ethereum handshake failed", "err", err)
return err
}
// Register peer with the server pool
if h.backend.serverPool != nil {
if nvt, err := h.backend.serverPool.RegisterNode(p.Node()); err == nil {
p.setValueTracker(nvt)
p.updateVtParams()
defer func() {
p.setValueTracker(nil)
h.backend.serverPool.UnregisterNode(p.Node())
}()
} else {
return err
}
}
// Register the peer locally
if err := h.backend.peers.register(p); err != nil {
p.Log().Error("Light Ethereum peer registration failed", "err", err)
return err
}
serverConnectionGauge.Update(int64(h.backend.peers.len()))
connectedAt := mclock.Now()
......
......@@ -349,7 +349,6 @@ type serverPeer struct {
fcServer *flowcontrol.ServerNode // Client side mirror token bucket.
vtLock sync.Mutex
valueTracker *vfc.ValueTracker
nodeValueTracker *vfc.NodeValueTracker
sentReqs map[uint64]sentReqEntry
......@@ -676,9 +675,8 @@ func (p *serverPeer) Handshake(genesis common.Hash, forkid forkid.ID, forkFilter
// setValueTracker sets the value tracker references for connected servers. Note that the
// references should be removed upon disconnection by setValueTracker(nil, nil).
func (p *serverPeer) setValueTracker(vt *vfc.ValueTracker, nvt *vfc.NodeValueTracker) {
func (p *serverPeer) setValueTracker(nvt *vfc.NodeValueTracker) {
p.vtLock.Lock()
p.valueTracker = vt
p.nodeValueTracker = nvt
if nvt != nil {
p.sentReqs = make(map[uint64]sentReqEntry)
......@@ -705,7 +703,7 @@ func (p *serverPeer) updateVtParams() {
}
}
}
p.valueTracker.UpdateCosts(p.nodeValueTracker, reqCosts)
p.nodeValueTracker.UpdateCosts(reqCosts)
}
// sentReqEntry remembers sent requests and their sending times
......@@ -732,7 +730,6 @@ func (p *serverPeer) answeredRequest(id uint64) {
}
e, ok := p.sentReqs[id]
delete(p.sentReqs, id)
vt := p.valueTracker
nvt := p.nodeValueTracker
p.vtLock.Unlock()
if !ok {
......@@ -752,7 +749,7 @@ func (p *serverPeer) answeredRequest(id uint64) {
vtReqs[1] = vfc.ServedRequest{ReqType: uint32(m.rest), Amount: e.amount - 1}
}
dt := time.Duration(mclock.Now() - e.at)
vt.Served(nvt, vtReqs[:reqCount], dt)
nvt.Served(vtReqs[:reqCount], dt)
}
// clientPeer represents each node to which the les server is connected.
......
......@@ -26,17 +26,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/nodestate"
)
func testNodeID(i int) enode.ID {
return enode.ID{42, byte(i % 256), byte(i / 256)}
}
func testNodeIndex(id enode.ID) int {
if id[0] != 42 {
return -1
}
return int(id[1]) + int(id[2])*256
}
func testNode(i int) *enode.Node {
return enode.SignNull(new(enr.Record), testNodeID(i))
}
......
......@@ -14,10 +14,11 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package les
package client
import (
"math/rand"
"strconv"
"sync/atomic"
"testing"
"time"
......@@ -25,8 +26,6 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
vfc "github.com/ethereum/go-ethereum/les/vflux/client"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
)
......@@ -50,13 +49,13 @@ func testNodeIndex(id enode.ID) int {
return int(id[1]) + int(id[2])*256
}
type serverPoolTest struct {
type ServerPoolTest struct {
db ethdb.KeyValueStore
clock *mclock.Simulated
quit chan struct{}
preNeg, preNegFail bool
vt *vfc.ValueTracker
sp *serverPool
vt *ValueTracker
sp *ServerPool
input enode.Iterator
testNodes []spTestNode
trusted []string
......@@ -71,15 +70,15 @@ type spTestNode struct {
connectCycles, waitCycles int
nextConnCycle, totalConn int
connected, service bool
peer *serverPeer
node *enode.Node
}
func newServerPoolTest(preNeg, preNegFail bool) *serverPoolTest {
func newServerPoolTest(preNeg, preNegFail bool) *ServerPoolTest {
nodes := make([]*enode.Node, spTestNodes)
for i := range nodes {
nodes[i] = enode.SignNull(&enr.Record{}, testNodeID(i))
}
return &serverPoolTest{
return &ServerPoolTest{
clock: &mclock.Simulated{},
db: memorydb.New(),
input: enode.CycleNodes(nodes),
......@@ -89,7 +88,7 @@ func newServerPoolTest(preNeg, preNegFail bool) *serverPoolTest {
}
}
func (s *serverPoolTest) beginWait() {
func (s *ServerPoolTest) beginWait() {
// ensure that dialIterator and the maximal number of pre-neg queries are not all stuck in a waiting state
for atomic.AddInt32(&s.waitCount, 1) > preNegLimit {
atomic.AddInt32(&s.waitCount, -1)
......@@ -97,16 +96,16 @@ func (s *serverPoolTest) beginWait() {
}
}
func (s *serverPoolTest) endWait() {
func (s *ServerPoolTest) endWait() {
atomic.AddInt32(&s.waitCount, -1)
atomic.AddInt32(&s.waitEnded, 1)
}
func (s *serverPoolTest) addTrusted(i int) {
func (s *ServerPoolTest) addTrusted(i int) {
s.trusted = append(s.trusted, enode.SignNull(&enr.Record{}, testNodeID(i)).String())
}
func (s *serverPoolTest) start() {
func (s *ServerPoolTest) start() {
var testQuery queryFunc
if s.preNeg {
testQuery = func(node *enode.Node) int {
......@@ -144,13 +143,17 @@ func (s *serverPoolTest) start() {
}
}
s.vt = vfc.NewValueTracker(s.db, s.clock, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000))
s.sp = newServerPool(s.db, []byte("serverpool:"), s.vt, 0, testQuery, s.clock, s.trusted)
s.sp.addSource(s.input)
requestList := make([]RequestInfo, testReqTypes)
for i := range requestList {
requestList[i] = RequestInfo{Name: "testreq" + strconv.Itoa(i), InitAmount: 1, InitValue: 1}
}
s.sp, _ = NewServerPool(s.db, []byte("sp:"), 0, testQuery, s.clock, s.trusted, requestList)
s.sp.AddSource(s.input)
s.sp.validSchemes = enode.ValidSchemesForTesting
s.sp.unixTime = func() int64 { return int64(s.clock.Now()) / int64(time.Second) }
s.disconnect = make(map[int][]int)
s.sp.start()
s.sp.Start()
s.quit = make(chan struct{})
go func() {
last := int32(-1)
......@@ -170,31 +173,30 @@ func (s *serverPoolTest) start() {
}()
}
func (s *serverPoolTest) stop() {
func (s *ServerPoolTest) stop() {
close(s.quit)
s.sp.stop()
s.vt.Stop()
s.sp.Stop()
for i := range s.testNodes {
n := &s.testNodes[i]
if n.connected {
n.totalConn += s.cycle
}
n.connected = false
n.peer = nil
n.node = nil
n.nextConnCycle = 0
}
s.conn, s.servedConn = 0, 0
}
func (s *serverPoolTest) run() {
func (s *ServerPoolTest) run() {
for count := spTestLength; count > 0; count-- {
if dcList := s.disconnect[s.cycle]; dcList != nil {
for _, idx := range dcList {
n := &s.testNodes[idx]
s.sp.unregisterPeer(n.peer)
s.sp.UnregisterNode(n.node)
n.totalConn += s.cycle
n.connected = false
n.peer = nil
n.node = nil
s.conn--
if n.service {
s.servedConn--
......@@ -221,10 +223,10 @@ func (s *serverPoolTest) run() {
n.connected = true
dc := s.cycle + n.connectCycles
s.disconnect[dc] = append(s.disconnect[dc], idx)
n.peer = &serverPeer{peerCommons: peerCommons{Peer: p2p.NewPeer(id, "", nil)}}
s.sp.registerPeer(n.peer)
n.node = dial
nv, _ := s.sp.RegisterNode(n.node)
if n.service {
s.vt.Served(s.vt.GetNode(id), []vfc.ServedRequest{{ReqType: 0, Amount: 100}}, 0)
nv.Served([]ServedRequest{{ReqType: 0, Amount: 100}}, 0)
}
}
}
......@@ -234,7 +236,7 @@ func (s *serverPoolTest) run() {
}
}
func (s *serverPoolTest) setNodes(count, conn, wait int, service, trusted bool) (res []int) {
func (s *ServerPoolTest) setNodes(count, conn, wait int, service, trusted bool) (res []int) {
for ; count > 0; count-- {
idx := rand.Intn(spTestNodes)
for s.testNodes[idx].connectCycles != 0 || s.testNodes[idx].connected {
......@@ -253,11 +255,11 @@ func (s *serverPoolTest) setNodes(count, conn, wait int, service, trusted bool)
return
}
func (s *serverPoolTest) resetNodes() {
func (s *ServerPoolTest) resetNodes() {
for i, n := range s.testNodes {
if n.connected {
n.totalConn += s.cycle
s.sp.unregisterPeer(n.peer)
s.sp.UnregisterNode(n.node)
}
s.testNodes[i] = spTestNode{totalConn: n.totalConn}
}
......@@ -266,7 +268,7 @@ func (s *serverPoolTest) resetNodes() {
s.trusted = nil
}
func (s *serverPoolTest) checkNodes(t *testing.T, nodes []int) {
func (s *ServerPoolTest) checkNodes(t *testing.T, nodes []int) {
var sum int
for _, idx := range nodes {
n := &s.testNodes[idx]
......
......@@ -45,6 +45,7 @@ var (
type NodeValueTracker struct {
lock sync.Mutex
vt *ValueTracker
rtStats, lastRtStats ResponseTimeStats
lastTransfer mclock.AbsTime
basket serverBasket
......@@ -52,15 +53,12 @@ type NodeValueTracker struct {
reqValues *[]float64
}
// init initializes a NodeValueTracker.
// Note that the contents of the referenced reqValues slice will not change; a new
// reference is passed if the values are updated by ValueTracker.
func (nv *NodeValueTracker) init(now mclock.AbsTime, reqValues *[]float64) {
reqTypeCount := len(*reqValues)
nv.reqCosts = make([]uint64, reqTypeCount)
nv.lastTransfer = now
nv.reqValues = reqValues
nv.basket.init(reqTypeCount)
// UpdateCosts updates the node value tracker's request cost table
func (nv *NodeValueTracker) UpdateCosts(reqCosts []uint64) {
nv.vt.lock.Lock()
defer nv.vt.lock.Unlock()
nv.updateCosts(reqCosts, &nv.vt.refBasket.reqValues, nv.vt.refBasket.reqValueFactor(reqCosts))
}
// updateCosts updates the request cost table of the server. The request value factor
......@@ -97,6 +95,28 @@ func (nv *NodeValueTracker) transferStats(now mclock.AbsTime, transferRate float
return nv.basket.transfer(-math.Expm1(-transferRate * float64(dt))), recentRtStats
}
type ServedRequest struct {
ReqType, Amount uint32
}
// Served adds a served request to the node's statistics. An actual request may be composed
// of one or more request types (service vector indices).
func (nv *NodeValueTracker) Served(reqs []ServedRequest, respTime time.Duration) {
nv.vt.statsExpLock.RLock()
expFactor := nv.vt.statsExpFactor
nv.vt.statsExpLock.RUnlock()
nv.lock.Lock()
defer nv.lock.Unlock()
var value float64
for _, r := range reqs {
nv.basket.add(r.ReqType, r.Amount, nv.reqCosts[r.ReqType]*uint64(r.Amount), expFactor)
value += (*nv.reqValues)[r.ReqType] * float64(r.Amount)
}
nv.rtStats.Add(respTime, value, expFactor)
}
// RtStats returns the node's own response time distribution statistics
func (nv *NodeValueTracker) RtStats() ResponseTimeStats {
nv.lock.Lock()
......@@ -333,7 +353,12 @@ func (vt *ValueTracker) Register(id enode.ID) *NodeValueTracker {
return nil
}
nv := vt.loadOrNewNode(id)
nv.init(vt.clock.Now(), &vt.refBasket.reqValues)
reqTypeCount := len(vt.refBasket.reqValues)
nv.reqCosts = make([]uint64, reqTypeCount)
nv.lastTransfer = vt.clock.Now()
nv.reqValues = &vt.refBasket.reqValues
nv.basket.init(reqTypeCount)
vt.connected[id] = nv
return nv
}
......@@ -364,7 +389,7 @@ func (vt *ValueTracker) loadOrNewNode(id enode.ID) *NodeValueTracker {
if nv, ok := vt.connected[id]; ok {
return nv
}
nv := &NodeValueTracker{lastTransfer: vt.clock.Now()}
nv := &NodeValueTracker{vt: vt, lastTransfer: vt.clock.Now()}
enc, err := vt.db.Get(append(vtNodeKey, id[:]...))
if err != nil {
return nv
......@@ -425,14 +450,6 @@ func (vt *ValueTracker) saveNode(id enode.ID, nv *NodeValueTracker) {
}
}
// UpdateCosts updates the node value tracker's request cost table
func (vt *ValueTracker) UpdateCosts(nv *NodeValueTracker, reqCosts []uint64) {
vt.lock.Lock()
defer vt.lock.Unlock()
nv.updateCosts(reqCosts, &vt.refBasket.reqValues, vt.refBasket.reqValueFactor(reqCosts))
}
// RtStats returns the global response time distribution statistics
func (vt *ValueTracker) RtStats() ResponseTimeStats {
vt.lock.Lock()
......@@ -464,28 +481,6 @@ func (vt *ValueTracker) periodicUpdate() {
vt.saveToDb()
}
type ServedRequest struct {
ReqType, Amount uint32
}
// Served adds a served request to the node's statistics. An actual request may be composed
// of one or more request types (service vector indices).
func (vt *ValueTracker) Served(nv *NodeValueTracker, reqs []ServedRequest, respTime time.Duration) {
vt.statsExpLock.RLock()
expFactor := vt.statsExpFactor
vt.statsExpLock.RUnlock()
nv.lock.Lock()
defer nv.lock.Unlock()
var value float64
for _, r := range reqs {
nv.basket.add(r.ReqType, r.Amount, nv.reqCosts[r.ReqType]*uint64(r.Amount), expFactor)
value += (*nv.reqValues)[r.ReqType] * float64(r.Amount)
}
nv.rtStats.Add(respTime, value, vt.statsExpFactor)
}
type RequestStatsItem struct {
Name string
ReqAmount, ReqValue float64
......
......@@ -64,7 +64,7 @@ func TestValueTracker(t *testing.T) {
for j := range costList {
costList[j] = uint64(baseCost * relPrices[j])
}
vt.UpdateCosts(nodes[i], costList)
nodes[i].UpdateCosts(costList)
}
for i := range nodes {
nodes[i] = vt.Register(enode.ID{byte(i)})
......@@ -77,7 +77,7 @@ func TestValueTracker(t *testing.T) {
node := rand.Intn(testNodeCount)
respTime := time.Duration((rand.Float64() + 1) * float64(time.Second) * float64(node+1) / testNodeCount)
totalAmount[reqType] += uint64(reqAmount)
vt.Served(nodes[node], []ServedRequest{{uint32(reqType), uint32(reqAmount)}}, respTime)
nodes[node].Served([]ServedRequest{{uint32(reqType), uint32(reqAmount)}}, respTime)
clock.Run(time.Second)
}
} else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment