Unverified commit a7de7968 authored by Felföldi Zsolt, committed by GitHub

les: implement new client pool (#19745)

parent 947f5f2b
......@@ -42,6 +42,12 @@ type Clock interface {
Now() AbsTime
Sleep(time.Duration)
After(time.Duration) <-chan time.Time
AfterFunc(d time.Duration, f func()) Event
}
// Event represents a cancellable event returned by AfterFunc
type Event interface {
Cancel() bool
}
// System implements Clock using the system clock.
......@@ -61,3 +67,16 @@ func (System) Sleep(d time.Duration) {
func (System) After(d time.Duration) <-chan time.Time {
return time.After(d)
}
// AfterFunc implements Clock.
func (System) AfterFunc(d time.Duration, f func()) Event {
return (*SystemEvent)(time.AfterFunc(d, f))
}
// SystemEvent implements Event using time.Timer.
type SystemEvent time.Timer
// Cancel implements Event.
func (e *SystemEvent) Cancel() bool {
return (*time.Timer)(e).Stop()
}
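A minimal usage sketch (not part of the diff) of the new AfterFunc/Event pair on the system clock; the duration and printed messages are illustrative only.

```go
package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/common/mclock"
)

func main() {
	var clock mclock.Clock = mclock.System{}

	// Schedule a callback and keep the Event handle so it can be cancelled later.
	ev := clock.AfterFunc(50*time.Millisecond, func() {
		fmt.Println("timer fired")
	})

	// Cancel reports whether the callback was stopped before it ran.
	if ev.Cancel() {
		fmt.Println("timer cancelled before it could fire")
	}
}
```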
......@@ -35,30 +35,44 @@ type Simulated struct {
scheduled []event
mu sync.RWMutex
cond *sync.Cond
lastId uint64
}
type event struct {
do func()
at AbsTime
id uint64
}
// SimulatedEvent implements Event for a virtual clock.
type SimulatedEvent struct {
at AbsTime
id uint64
s *Simulated
}
// Run moves the clock by the given duration, executing all timers before that duration.
func (s *Simulated) Run(d time.Duration) {
s.mu.Lock()
defer s.mu.Unlock()
s.init()
end := s.now + AbsTime(d)
var do []func()
for len(s.scheduled) > 0 {
ev := s.scheduled[0]
if ev.at > end {
break
}
s.now = ev.at
ev.do()
do = append(do, ev.do)
s.scheduled = s.scheduled[1:]
}
s.now = end
s.mu.Unlock()
for _, fn := range do {
fn()
}
}
func (s *Simulated) ActiveTimers() int {
......@@ -94,23 +108,26 @@ func (s *Simulated) Sleep(d time.Duration) {
// After implements Clock.
func (s *Simulated) After(d time.Duration) <-chan time.Time {
after := make(chan time.Time, 1)
s.insert(d, func() {
s.AfterFunc(d, func() {
after <- (time.Time{}).Add(time.Duration(s.now))
})
return after
}
func (s *Simulated) insert(d time.Duration, do func()) {
// AfterFunc implements Clock.
func (s *Simulated) AfterFunc(d time.Duration, do func()) Event {
s.mu.Lock()
defer s.mu.Unlock()
s.init()
at := s.now + AbsTime(d)
s.lastId++
id := s.lastId
l, h := 0, len(s.scheduled)
ll := h
for l != h {
m := (l + h) / 2
if at < s.scheduled[m].at {
if (at < s.scheduled[m].at) || ((at == s.scheduled[m].at) && (id < s.scheduled[m].id)) {
h = m
} else {
l = m + 1
......@@ -118,8 +135,10 @@ func (s *Simulated) insert(d time.Duration, do func()) {
}
s.scheduled = append(s.scheduled, event{})
copy(s.scheduled[l+1:], s.scheduled[l:ll])
s.scheduled[l] = event{do: do, at: at}
e := event{do: do, at: at, id: id}
s.scheduled[l] = e
s.cond.Broadcast()
return &SimulatedEvent{at: at, id: id, s: s}
}
func (s *Simulated) init() {
......@@ -127,3 +146,31 @@ func (s *Simulated) init() {
s.cond = sync.NewCond(&s.mu)
}
}
// Cancel implements Event.
func (e *SimulatedEvent) Cancel() bool {
s := e.s
s.mu.Lock()
defer s.mu.Unlock()
l, h := 0, len(s.scheduled)
ll := h
for l != h {
m := (l + h) / 2
if e.id == s.scheduled[m].id {
l = m
break
}
if (e.at < s.scheduled[m].at) || ((e.at == s.scheduled[m].at) && (e.id < s.scheduled[m].id)) {
h = m
} else {
l = m + 1
}
}
if l >= ll || s.scheduled[l].id != e.id {
return false
}
copy(s.scheduled[l:ll-1], s.scheduled[l+1:])
s.scheduled = s.scheduled[:ll-1]
return true
}
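For reference, a small sketch (test-style usage assumed; variable names chosen here) of how Simulated.AfterFunc and Run interact: callbacks only execute once virtual time is advanced past their deadline, and a cancelled Event never fires.

```go
package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/common/mclock"
)

func main() {
	var clock mclock.Simulated

	fired := 0
	clock.AfterFunc(2*time.Second, func() { fired++ })
	ev := clock.AfterFunc(5*time.Second, func() { fired++ })

	clock.Run(time.Second) // only 1s of virtual time has passed: nothing fires
	fmt.Println(fired)     // 0

	ev.Cancel() // drop the 5s timer before it becomes due

	clock.Run(10 * time.Second) // advance past both deadlines
	fmt.Println(fired)          // 1: only the non-cancelled timer ran
}
```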
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package prque
import (
"container/heap"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
)
// LazyQueue is a priority queue data structure where priorities can change over
// time and are only evaluated on demand.
// Two callbacks are required:
// - priority evaluates the actual priority of an item
// - maxPriority gives an upper estimate for the priority at any moment between
// now and the given absolute time
// If the upper estimate is exceeded then Update should be called for that item.
// A global Refresh function should also be called periodically.
type LazyQueue struct {
clock mclock.Clock
// Items are stored in one of two internal queues ordered by estimated max
// priority until the next and the next-after-next refresh. Update and Refresh
// always place items in queue[1].
queue [2]*sstack
popQueue *sstack
period time.Duration
maxUntil mclock.AbsTime
indexOffset int
setIndex SetIndexCallback
priority PriorityCallback
maxPriority MaxPriorityCallback
}
type (
PriorityCallback func(data interface{}, now mclock.AbsTime) int64 // actual priority callback
MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback
)
// NewLazyQueue creates a new lazy queue
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
q := &LazyQueue{
popQueue: newSstack(nil),
setIndex: setIndex,
priority: priority,
maxPriority: maxPriority,
clock: clock,
period: refreshPeriod}
q.Reset()
q.Refresh()
return q
}
// Reset clears the contents of the queue
func (q *LazyQueue) Reset() {
q.queue[0] = newSstack(q.setIndex0)
q.queue[1] = newSstack(q.setIndex1)
}
// Refresh should be called at least with the frequency specified by the refreshPeriod parameter
func (q *LazyQueue) Refresh() {
q.maxUntil = q.clock.Now() + mclock.AbsTime(q.period)
for q.queue[0].Len() != 0 {
q.Push(heap.Pop(q.queue[0]).(*item).value)
}
q.queue[0], q.queue[1] = q.queue[1], q.queue[0]
q.indexOffset = 1 - q.indexOffset
q.maxUntil += mclock.AbsTime(q.period)
}
// Push adds an item to the queue
func (q *LazyQueue) Push(data interface{}) {
heap.Push(q.queue[1], &item{data, q.maxPriority(data, q.maxUntil)})
}
// Update updates the upper priority estimate for the item with the given queue index
func (q *LazyQueue) Update(index int) {
q.Push(q.Remove(index))
}
// Pop removes and returns the item with the greatest actual priority
func (q *LazyQueue) Pop() (interface{}, int64) {
var (
resData interface{}
resPri int64
)
q.MultiPop(func(data interface{}, priority int64) bool {
resData = data
resPri = priority
return false
})
return resData, resPri
}
// peekIndex returns the index of the internal queue where the item with the
// highest estimated priority is, or -1 if both are empty
func (q *LazyQueue) peekIndex() int {
if q.queue[0].Len() != 0 {
if q.queue[1].Len() != 0 && q.queue[1].blocks[0][0].priority > q.queue[0].blocks[0][0].priority {
return 1
}
return 0
}
if q.queue[1].Len() != 0 {
return 1
}
return -1
}
// MultiPop pops multiple items from the queue and is more efficient than calling
// Pop multiple times. Popped items are passed to the callback. MultiPop returns
// when the callback returns false or there are no more items to pop.
func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) {
now := q.clock.Now()
nextIndex := q.peekIndex()
for nextIndex != -1 {
data := heap.Pop(q.queue[nextIndex]).(*item).value
heap.Push(q.popQueue, &item{data, q.priority(data, now)})
nextIndex = q.peekIndex()
for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
i := heap.Pop(q.popQueue).(*item)
if !callback(i.value, i.priority) {
for q.popQueue.Len() != 0 {
q.Push(heap.Pop(q.popQueue).(*item).value)
}
return
}
}
}
}
// PopItem pops the item from the queue only, dropping the associated priority value.
func (q *LazyQueue) PopItem() interface{} {
i, _ := q.Pop()
return i
}
// Remove removes the item with the given index.
func (q *LazyQueue) Remove(index int) interface{} {
if index < 0 {
return nil
}
return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item).value
}
// Empty checks whether the priority queue is empty.
func (q *LazyQueue) Empty() bool {
return q.queue[0].Len() == 0 && q.queue[1].Len() == 0
}
// Size returns the number of items in the priority queue.
func (q *LazyQueue) Size() int {
return q.queue[0].Len() + q.queue[1].Len()
}
// setIndex0 translates internal queue item index to the virtual index space of LazyQueue
func (q *LazyQueue) setIndex0(data interface{}, index int) {
if index == -1 {
q.setIndex(data, -1)
} else {
q.setIndex(data, index+index)
}
}
// setIndex1 translates internal queue item index to the virtual index space of LazyQueue
func (q *LazyQueue) setIndex1(data interface{}, index int) {
q.setIndex(data, index+index+1)
}
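A wiring sketch (hypothetical task type, not from this commit; the prque import path is assumed from the repository layout) showing how the three LazyQueue callbacks fit together. With static priorities, maxPriority can simply return the current priority.

```go
package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/common/mclock"
	"github.com/ethereum/go-ethereum/common/prque"
)

type task struct {
	name  string
	pri   int64
	index int // maintained by the queue through the setIndex callback
}

func main() {
	setIndex := func(data interface{}, index int) { data.(*task).index = index }
	priority := func(data interface{}, now mclock.AbsTime) int64 { return data.(*task).pri }
	maxPriority := func(data interface{}, until mclock.AbsTime) int64 { return data.(*task).pri }

	q := prque.NewLazyQueue(setIndex, priority, maxPriority, &mclock.Simulated{}, time.Second)
	q.Push(&task{name: "low", pri: 1})
	q.Push(&task{name: "high", pri: 10})

	data, pri := q.Pop() // the item with the highest actual priority comes out first
	fmt.Println(data.(*task).name, pri)
}
```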
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package prque
import (
"math/rand"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
)
const (
testItems = 1000
testPriorityStep = 100
testSteps = 1000000
testStepPeriod = time.Millisecond
testQueueRefresh = time.Second
testAvgRate = float64(testPriorityStep) / float64(testItems) / float64(testStepPeriod)
)
type lazyItem struct {
p, maxp int64
last mclock.AbsTime
index int
}
func testPriority(a interface{}, now mclock.AbsTime) int64 {
return a.(*lazyItem).p
}
func testMaxPriority(a interface{}, until mclock.AbsTime) int64 {
i := a.(*lazyItem)
dt := until - i.last
i.maxp = i.p + int64(float64(dt)*testAvgRate)
return i.maxp
}
func testSetIndex(a interface{}, i int) {
a.(*lazyItem).index = i
}
func TestLazyQueue(t *testing.T) {
rand.Seed(time.Now().UnixNano())
clock := &mclock.Simulated{}
q := NewLazyQueue(testSetIndex, testPriority, testMaxPriority, clock, testQueueRefresh)
var (
items [testItems]lazyItem
maxPri int64
)
for i := range items[:] {
items[i].p = rand.Int63n(testPriorityStep * 10)
if items[i].p > maxPri {
maxPri = items[i].p
}
items[i].index = -1
q.Push(&items[i])
}
var lock sync.Mutex
stopCh := make(chan chan struct{})
go func() {
for {
select {
case <-clock.After(testQueueRefresh):
lock.Lock()
q.Refresh()
lock.Unlock()
case stop := <-stopCh:
close(stop)
return
}
}
}()
for c := 0; c < testSteps; c++ {
i := rand.Intn(testItems)
lock.Lock()
items[i].p += rand.Int63n(testPriorityStep*2-1) + 1
if items[i].p > maxPri {
maxPri = items[i].p
}
items[i].last = clock.Now()
if items[i].p > items[i].maxp {
q.Update(items[i].index)
}
if rand.Intn(100) == 0 {
p := q.PopItem().(*lazyItem)
if p.p != maxPri {
t.Fatalf("incorrect item (best known priority %d, popped %d)", maxPri, p.p)
}
q.Push(p)
}
lock.Unlock()
clock.Run(testStepPeriod)
clock.WaitForTimers(1)
}
stop := make(chan struct{})
stopCh <- stop
<-stop
}
......@@ -96,17 +96,14 @@ func testCapacityAPI(t *testing.T, clientCount int) {
t.Fatalf("Failed to obtain rpc client: %v", err)
}
headNum, headHash := getHead(ctx, t, serverRpcClient)
totalCap := getTotalCap(ctx, t, serverRpcClient)
minCap := getMinCap(ctx, t, serverRpcClient)
minCap, freeCap, totalCap := getCapacityInfo(ctx, t, serverRpcClient)
testCap := totalCap * 3 / 4
t.Logf("Server testCap: %d minCap: %d head number: %d head hash: %064x\n", testCap, minCap, headNum, headHash)
reqMinCap := uint64(float64(testCap) * minRelCap / (minRelCap + float64(len(clients)-1)))
if minCap > reqMinCap {
t.Fatalf("Minimum client capacity (%d) bigger than required minimum for this test (%d)", minCap, reqMinCap)
}
freeIdx := rand.Intn(len(clients))
freeCap := getFreeCap(ctx, t, serverRpcClient)
for i, client := range clients {
var err error
......@@ -146,7 +143,7 @@ func testCapacityAPI(t *testing.T, clientCount int) {
i, c := i, c
go func() {
queue := make(chan struct{}, 100)
var count uint64
reqCount[i] = 0
for {
select {
case queue <- struct{}{}:
......@@ -164,8 +161,10 @@ func testCapacityAPI(t *testing.T, clientCount int) {
wg.Done()
<-queue
if ok {
count++
atomic.StoreUint64(&reqCount[i], count)
count := atomic.AddUint64(&reqCount[i], 1)
if count%10000 == 0 {
freezeClient(ctx, t, serverRpcClient, clients[i].ID())
}
}
}()
}
......@@ -238,7 +237,7 @@ func testCapacityAPI(t *testing.T, clientCount int) {
default:
}
totalCap = getTotalCap(ctx, t, serverRpcClient)
_, _, totalCap = getCapacityInfo(ctx, t, serverRpcClient)
if totalCap < testCap {
t.Log("Total capacity underrun")
close(stop)
......@@ -327,58 +326,61 @@ func testRequest(ctx context.Context, t *testing.T, client *rpc.Client) bool {
return err == nil
}
func setCapacity(ctx context.Context, t *testing.T, server *rpc.Client, clientID enode.ID, cap uint64) {
if err := server.CallContext(ctx, nil, "les_setClientCapacity", clientID, cap); err != nil {
t.Fatalf("Failed to set client capacity: %v", err)
func freezeClient(ctx context.Context, t *testing.T, server *rpc.Client, clientID enode.ID) {
if err := server.CallContext(ctx, nil, "debug_freezeClient", clientID); err != nil {
t.Fatalf("Failed to freeze client: %v", err)
}
}
func getCapacity(ctx context.Context, t *testing.T, server *rpc.Client, clientID enode.ID) uint64 {
var s string
if err := server.CallContext(ctx, &s, "les_getClientCapacity", clientID); err != nil {
t.Fatalf("Failed to get client capacity: %v", err)
}
cap, err := hexutil.DecodeUint64(s)
if err != nil {
t.Fatalf("Failed to decode client capacity: %v", err)
func setCapacity(ctx context.Context, t *testing.T, server *rpc.Client, clientID enode.ID, cap uint64) {
params := make(map[string]interface{})
params["capacity"] = cap
if err := server.CallContext(ctx, nil, "les_setClientParams", []enode.ID{clientID}, []string{}, params); err != nil {
t.Fatalf("Failed to set client capacity: %v", err)
}
return cap
}
func getTotalCap(ctx context.Context, t *testing.T, server *rpc.Client) uint64 {
var s string
if err := server.CallContext(ctx, &s, "les_totalCapacity"); err != nil {
t.Fatalf("Failed to query total capacity: %v", err)
func getCapacity(ctx context.Context, t *testing.T, server *rpc.Client, clientID enode.ID) uint64 {
var res map[enode.ID]map[string]interface{}
if err := server.CallContext(ctx, &res, "les_clientInfo", []enode.ID{clientID}, []string{}); err != nil {
t.Fatalf("Failed to get client info: %v", err)
}
total, err := hexutil.DecodeUint64(s)
if err != nil {
t.Fatalf("Failed to decode total capacity: %v", err)
info, ok := res[clientID]
if !ok {
t.Fatalf("Missing client info")
}
return total
}
func getMinCap(ctx context.Context, t *testing.T, server *rpc.Client) uint64 {
var s string
if err := server.CallContext(ctx, &s, "les_minimumCapacity"); err != nil {
t.Fatalf("Failed to query minimum capacity: %v", err)
v, ok := info["capacity"]
if !ok {
t.Fatalf("Missing field in client info: capacity")
}
min, err := hexutil.DecodeUint64(s)
if err != nil {
t.Fatalf("Failed to decode minimum capacity: %v", err)
vv, ok := v.(float64)
if !ok {
t.Fatalf("Failed to decode capacity field")
}
return min
return uint64(vv)
}
func getFreeCap(ctx context.Context, t *testing.T, server *rpc.Client) uint64 {
var s string
if err := server.CallContext(ctx, &s, "les_freeClientCapacity"); err != nil {
t.Fatalf("Failed to query free client capacity: %v", err)
func getCapacityInfo(ctx context.Context, t *testing.T, server *rpc.Client) (minCap, freeCap, totalCap uint64) {
var res map[string]interface{}
if err := server.CallContext(ctx, &res, "les_serverInfo"); err != nil {
t.Fatalf("Failed to query server info: %v", err)
}
free, err := hexutil.DecodeUint64(s)
if err != nil {
t.Fatalf("Failed to decode free client capacity: %v", err)
decode := func(s string) uint64 {
v, ok := res[s]
if !ok {
t.Fatalf("Missing field in server info: %s", s)
}
vv, ok := v.(float64)
if !ok {
t.Fatalf("Failed to decode server info field: %s", s)
}
return uint64(vv)
}
return free
minCap = decode("minimumCapacity")
freeCap = decode("freeClientCapacity")
totalCap = decode("totalCapacity")
return
}
func init() {
......
// Copyright 2018 The go-ethereum Authors
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
......@@ -19,77 +19,104 @@ package les
import (
"fmt"
"math/rand"
"strconv"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/p2p/enode"
)
func TestFreeClientPoolL10C100(t *testing.T) {
testFreeClientPool(t, 10, 100)
func TestClientPoolL10C100Free(t *testing.T) {
testClientPool(t, 10, 100, 0, true)
}
func TestFreeClientPoolL40C200(t *testing.T) {
testFreeClientPool(t, 40, 200)
func TestClientPoolL40C200Free(t *testing.T) {
testClientPool(t, 40, 200, 0, true)
}
func TestFreeClientPoolL100C300(t *testing.T) {
testFreeClientPool(t, 100, 300)
func TestClientPoolL100C300Free(t *testing.T) {
testClientPool(t, 100, 300, 0, true)
}
const testFreeClientPoolTicks = 500000
func TestClientPoolL10C100P4(t *testing.T) {
testClientPool(t, 10, 100, 4, false)
}
func TestClientPoolL40C200P30(t *testing.T) {
testClientPool(t, 40, 200, 30, false)
}
func TestClientPoolL100C300P20(t *testing.T) {
testClientPool(t, 100, 300, 20, false)
}
const testClientPoolTicks = 500000
type poolTestPeer int
func (i poolTestPeer) ID() enode.ID {
return enode.ID{byte(i % 256), byte(i >> 8)}
}
func (i poolTestPeer) freeClientId() string {
return fmt.Sprintf("addr #%d", i)
}
func (i poolTestPeer) updateCapacity(uint64) {}
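As an aside, a self-contained sketch (the index value below is illustrative) of the test's ID convention: the peer index is packed into the first two enode.ID bytes by poolTestPeer.ID and recovered the same way in the disconnFn callback further down.

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/p2p/enode"
)

func main() {
	i := 1234
	id := enode.ID{byte(i % 256), byte(i >> 8)} // same packing as poolTestPeer.ID
	decoded := int(id[0]) + int(id[1])<<8       // same decoding as disconnFn
	fmt.Println(decoded == i)                   // true for indices below 65536
}
```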
func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
func testClientPool(t *testing.T, connLimit, clientCount, paidCount int, randomDisconnect bool) {
rand.Seed(time.Now().UnixNano())
var (
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
connected = make([]bool, clientCount)
connTicks = make([]int, clientCount)
disconnCh = make(chan int, clientCount)
peerAddress = func(i int) string {
return fmt.Sprintf("addr #%d", i)
}
peerId = func(i int) string {
return fmt.Sprintf("id #%d", i)
}
disconnFn = func(id string) {
i, err := strconv.Atoi(id[4:])
if err != nil {
panic(err)
}
disconnCh <- i
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
connected = make([]bool, clientCount)
connTicks = make([]int, clientCount)
disconnCh = make(chan int, clientCount)
disconnFn = func(id enode.ID) {
disconnCh <- int(id[0]) + int(id[1])<<8
}
pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn)
pool = newClientPool(db, 1, 10000, &clock, disconnFn)
)
pool.setLimits(connLimit, uint64(connLimit))
pool.setPriceFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1})
// pool should accept new peers up to its connected limit
for i := 0; i < connLimit; i++ {
if pool.connect(peerAddress(i), peerId(i)) {
if pool.connect(poolTestPeer(i), 0) != nil {
connected[i] = true
} else {
t.Fatalf("Test peer #%d rejected", i)
}
}
// since all accepted peers are new and should not be kicked out, the next one should be rejected
if pool.connect(peerAddress(connLimit), peerId(connLimit)) {
if pool.connect(poolTestPeer(connLimit), 0) != nil {
connected[connLimit] = true
t.Fatalf("Peer accepted over connected limit")
}
// randomly connect and disconnect peers, expect to have a similar total connection time at the end
for tickCounter := 0; tickCounter < testFreeClientPoolTicks; tickCounter++ {
for tickCounter := 0; tickCounter < testClientPoolTicks; tickCounter++ {
clock.Run(1 * time.Second)
//time.Sleep(time.Microsecond * 100)
if tickCounter == testClientPoolTicks/4 {
// give a positive balance to some of the peers
amount := uint64(testClientPoolTicks / 2 * 1000000000) // enough for half of the simulation period
for i := 0; i < paidCount; i++ {
pool.addBalance(poolTestPeer(i).ID(), amount, false)
}
}
i := rand.Intn(clientCount)
if connected[i] {
pool.disconnect(peerAddress(i))
connected[i] = false
connTicks[i] += tickCounter
if randomDisconnect {
pool.disconnect(poolTestPeer(i))
connected[i] = false
connTicks[i] += tickCounter
}
} else {
if pool.connect(peerAddress(i), peerId(i)) {
if pool.connect(poolTestPeer(i), 0) != nil {
connected[i] = true
connTicks[i] -= tickCounter
}
......@@ -98,7 +125,7 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
for {
select {
case i := <-disconnCh:
pool.disconnect(peerAddress(i))
pool.disconnect(poolTestPeer(i))
if connected[i] {
connTicks[i] += tickCounter
connected[i] = false
......@@ -109,36 +136,44 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) {
}
}
expTicks := testFreeClientPoolTicks * connLimit / clientCount
expTicks := testClientPoolTicks/2*connLimit/clientCount + testClientPoolTicks/2*(connLimit-paidCount)/(clientCount-paidCount)
expMin := expTicks - expTicks/10
expMax := expTicks + expTicks/10
paidTicks := testClientPoolTicks/2*connLimit/clientCount + testClientPoolTicks/2
paidMin := paidTicks - paidTicks/10
paidMax := paidTicks + paidTicks/10
// check that the total connected time of each peer is in the expected range
for i, c := range connected {
if c {
connTicks[i] += testFreeClientPoolTicks
connTicks[i] += testClientPoolTicks
}
min, max := expMin, expMax
if i < paidCount {
// expect a higher amount for clients with a positive balance
min, max = paidMin, paidMax
}
if connTicks[i] < expMin || connTicks[i] > expMax {
t.Errorf("Total connected time of test node #%d (%d) outside expected range (%d to %d)", i, connTicks[i], expMin, expMax)
if connTicks[i] < min || connTicks[i] > max {
t.Errorf("Total connected time of test node #%d (%d) outside expected range (%d to %d)", i, connTicks[i], min, max)
}
}
// a previously unknown peer should be accepted now
if !pool.connect("newAddr", "newId") {
if pool.connect(poolTestPeer(54321), 0) == nil {
t.Fatalf("Previously unknown peer rejected")
}
// close and restart pool
pool.stop()
pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn)
pool = newClientPool(db, 1, 10000, &clock, func(id enode.ID) {})
pool.setLimits(connLimit, uint64(connLimit))
// try connecting all known peers (connLimit should be filled up)
for i := 0; i < clientCount; i++ {
pool.connect(peerAddress(i), peerId(i))
pool.connect(poolTestPeer(i), 0)
}
// expect pool to remember known nodes and kick out one of them to accept a new one
if !pool.connect("newAddr2", "newId2") {
if pool.connect(poolTestPeer(54322), 0) == nil {
t.Errorf("Previously unknown peer rejected after restarting pool")
}
pool.stop()
......
......@@ -17,7 +17,6 @@
package les
import (
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
......@@ -64,7 +63,7 @@ func (c *lesCommons) makeProtocols(versions []uint) []p2p.Protocol {
return c.protocolManager.runPeer(version, p, rw)
},
PeerInfo: func(id enode.ID) interface{} {
if p := c.protocolManager.peers.Peer(fmt.Sprintf("%x", id.Bytes())); p != nil {
if p := c.protocolManager.peers.Peer(peerIdToString(id)); p != nil {
return p.Info()
}
return nil
......
......@@ -27,6 +27,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
......@@ -304,8 +305,14 @@ func (pm *ProtocolManager) handle(p *peer) error {
p.Log().Error("Light Ethereum peer registration failed", "err", err)
return err
}
if !pm.client && p.balanceTracker == nil {
// add dummy balance tracker for tests
p.balanceTracker = &balanceTracker{}
p.balanceTracker.init(&mclock.System{}, 1)
}
connectedAt := time.Now()
defer func() {
p.balanceTracker = nil
pm.removePeer(p.id)
connectionTimer.UpdateSince(connectedAt)
}()
......@@ -400,6 +407,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
defer msg.Discard()
var deliverMsg *Msg
balanceTracker := p.balanceTracker
sendResponse := func(reqID, amount uint64, reply *reply, servingTime uint64) {
p.responseLock.Lock()
......@@ -418,6 +426,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
realCost = pm.server.costTracker.realCost(servingTime, msg.Size, replySize)
if amount != 0 {
pm.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost)
balanceTracker.requestCost(realCost)
}
} else {
realCost = maxCost
......
......@@ -29,24 +29,24 @@ var (
connectionTimer = metrics.NewRegisteredTimer("les/connectionTime", nil)
totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
totalCapacityGauge = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
totalRechargeGauge = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
requestServedTimer = metrics.NewRegisteredTimer("les/server/requestServed", nil)
requestServedMeter = metrics.NewRegisteredMeter("les/server/totalRequestServed", nil)
requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/totalRequestEstimated", nil)
relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/relativeCost", nil, metrics.NewExpDecaySample(1028, 0.015))
recentServedGauge = metrics.NewRegisteredGauge("les/server/recentRequestServed", nil)
recentEstimatedGauge = metrics.NewRegisteredGauge("les/server/recentRequestEstimated", nil)
sqServedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/served", nil)
sqQueuedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/queued", nil)
clientConnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/connected", nil)
clientRejectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/rejected", nil)
clientKickedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/kicked", nil)
// clientDisconnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/disconnected", nil)
clientFreezeMeter = metrics.NewRegisteredMeter("les/server/clientEvent/freeze", nil)
clientErrorMeter = metrics.NewRegisteredMeter("les/server/clientEvent/error", nil)
totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
totalCapacityGauge = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
totalRechargeGauge = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
requestServedTimer = metrics.NewRegisteredTimer("les/server/requestServed", nil)
requestServedMeter = metrics.NewRegisteredMeter("les/server/totalRequestServed", nil)
requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/totalRequestEstimated", nil)
relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/relativeCost", nil, metrics.NewExpDecaySample(1028, 0.015))
recentServedGauge = metrics.NewRegisteredGauge("les/server/recentRequestServed", nil)
recentEstimatedGauge = metrics.NewRegisteredGauge("les/server/recentRequestEstimated", nil)
sqServedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/served", nil)
sqQueuedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/queued", nil)
clientConnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/connected", nil)
clientRejectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/rejected", nil)
clientKickedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/kicked", nil)
clientDisconnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/disconnected", nil)
clientFreezeMeter = metrics.NewRegisteredMeter("les/server/clientEvent/freeze", nil)
clientErrorMeter = metrics.NewRegisteredMeter("les/server/clientEvent/error", nil)
)
// meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of
......
......@@ -21,6 +21,7 @@ import (
"fmt"
"math/big"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
......@@ -33,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)
......@@ -105,10 +107,11 @@ type peer struct {
updateTime mclock.AbsTime
frozen uint32 // 1 if client is in frozen state
fcClient *flowcontrol.ClientNode // nil if the peer is server only
fcServer *flowcontrol.ServerNode // nil if the peer is client only
fcParams flowcontrol.ServerParams
fcCosts requestCostTable
fcClient *flowcontrol.ClientNode // nil if the peer is server only
fcServer *flowcontrol.ServerNode // nil if the peer is client only
fcParams flowcontrol.ServerParams
fcCosts requestCostTable
balanceTracker *balanceTracker // set by clientPool.connect, used and removed by ProtocolManager.handle
trusted bool
onlyAnnounce bool
......@@ -122,12 +125,32 @@ func newPeer(version int, network uint64, trusted bool, p *p2p.Peer, rw p2p.MsgR
rw: rw,
version: version,
network: network,
id: fmt.Sprintf("%x", p.ID().Bytes()),
id: peerIdToString(p.ID()),
trusted: trusted,
errCh: make(chan error, 1),
}
}
// peerIdToString converts enode.ID to a string form
func peerIdToString(id enode.ID) string {
return fmt.Sprintf("%x", id.Bytes())
}
// freeClientId returns a string identifier for the peer. Multiple peers with the
// same identifier cannot be connected in free mode simultaneously.
func (p *peer) freeClientId() string {
if addr, ok := p.RemoteAddr().(*net.TCPAddr); ok {
if addr.IP.IsLoopback() {
// using the peer id instead of the loopback ip address allows multiple free
// connections from the local machine to its own server
return p.id
} else {
return addr.IP.String()
}
}
return p.id
}
// rejectUpdate returns true if a parameter update has to be rejected because
// the size and/or rate of updates exceed the capacity limitation
func (p *peer) rejectUpdate(size uint64) bool {
......
......@@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
)
......@@ -55,9 +56,9 @@ type LesServer struct {
thcNormal, thcBlockProcessing int // serving thread count for normal operation and block processing mode
maxPeers int
minCapacity, freeClientCap uint64
freeClientPool *freeClientPool
maxPeers int
minCapacity, maxCapacity, freeClientCap uint64
clientPool *clientPool
}
func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
......@@ -158,7 +159,7 @@ func (s *LesServer) startEventLoop() {
}
updateRecharge()
totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh)
s.freeClientPool.setLimits(s.maxPeers, totalCapacity)
s.clientPool.setLimits(s.maxPeers, totalCapacity)
var maxFreePeers uint64
go func() {
......@@ -175,7 +176,7 @@ func (s *LesServer) startEventLoop() {
log.Warn("Reduced total capacity", "maxFreePeers", newFreePeers)
}
maxFreePeers = newFreePeers
s.freeClientPool.setLimits(s.maxPeers, totalCapacity)
s.clientPool.setLimits(s.maxPeers, totalCapacity)
case <-s.protocolManager.quitSync:
s.protocolManager.wg.Done()
return
......@@ -205,14 +206,14 @@ func (s *LesServer) Start(srvr *p2p.Server) {
}
}
maxCapacity := s.freeClientCap * uint64(s.maxPeers)
if totalRecharge > maxCapacity {
maxCapacity = totalRecharge
s.maxCapacity = s.freeClientCap * uint64(s.maxPeers)
if totalRecharge > s.maxCapacity {
s.maxCapacity = totalRecharge
}
s.fcManager.SetCapacityLimits(s.freeClientCap, maxCapacity, s.freeClientCap*2)
s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) })
s.protocolManager.peers.notify(s.freeClientPool)
s.fcManager.SetCapacityLimits(s.freeClientCap, s.maxCapacity, s.freeClientCap*2)
s.clientPool = newClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id enode.ID) { go s.protocolManager.removePeer(peerIdToString(id)) })
s.clientPool.setPriceFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1})
s.protocolManager.peers.notify(s.clientPool)
s.startEventLoop()
s.protocolManager.Start(s.config.LightPeers)
if srvr.DiscV5 != nil {
......@@ -250,7 +251,7 @@ func (s *LesServer) Stop() {
go func() {
<-s.protocolManager.noMorePeers
}()
s.freeClientPool.stop()
s.clientPool.stop()
s.costTracker.stop()
s.protocolManager.Stop()
}
......