Commit 0ce5e113 authored by gary rong's avatar gary rong Committed by Felföldi Zsolt

les: rework clientpool (#20077)

* les: rework clientpool
parent 44b74cfc
...@@ -67,7 +67,7 @@ type balanceCallback struct { ...@@ -67,7 +67,7 @@ type balanceCallback struct {
// init initializes balanceTracker // init initializes balanceTracker
func (bt *balanceTracker) init(clock mclock.Clock, capacity uint64) { func (bt *balanceTracker) init(clock mclock.Clock, capacity uint64) {
bt.clock = clock bt.clock = clock
bt.initTime = clock.Now() bt.initTime, bt.lastUpdate = clock.Now(), clock.Now() // Init timestamps
for i := range bt.callbackIndex { for i := range bt.callbackIndex {
bt.callbackIndex[i] = -1 bt.callbackIndex[i] = -1
} }
......
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package les
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
)
func TestSetBalance(t *testing.T) {
var clock = &mclock.Simulated{}
var inputs = []struct {
pos uint64
neg uint64
}{
{1000, 0},
{0, 1000},
{1000, 1000},
}
tracker := balanceTracker{}
tracker.init(clock, 1000)
defer tracker.stop(clock.Now())
for _, i := range inputs {
tracker.setBalance(i.pos, i.neg)
pos, neg := tracker.getBalance(clock.Now())
if pos != i.pos {
t.Fatalf("Positive balance mismatch, want %v, got %v", i.pos, pos)
}
if neg != i.neg {
t.Fatalf("Negative balance mismatch, want %v, got %v", i.neg, neg)
}
}
}
func TestBalanceTimeCost(t *testing.T) {
var (
clock = &mclock.Simulated{}
tracker = balanceTracker{}
)
tracker.init(clock, 1000)
defer tracker.stop(clock.Now())
tracker.setFactors(false, 1, 1)
tracker.setFactors(true, 1, 1)
tracker.setBalance(uint64(time.Minute), 0) // 1 minute time allowance
var inputs = []struct {
runTime time.Duration
expPos uint64
expNeg uint64
}{
{time.Second, uint64(time.Second * 59), 0},
{0, uint64(time.Second * 59), 0},
{time.Second * 59, 0, 0},
{time.Second, 0, uint64(time.Second)},
}
for _, i := range inputs {
clock.Run(i.runTime)
if pos, _ := tracker.getBalance(clock.Now()); pos != i.expPos {
t.Fatalf("Positive balance mismatch, want %v, got %v", i.expPos, pos)
}
if _, neg := tracker.getBalance(clock.Now()); neg != i.expNeg {
t.Fatalf("Negative balance mismatch, want %v, got %v", i.expNeg, neg)
}
}
tracker.setBalance(uint64(time.Minute), 0) // Refill 1 minute time allowance
for _, i := range inputs {
clock.Run(i.runTime)
if pos, _ := tracker.getBalance(clock.Now()); pos != i.expPos {
t.Fatalf("Positive balance mismatch, want %v, got %v", i.expPos, pos)
}
if _, neg := tracker.getBalance(clock.Now()); neg != i.expNeg {
t.Fatalf("Negative balance mismatch, want %v, got %v", i.expNeg, neg)
}
}
}
func TestBalanceReqCost(t *testing.T) {
var (
clock = &mclock.Simulated{}
tracker = balanceTracker{}
)
tracker.init(clock, 1000)
defer tracker.stop(clock.Now())
tracker.setFactors(false, 1, 1)
tracker.setFactors(true, 1, 1)
tracker.setBalance(uint64(time.Minute), 0) // 1 minute time serving time allowance
var inputs = []struct {
reqCost uint64
expPos uint64
expNeg uint64
}{
{uint64(time.Second), uint64(time.Second * 59), 0},
{0, uint64(time.Second * 59), 0},
{uint64(time.Second * 59), 0, 0},
{uint64(time.Second), 0, uint64(time.Second)},
}
for _, i := range inputs {
tracker.requestCost(i.reqCost)
if pos, _ := tracker.getBalance(clock.Now()); pos != i.expPos {
t.Fatalf("Positive balance mismatch, want %v, got %v", i.expPos, pos)
}
if _, neg := tracker.getBalance(clock.Now()); neg != i.expNeg {
t.Fatalf("Negative balance mismatch, want %v, got %v", i.expNeg, neg)
}
}
}
func TestBalanceToPriority(t *testing.T) {
var (
clock = &mclock.Simulated{}
tracker = balanceTracker{}
)
tracker.init(clock, 1000) // cap = 1000
defer tracker.stop(clock.Now())
tracker.setFactors(false, 1, 1)
tracker.setFactors(true, 1, 1)
var inputs = []struct {
pos uint64
neg uint64
priority int64
}{
{1000, 0, ^int64(1)},
{2000, 0, ^int64(2)}, // Higher balance, lower priority value
{0, 0, 0},
{0, 1000, 1000},
}
for _, i := range inputs {
tracker.setBalance(i.pos, i.neg)
priority := tracker.getPriority(clock.Now())
if priority != i.priority {
t.Fatalf("Priority mismatch, want %v, got %v", i.priority, priority)
}
}
}
func TestEstimatedPriority(t *testing.T) {
var (
clock = &mclock.Simulated{}
tracker = balanceTracker{}
)
tracker.init(clock, 1000000000) // cap = 1000,000,000
defer tracker.stop(clock.Now())
tracker.setFactors(false, 1, 1)
tracker.setFactors(true, 1, 1)
tracker.setBalance(uint64(time.Minute), 0)
var inputs = []struct {
runTime time.Duration // time cost
futureTime time.Duration // diff of future time
reqCost uint64 // single request cost
priority int64 // expected estimated priority
}{
{time.Second, time.Second, 0, ^int64(58)},
{0, time.Second, 0, ^int64(58)},
// 2 seconds time cost, 1 second estimated time cost, 10^9 request cost,
// 10^9 estimated request cost per second.
{time.Second, time.Second, 1000000000, ^int64(55)},
// 3 seconds time cost, 3 second estimated time cost, 10^9*2 request cost,
// 4*10^9 estimated request cost.
{time.Second, 3 * time.Second, 1000000000, ^int64(48)},
// All positive balance is used up
{time.Second * 55, 0, 0, 0},
// 1 minute estimated time cost, 4/58 * 10^9 estimated request cost per sec.
{0, time.Minute, 0, int64(time.Minute) + int64(time.Second)*120/29},
}
for _, i := range inputs {
clock.Run(i.runTime)
tracker.requestCost(i.reqCost)
priority := tracker.estimatedPriority(clock.Now()+mclock.AbsTime(i.futureTime), true)
if priority != i.priority {
t.Fatalf("Estimated priority mismatch, want %v, got %v", i.priority, priority)
}
}
}
func TestCallbackChecking(t *testing.T) {
var (
clock = &mclock.Simulated{}
tracker = balanceTracker{}
)
tracker.init(clock, 1000000) // cap = 1000,000
defer tracker.stop(clock.Now())
tracker.setFactors(false, 1, 1)
tracker.setFactors(true, 1, 1)
var inputs = []struct {
priority int64
expDiff time.Duration
}{
{^int64(500), time.Millisecond * 500},
{0, time.Second},
{int64(time.Second), 2 * time.Second},
}
tracker.setBalance(uint64(time.Second), 0)
for _, i := range inputs {
diff, _ := tracker.timeUntil(i.priority)
if diff != i.expDiff {
t.Fatalf("Time difference mismatch, want %v, got %v", i.expDiff, diff)
}
}
}
func TestCallback(t *testing.T) {
var (
clock = &mclock.Simulated{}
tracker = balanceTracker{}
)
tracker.init(clock, 1000) // cap = 1000
defer tracker.stop(clock.Now())
tracker.setFactors(false, 1, 1)
tracker.setFactors(true, 1, 1)
callCh := make(chan struct{}, 1)
tracker.setBalance(uint64(time.Minute), 0)
tracker.addCallback(balanceCallbackZero, 0, func() { callCh <- struct{}{} })
clock.Run(time.Minute)
select {
case <-callCh:
case <-time.NewTimer(time.Second).C:
t.Fatalf("Callback hasn't been called yet")
}
tracker.setBalance(uint64(time.Minute), 0)
tracker.addCallback(balanceCallbackZero, 0, func() { callCh <- struct{}{} })
tracker.removeCallback(balanceCallbackZero)
clock.Run(time.Minute)
select {
case <-callCh:
t.Fatalf("Callback shouldn't be called")
case <-time.NewTimer(time.Millisecond * 100).C:
}
}
This diff is collapsed.
This diff is collapsed.
...@@ -188,6 +188,12 @@ func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn od ...@@ -188,6 +188,12 @@ func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn od
client.handler.synchronise(client.peer.peer) client.handler.synchronise(client.peer.peer)
// Ensure the client has synced all necessary data.
clientHead := client.handler.backend.blockchain.CurrentHeader()
if clientHead.Number.Uint64() != 4 {
t.Fatalf("Failed to sync the chain with server, head: %v", clientHead.Number.Uint64())
}
test := func(expFail uint64) { test := func(expFail uint64) {
// Mark this as a helper to put the failures at the correct lines // Mark this as a helper to put the failures at the correct lines
t.Helper() t.Helper()
......
...@@ -81,8 +81,15 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) { ...@@ -81,8 +81,15 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
// Assemble the test environment // Assemble the test environment
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true) server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true)
defer tearDown() defer tearDown()
client.handler.synchronise(client.peer.peer) client.handler.synchronise(client.peer.peer)
// Ensure the client has synced all necessary data.
clientHead := client.handler.backend.blockchain.CurrentHeader()
if clientHead.Number.Uint64() != 4 {
t.Fatalf("Failed to sync the chain with server, head: %v", clientHead.Number.Uint64())
}
test := func(expFail uint64) { test := func(expFail uint64) {
for i := uint64(0); i <= server.handler.blockchain.CurrentHeader().Number.Uint64(); i++ { for i := uint64(0); i <= server.handler.blockchain.CurrentHeader().Number.Uint64(); i++ {
bhash := rawdb.ReadCanonicalHash(server.db, i) bhash := rawdb.ReadCanonicalHash(server.db, i)
......
...@@ -113,7 +113,7 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) { ...@@ -113,7 +113,7 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
maxCapacity = totalRecharge maxCapacity = totalRecharge
} }
srv.fcManager.SetCapacityLimits(srv.freeCapacity, maxCapacity, srv.freeCapacity*2) srv.fcManager.SetCapacityLimits(srv.freeCapacity, maxCapacity, srv.freeCapacity*2)
srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, 10000, mclock.System{}, func(id enode.ID) { go srv.peers.Unregister(peerIdToString(id)) }) srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, mclock.System{}, func(id enode.ID) { go srv.peers.Unregister(peerIdToString(id)) })
srv.clientPool.setPriceFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1}) srv.clientPool.setPriceFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1})
checkpoint := srv.latestLocalCheckpoint() checkpoint := srv.latestLocalCheckpoint()
...@@ -183,9 +183,9 @@ func (s *LesServer) Stop() { ...@@ -183,9 +183,9 @@ func (s *LesServer) Stop() {
s.peers.Close() s.peers.Close()
s.fcManager.Stop() s.fcManager.Stop()
s.clientPool.stop()
s.costTracker.stop() s.costTracker.stop()
s.handler.stop() s.handler.stop()
s.clientPool.stop() // client pool should be closed after handler.
s.servingQueue.stop() s.servingQueue.stop()
// Note, bloom trie indexer is closed by parent bloombits indexer. // Note, bloom trie indexer is closed by parent bloombits indexer.
......
...@@ -30,17 +30,14 @@ import ( ...@@ -30,17 +30,14 @@ import (
) )
// Test light syncing which will download all headers from genesis. // Test light syncing which will download all headers from genesis.
func TestLightSyncingLes2(t *testing.T) { testCheckpointSyncing(t, 2, 0) }
func TestLightSyncingLes3(t *testing.T) { testCheckpointSyncing(t, 3, 0) } func TestLightSyncingLes3(t *testing.T) { testCheckpointSyncing(t, 3, 0) }
// Test legacy checkpoint syncing which will download tail headers // Test legacy checkpoint syncing which will download tail headers
// based on a hardcoded checkpoint. // based on a hardcoded checkpoint.
func TestLegacyCheckpointSyncingLes2(t *testing.T) { testCheckpointSyncing(t, 2, 1) }
func TestLegacyCheckpointSyncingLes3(t *testing.T) { testCheckpointSyncing(t, 3, 1) } func TestLegacyCheckpointSyncingLes3(t *testing.T) { testCheckpointSyncing(t, 3, 1) }
// Test checkpoint syncing which will download tail headers based // Test checkpoint syncing which will download tail headers based
// on a verified checkpoint. // on a verified checkpoint.
func TestCheckpointSyncingLes2(t *testing.T) { testCheckpointSyncing(t, 2, 2) }
func TestCheckpointSyncingLes3(t *testing.T) { testCheckpointSyncing(t, 3, 2) } func TestCheckpointSyncingLes3(t *testing.T) { testCheckpointSyncing(t, 3, 2) }
func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) { func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) {
......
...@@ -280,7 +280,7 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da ...@@ -280,7 +280,7 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
} }
server.costTracker, server.freeCapacity = newCostTracker(db, server.config) server.costTracker, server.freeCapacity = newCostTracker(db, server.config)
server.costTracker.testCostList = testCostList(0) // Disable flow control mechanism. server.costTracker.testCostList = testCostList(0) // Disable flow control mechanism.
server.clientPool = newClientPool(db, 1, 10000, clock, nil) server.clientPool = newClientPool(db, 1, clock, nil)
server.clientPool.setLimits(10000, 10000) // Assign enough capacity for clientpool server.clientPool.setLimits(10000, 10000) // Assign enough capacity for clientpool
server.handler = newServerHandler(server, simulation.Blockchain(), db, txpool, func() bool { return true }) server.handler = newServerHandler(server, simulation.Blockchain(), db, txpool, func() bool { return true })
if server.oracle != nil { if server.oracle != nil {
...@@ -517,7 +517,7 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer ...@@ -517,7 +517,7 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer
if connect { if connect {
cpeer, err1, speer, err2 = newTestPeerPair("peer", protocol, server, client) cpeer, err1, speer, err2 = newTestPeerPair("peer", protocol, server, client)
select { select {
case <-time.After(time.Millisecond * 100): case <-time.After(time.Millisecond * 300):
case err := <-err1: case err := <-err1:
t.Fatalf("peer 1 handshake error: %v", err) t.Fatalf("peer 1 handshake error: %v", err)
case err := <-err2: case err := <-err2:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment