Commit 62dd9833 authored by Daniel A. Nagy's avatar Daniel A. Nagy

Merge branch 'develop' of github.com:ethereum/go-ethereum into develop

parents 3a01e3e3 c8fc4ceb
...@@ -98,6 +98,10 @@ ...@@ -98,6 +98,10 @@
"Comment": "v0.1.0-3-g27c4092", "Comment": "v0.1.0-3-g27c4092",
"Rev": "27c40922c40b43fe04554d8223a402af3ea333f3" "Rev": "27c40922c40b43fe04554d8223a402af3ea333f3"
}, },
{
"ImportPath": "gopkg.in/karalabe/cookiejar.v2/collections/prque",
"Rev": "0b2e270613f5d7ba262a5749b9e32270131497a2"
},
{ {
"ImportPath": "gopkg.in/qml.v1/cdata", "ImportPath": "gopkg.in/qml.v1/cdata",
"Rev": "1116cb9cd8dee23f8d444ded354eb53122739f99" "Rev": "1116cb9cd8dee23f8d444ded354eb53122739f99"
......
// CookieJar - A contestant's algorithm toolbox
// Copyright (c) 2013 Peter Szilagyi. All rights reserved.
//
// CookieJar is dual licensed: you can redistribute it and/or modify it under
// the terms of the GNU General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later
// version.
//
// The toolbox is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// Alternatively, the CookieJar toolbox may be used in accordance with the terms
// and conditions contained in a signed written agreement between you and the
// author(s).
package prque_test
import (
"fmt"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
// Insert some data into a priority queue and pop them out in prioritized order.
func Example_usage() {
// Define some data to push into the priority queue
prio := []float32{77.7, 22.2, 44.4, 55.5, 11.1, 88.8, 33.3, 99.9, 0.0, 66.6}
data := []string{"zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"}
// Create the priority queue and insert the prioritized data
pq := prque.New()
for i := 0; i < len(data); i++ {
pq.Push(data[i], prio[i])
}
// Pop out the data and print them
for !pq.Empty() {
val, prio := pq.Pop()
fmt.Printf("%.1f:%s ", prio, val)
}
// Output:
// 99.9:seven 88.8:five 77.7:zero 66.6:nine 55.5:three 44.4:two 33.3:six 22.2:one 11.1:four 0.0:eight
}
// CookieJar - A contestant's algorithm toolbox
// Copyright (c) 2013 Peter Szilagyi. All rights reserved.
//
// CookieJar is dual licensed: you can redistribute it and/or modify it under
// the terms of the GNU General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later
// version.
//
// The toolbox is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// Alternatively, the CookieJar toolbox may be used in accordance with the terms
// and conditions contained in a signed written agreement between you and the
// author(s).
// Package prque implements a priority queue data structure supporting arbitrary
// value types and float priorities.
//
// The reasoning behind using floats for the priorities vs. ints or interfaces
// was larger flexibility without sacrificing too much performance or code
// complexity.
//
// If you would like to use a min-priority queue, simply negate the priorities.
//
// Internally the queue is based on the standard heap package working on a
// sortable version of the block based stack.
package prque
import (
"container/heap"
)
// Priority queue data structure.
type Prque struct {
cont *sstack
}
// Creates a new priority queue.
func New() *Prque {
return &Prque{newSstack()}
}
// Pushes a value with a given priority into the queue, expanding if necessary.
func (p *Prque) Push(data interface{}, priority float32) {
heap.Push(p.cont, &item{data, priority})
}
// Pops the value with the greates priority off the stack and returns it.
// Currently no shrinking is done.
func (p *Prque) Pop() (interface{}, float32) {
item := heap.Pop(p.cont).(*item)
return item.value, item.priority
}
// Pops only the item from the queue, dropping the associated priority value.
func (p *Prque) PopItem() interface{} {
return heap.Pop(p.cont).(*item).value
}
// Checks whether the priority queue is empty.
func (p *Prque) Empty() bool {
return p.cont.Len() == 0
}
// Returns the number of element in the priority queue.
func (p *Prque) Size() int {
return p.cont.Len()
}
// Clears the contents of the priority queue.
func (p *Prque) Reset() {
*p = *New()
}
// CookieJar - A contestant's algorithm toolbox
// Copyright (c) 2013 Peter Szilagyi. All rights reserved.
//
// CookieJar is dual licensed: you can redistribute it and/or modify it under
// the terms of the GNU General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later
// version.
//
// The toolbox is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// Alternatively, the CookieJar toolbox may be used in accordance with the terms
// and conditions contained in a signed written agreement between you and the
// author(s).
package prque
import (
"math/rand"
"testing"
)
func TestPrque(t *testing.T) {
// Generate a batch of random data and a specific priority order
size := 16 * blockSize
prio := rand.Perm(size)
data := make([]int, size)
for i := 0; i < size; i++ {
data[i] = rand.Int()
}
queue := New()
for rep := 0; rep < 2; rep++ {
// Fill a priority queue with the above data
for i := 0; i < size; i++ {
queue.Push(data[i], float32(prio[i]))
if queue.Size() != i+1 {
t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1)
}
}
// Create a map the values to the priorities for easier verification
dict := make(map[float32]int)
for i := 0; i < size; i++ {
dict[float32(prio[i])] = data[i]
}
// Pop out the elements in priority order and verify them
prevPrio := float32(size + 1)
for !queue.Empty() {
val, prio := queue.Pop()
if prio > prevPrio {
t.Errorf("invalid priority order: %v after %v.", prio, prevPrio)
}
prevPrio = prio
if val != dict[prio] {
t.Errorf("push/pop mismatch: have %v, want %v.", val, dict[prio])
}
delete(dict, prio)
}
}
}
func TestReset(t *testing.T) {
// Generate a batch of random data and a specific priority order
size := 16 * blockSize
prio := rand.Perm(size)
data := make([]int, size)
for i := 0; i < size; i++ {
data[i] = rand.Int()
}
queue := New()
for rep := 0; rep < 2; rep++ {
// Fill a priority queue with the above data
for i := 0; i < size; i++ {
queue.Push(data[i], float32(prio[i]))
if queue.Size() != i+1 {
t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1)
}
}
// Create a map the values to the priorities for easier verification
dict := make(map[float32]int)
for i := 0; i < size; i++ {
dict[float32(prio[i])] = data[i]
}
// Pop out half the elements in priority order and verify them
prevPrio := float32(size + 1)
for i := 0; i < size/2; i++ {
val, prio := queue.Pop()
if prio > prevPrio {
t.Errorf("invalid priority order: %v after %v.", prio, prevPrio)
}
prevPrio = prio
if val != dict[prio] {
t.Errorf("push/pop mismatch: have %v, want %v.", val, dict[prio])
}
delete(dict, prio)
}
// Reset and ensure it's empty
queue.Reset()
if !queue.Empty() {
t.Errorf("priority queue not empty after reset: %v", queue)
}
}
}
func BenchmarkPush(b *testing.B) {
// Create some initial data
data := make([]int, b.N)
prio := make([]float32, b.N)
for i := 0; i < len(data); i++ {
data[i] = rand.Int()
prio[i] = rand.Float32()
}
// Execute the benchmark
b.ResetTimer()
queue := New()
for i := 0; i < len(data); i++ {
queue.Push(data[i], prio[i])
}
}
func BenchmarkPop(b *testing.B) {
// Create some initial data
data := make([]int, b.N)
prio := make([]float32, b.N)
for i := 0; i < len(data); i++ {
data[i] = rand.Int()
prio[i] = rand.Float32()
}
queue := New()
for i := 0; i < len(data); i++ {
queue.Push(data[i], prio[i])
}
// Execute the benchmark
b.ResetTimer()
for !queue.Empty() {
queue.Pop()
}
}
// CookieJar - A contestant's algorithm toolbox
// Copyright (c) 2013 Peter Szilagyi. All rights reserved.
//
// CookieJar is dual licensed: you can redistribute it and/or modify it under
// the terms of the GNU General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later
// version.
//
// The toolbox is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// Alternatively, the CookieJar toolbox may be used in accordance with the terms
// and conditions contained in a signed written agreement between you and the
// author(s).
package prque
// The size of a block of data
const blockSize = 4096
// A prioritized item in the sorted stack.
type item struct {
value interface{}
priority float32
}
// Internal sortable stack data structure. Implements the Push and Pop ops for
// the stack (heap) functionality and the Len, Less and Swap methods for the
// sortability requirements of the heaps.
type sstack struct {
size int
capacity int
offset int
blocks [][]*item
active []*item
}
// Creates a new, empty stack.
func newSstack() *sstack {
result := new(sstack)
result.active = make([]*item, blockSize)
result.blocks = [][]*item{result.active}
result.capacity = blockSize
return result
}
// Pushes a value onto the stack, expanding it if necessary. Required by
// heap.Interface.
func (s *sstack) Push(data interface{}) {
if s.size == s.capacity {
s.active = make([]*item, blockSize)
s.blocks = append(s.blocks, s.active)
s.capacity += blockSize
s.offset = 0
} else if s.offset == blockSize {
s.active = s.blocks[s.size/blockSize]
s.offset = 0
}
s.active[s.offset] = data.(*item)
s.offset++
s.size++
}
// Pops a value off the stack and returns it. Currently no shrinking is done.
// Required by heap.Interface.
func (s *sstack) Pop() (res interface{}) {
s.size--
s.offset--
if s.offset < 0 {
s.offset = blockSize - 1
s.active = s.blocks[s.size/blockSize]
}
res, s.active[s.offset] = s.active[s.offset], nil
return
}
// Returns the length of the stack. Required by sort.Interface.
func (s *sstack) Len() int {
return s.size
}
// Compares the priority of two elements of the stack (higher is first).
// Required by sort.Interface.
func (s *sstack) Less(i, j int) bool {
return s.blocks[i/blockSize][i%blockSize].priority > s.blocks[j/blockSize][j%blockSize].priority
}
// Swaps two elements in the stack. Required by sort.Interface.
func (s *sstack) Swap(i, j int) {
ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize
s.blocks[ib][io], s.blocks[jb][jo] = s.blocks[jb][jo], s.blocks[ib][io]
}
// Resets the stack, effectively clearing its contents.
func (s *sstack) Reset() {
*s = *newSstack()
}
// CookieJar - A contestant's algorithm toolbox
// Copyright (c) 2013 Peter Szilagyi. All rights reserved.
//
// CookieJar is dual licensed: you can redistribute it and/or modify it under
// the terms of the GNU General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later
// version.
//
// The toolbox is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// Alternatively, the CookieJar toolbox may be used in accordance with the terms
// and conditions contained in a signed written agreement between you and the
// author(s).
package prque
import (
"math/rand"
"sort"
"testing"
)
func TestSstack(t *testing.T) {
// Create some initial data
size := 16 * blockSize
data := make([]*item, size)
for i := 0; i < size; i++ {
data[i] = &item{rand.Int(), rand.Float32()}
}
stack := newSstack()
for rep := 0; rep < 2; rep++ {
// Push all the data into the stack, pop out every second
secs := []*item{}
for i := 0; i < size; i++ {
stack.Push(data[i])
if i%2 == 0 {
secs = append(secs, stack.Pop().(*item))
}
}
rest := []*item{}
for stack.Len() > 0 {
rest = append(rest, stack.Pop().(*item))
}
// Make sure the contents of the resulting slices are ok
for i := 0; i < size; i++ {
if i%2 == 0 && data[i] != secs[i/2] {
t.Errorf("push/pop mismatch: have %v, want %v.", secs[i/2], data[i])
}
if i%2 == 1 && data[i] != rest[len(rest)-i/2-1] {
t.Errorf("push/pop mismatch: have %v, want %v.", rest[len(rest)-i/2-1], data[i])
}
}
}
}
func TestSstackSort(t *testing.T) {
// Create some initial data
size := 16 * blockSize
data := make([]*item, size)
for i := 0; i < size; i++ {
data[i] = &item{rand.Int(), float32(i)}
}
// Push all the data into the stack
stack := newSstack()
for _, val := range data {
stack.Push(val)
}
// Sort and pop the stack contents (should reverse the order)
sort.Sort(stack)
for _, val := range data {
out := stack.Pop()
if out != val {
t.Errorf("push/pop mismatch after sort: have %v, want %v.", out, val)
}
}
}
func TestSstackReset(t *testing.T) {
// Create some initial data
size := 16 * blockSize
data := make([]*item, size)
for i := 0; i < size; i++ {
data[i] = &item{rand.Int(), rand.Float32()}
}
stack := newSstack()
for rep := 0; rep < 2; rep++ {
// Push all the data into the stack, pop out every second
secs := []*item{}
for i := 0; i < size; i++ {
stack.Push(data[i])
if i%2 == 0 {
secs = append(secs, stack.Pop().(*item))
}
}
// Reset and verify both pulled and stack contents
stack.Reset()
if stack.Len() != 0 {
t.Errorf("stack not empty after reset: %v", stack)
}
for i := 0; i < size; i++ {
if i%2 == 0 && data[i] != secs[i/2] {
t.Errorf("push/pop mismatch: have %v, want %v.", secs[i/2], data[i])
}
}
}
}
...@@ -51,7 +51,7 @@ import _ "net/http/pprof" ...@@ -51,7 +51,7 @@ import _ "net/http/pprof"
const ( const (
ClientIdentifier = "Geth" ClientIdentifier = "Geth"
Version = "0.9.16" Version = "0.9.17"
) )
var ( var (
...@@ -242,6 +242,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso ...@@ -242,6 +242,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso
utils.JSpathFlag, utils.JSpathFlag,
utils.ListenPortFlag, utils.ListenPortFlag,
utils.MaxPeersFlag, utils.MaxPeersFlag,
utils.MaxPendingPeersFlag,
utils.EtherbaseFlag, utils.EtherbaseFlag,
utils.MinerThreadsFlag, utils.MinerThreadsFlag,
utils.MiningEnabledFlag, utils.MiningEnabledFlag,
......
...@@ -75,6 +75,7 @@ func init() { ...@@ -75,6 +75,7 @@ func init() {
utils.LogFileFlag, utils.LogFileFlag,
utils.LogLevelFlag, utils.LogLevelFlag,
utils.MaxPeersFlag, utils.MaxPeersFlag,
utils.MaxPendingPeersFlag,
utils.MinerThreadsFlag, utils.MinerThreadsFlag,
utils.NATFlag, utils.NATFlag,
utils.NodeKeyFileFlag, utils.NodeKeyFileFlag,
......
...@@ -195,7 +195,12 @@ var ( ...@@ -195,7 +195,12 @@ var (
MaxPeersFlag = cli.IntFlag{ MaxPeersFlag = cli.IntFlag{
Name: "maxpeers", Name: "maxpeers",
Usage: "Maximum number of network peers (network disabled if set to 0)", Usage: "Maximum number of network peers (network disabled if set to 0)",
Value: 16, Value: 25,
}
MaxPendingPeersFlag = cli.IntFlag{
Name: "maxpendpeers",
Usage: "Maximum number of pending connection attempts (defaults used if set to 0)",
Value: 0,
} }
ListenPortFlag = cli.IntFlag{ ListenPortFlag = cli.IntFlag{
Name: "port", Name: "port",
...@@ -292,6 +297,7 @@ func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config { ...@@ -292,6 +297,7 @@ func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config {
AccountManager: GetAccountManager(ctx), AccountManager: GetAccountManager(ctx),
VmDebug: ctx.GlobalBool(VMDebugFlag.Name), VmDebug: ctx.GlobalBool(VMDebugFlag.Name),
MaxPeers: ctx.GlobalInt(MaxPeersFlag.Name), MaxPeers: ctx.GlobalInt(MaxPeersFlag.Name),
MaxPendingPeers: ctx.GlobalInt(MaxPendingPeersFlag.Name),
Port: ctx.GlobalString(ListenPortFlag.Name), Port: ctx.GlobalString(ListenPortFlag.Name),
NAT: GetNAT(ctx), NAT: GetNAT(ctx),
NatSpec: ctx.GlobalBool(NatspecEnabledFlag.Name), NatSpec: ctx.GlobalBool(NatspecEnabledFlag.Name),
......
...@@ -60,8 +60,9 @@ type Config struct { ...@@ -60,8 +60,9 @@ type Config struct {
VmDebug bool VmDebug bool
NatSpec bool NatSpec bool
MaxPeers int MaxPeers int
Port string MaxPendingPeers int
Port string
// Space-separated list of discovery node URLs // Space-separated list of discovery node URLs
BootNodes string BootNodes string
...@@ -280,16 +281,17 @@ func New(config *Config) (*Ethereum, error) { ...@@ -280,16 +281,17 @@ func New(config *Config) (*Ethereum, error) {
protocols = append(protocols, eth.whisper.Protocol()) protocols = append(protocols, eth.whisper.Protocol())
} }
eth.net = &p2p.Server{ eth.net = &p2p.Server{
PrivateKey: netprv, PrivateKey: netprv,
Name: config.Name, Name: config.Name,
MaxPeers: config.MaxPeers, MaxPeers: config.MaxPeers,
Protocols: protocols, MaxPendingPeers: config.MaxPendingPeers,
NAT: config.NAT, Protocols: protocols,
NoDial: !config.Dial, NAT: config.NAT,
BootstrapNodes: config.parseBootNodes(), NoDial: !config.Dial,
StaticNodes: config.parseNodes(staticNodes), BootstrapNodes: config.parseBootNodes(),
TrustedNodes: config.parseNodes(trustedNodes), StaticNodes: config.parseNodes(staticNodes),
NodeDatabase: nodeDb, TrustedNodes: config.parseNodes(trustedNodes),
NodeDatabase: nodeDb,
} }
if len(config.Port) > 0 { if len(config.Port) > 0 {
eth.net.ListenAddr = ":" + config.Port eth.net.ListenAddr = ":" + config.Port
......
This diff is collapsed.
...@@ -128,7 +128,7 @@ func TestDownload(t *testing.T) { ...@@ -128,7 +128,7 @@ func TestDownload(t *testing.T) {
t.Error("download error", err) t.Error("download error", err)
} }
inqueue := len(tester.downloader.queue.blocks) inqueue := len(tester.downloader.queue.blockCache)
if inqueue != targetBlocks { if inqueue != targetBlocks {
t.Error("expected", targetBlocks, "have", inqueue) t.Error("expected", targetBlocks, "have", inqueue)
} }
...@@ -151,7 +151,7 @@ func TestMissing(t *testing.T) { ...@@ -151,7 +151,7 @@ func TestMissing(t *testing.T) {
t.Error("download error", err) t.Error("download error", err)
} }
inqueue := len(tester.downloader.queue.blocks) inqueue := len(tester.downloader.queue.blockCache)
if inqueue != targetBlocks { if inqueue != targetBlocks {
t.Error("expected", targetBlocks, "have", inqueue) t.Error("expected", targetBlocks, "have", inqueue)
} }
...@@ -181,3 +181,51 @@ func TestTaking(t *testing.T) { ...@@ -181,3 +181,51 @@ func TestTaking(t *testing.T) {
t.Error("expected to take 1000, got", len(bs1)) t.Error("expected to take 1000, got", len(bs1))
} }
} }
func TestThrottling(t *testing.T) {
minDesiredPeerCount = 4
blockTtl = 1 * time.Second
targetBlocks := 4 * blockCacheLimit
hashes := createHashes(0, targetBlocks)
blocks := createBlocksFromHashes(hashes)
tester := newTester(t, hashes, blocks)
tester.newPeer("peer1", big.NewInt(10000), hashes[0])
tester.newPeer("peer2", big.NewInt(0), common.Hash{})
tester.badBlocksPeer("peer3", big.NewInt(0), common.Hash{})
tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{})
// Concurrently download and take the blocks
errc := make(chan error, 1)
go func() {
errc <- tester.sync("peer1", hashes[0])
}()
done := make(chan struct{})
took := []*types.Block{}
go func() {
for {
select {
case <-done:
took = append(took, tester.downloader.TakeBlocks()...)
done <- struct{}{}
return
default:
took = append(took, tester.downloader.TakeBlocks()...)
}
}
}()
// Synchronise the two threads and verify
err := <-errc
done <- struct{}{}
<-done
if err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
if len(took) != targetBlocks {
t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks)
}
}
...@@ -78,7 +78,7 @@ func newPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blo ...@@ -78,7 +78,7 @@ func newPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blo
} }
// fetch a chunk using the peer // fetch a chunk using the peer
func (p *peer) fetch(chunk *chunk) error { func (p *peer) fetch(request *fetchRequest) error {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
...@@ -88,13 +88,12 @@ func (p *peer) fetch(chunk *chunk) error { ...@@ -88,13 +88,12 @@ func (p *peer) fetch(chunk *chunk) error {
// set working state // set working state
p.state = workingState p.state = workingState
// convert the set to a fetchable slice
hashes, i := make([]common.Hash, chunk.hashes.Size()), 0 // Convert the hash set to a fetchable slice
chunk.hashes.Each(func(v interface{}) bool { hashes := make([]common.Hash, 0, len(request.Hashes))
hashes[i] = v.(common.Hash) for hash, _ := range request.Hashes {
i++ hashes = append(hashes, hash)
return true }
})
p.getBlocks(hashes) p.getBlocks(hashes)
return nil return nil
......
This diff is collapsed.
...@@ -32,31 +32,30 @@ func createBlocksFromHashSet(hashes *set.Set) []*types.Block { ...@@ -32,31 +32,30 @@ func createBlocksFromHashSet(hashes *set.Set) []*types.Block {
} }
func TestChunking(t *testing.T) { func TestChunking(t *testing.T) {
queue := newqueue() queue := newQueue()
peer1 := newPeer("peer1", common.Hash{}, nil, nil) peer1 := newPeer("peer1", common.Hash{}, nil, nil)
peer2 := newPeer("peer2", common.Hash{}, nil, nil) peer2 := newPeer("peer2", common.Hash{}, nil, nil)
// 99 + 1 (1 == known genesis hash) // 99 + 1 (1 == known genesis hash)
hashes := createHashes(0, 99) hashes := createHashes(0, 99)
hashSet := createHashSet(hashes) queue.Insert(hashes)
queue.put(hashSet)
chunk1 := queue.get(peer1, 99) chunk1 := queue.Reserve(peer1, 99)
if chunk1 == nil { if chunk1 == nil {
t.Errorf("chunk1 is nil") t.Errorf("chunk1 is nil")
t.FailNow() t.FailNow()
} }
chunk2 := queue.get(peer2, 99) chunk2 := queue.Reserve(peer2, 99)
if chunk2 == nil { if chunk2 == nil {
t.Errorf("chunk2 is nil") t.Errorf("chunk2 is nil")
t.FailNow() t.FailNow()
} }
if chunk1.hashes.Size() != 99 { if len(chunk1.Hashes) != 99 {
t.Error("expected chunk1 hashes to be 99, got", chunk1.hashes.Size()) t.Error("expected chunk1 hashes to be 99, got", len(chunk1.Hashes))
} }
if chunk2.hashes.Size() != 1 { if len(chunk2.Hashes) != 1 {
t.Error("expected chunk1 hashes to be 1, got", chunk2.hashes.Size()) t.Error("expected chunk1 hashes to be 1, got", len(chunk2.Hashes))
} }
} }
...@@ -19,9 +19,9 @@ import ( ...@@ -19,9 +19,9 @@ import (
) )
const ( const (
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
blockProcTimer = 500 * time.Millisecond blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process
minDesiredPeerCount = 5 // Amount of peers desired to start syncing minDesiredPeerCount = 5 // Amount of peers desired to start syncing
blockProcAmount = 256 blockProcAmount = 256
) )
...@@ -307,7 +307,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { ...@@ -307,7 +307,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
// Attempt to insert the newly received by checking if the parent exists. // Attempt to insert the newly received by checking if the parent exists.
// if the parent exists we process the block and propagate to our peers // if the parent exists we process the block and propagate to our peers
// otherwise synchronise with the peer // otherwise synchronize with the peer
if self.chainman.HasBlock(request.Block.ParentHash()) { if self.chainman.HasBlock(request.Block.ParentHash()) {
if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil { if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error") glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
......
...@@ -12,10 +12,8 @@ import ( ...@@ -12,10 +12,8 @@ import (
// Sync contains all synchronisation code for the eth protocol // Sync contains all synchronisation code for the eth protocol
func (pm *ProtocolManager) update() { func (pm *ProtocolManager) update() {
// itimer is used to determine when to start ignoring `minDesiredPeerCount` forceSync := time.Tick(forceSyncCycle)
itimer := time.NewTimer(peerCountTimeout) blockProc := time.Tick(blockProcCycle)
// btimer is used for picking of blocks from the downloader
btimer := time.Tick(blockProcTimer)
for { for {
select { select {
...@@ -24,27 +22,22 @@ func (pm *ProtocolManager) update() { ...@@ -24,27 +22,22 @@ func (pm *ProtocolManager) update() {
if len(pm.peers) < minDesiredPeerCount { if len(pm.peers) < minDesiredPeerCount {
break break
} }
// Find the best peer and synchronise with it
// Find the best peer
peer := getBestPeer(pm.peers) peer := getBestPeer(pm.peers)
if peer == nil { if peer == nil {
glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available") glog.V(logger.Debug).Infoln("Sync attempt canceled. No peers available")
} }
itimer.Stop()
go pm.synchronise(peer) go pm.synchronise(peer)
case <-itimer.C:
// The timer will make sure that the downloader keeps an active state case <-forceSync:
// in which it attempts to always check the network for highest td peers // Force a sync even if not enough peers are present
// Either select the peer or restart the timer if no peers could
// be selected.
if peer := getBestPeer(pm.peers); peer != nil { if peer := getBestPeer(pm.peers); peer != nil {
go pm.synchronise(peer) go pm.synchronise(peer)
} else {
itimer.Reset(5 * time.Second)
} }
case <-btimer: case <-blockProc:
// Try to pull some blocks from the downloaded
go pm.processBlocks() go pm.processBlocks()
case <-pm.quitSync: case <-pm.quitSync:
return return
} }
...@@ -59,12 +52,11 @@ func (pm *ProtocolManager) processBlocks() error { ...@@ -59,12 +52,11 @@ func (pm *ProtocolManager) processBlocks() error {
pm.wg.Add(1) pm.wg.Add(1)
defer pm.wg.Done() defer pm.wg.Done()
// Take a batch of blocks (will return nil if a previous batch has not reached the chain yet)
blocks := pm.downloader.TakeBlocks() blocks := pm.downloader.TakeBlocks()
if len(blocks) == 0 { if len(blocks) == 0 {
return nil return nil
} }
defer pm.downloader.Done()
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number()) glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
for len(blocks) != 0 && !pm.quit { for len(blocks) != 0 && !pm.quit {
...@@ -83,26 +75,28 @@ func (pm *ProtocolManager) synchronise(peer *peer) { ...@@ -83,26 +75,28 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
if peer.td.Cmp(pm.chainman.Td()) <= 0 { if peer.td.Cmp(pm.chainman.Td()) <= 0 {
return return
} }
// Check downloader if it's busy so it doesn't show the sync message
// for every attempty
if pm.downloader.IsBusy() {
return
}
// FIXME if we have the hash in our chain and the TD of the peer is // FIXME if we have the hash in our chain and the TD of the peer is
// much higher than ours, something is wrong with us or the peer. // much higher than ours, something is wrong with us or the peer.
// Check if the hash is on our own chain // Check if the hash is on our own chain
if pm.chainman.HasBlock(peer.recentHash) { if pm.chainman.HasBlock(peer.recentHash) {
return return
} }
// Get the hashes from the peer (synchronously) // Get the hashes from the peer (synchronously)
glog.V(logger.Debug).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash)
err := pm.downloader.Synchronise(peer.id, peer.recentHash) err := pm.downloader.Synchronise(peer.id, peer.recentHash)
if err != nil && err == downloader.ErrBadPeer { switch err {
glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action") case nil:
glog.V(logger.Debug).Infof("Synchronisation completed")
case downloader.ErrBusy:
glog.V(logger.Debug).Infof("Synchronisation already in progress")
case downloader.ErrTimeout:
glog.V(logger.Debug).Infof("Removing peer %v due to sync timeout", peer.id)
pm.removePeer(peer) pm.removePeer(peer)
} else if err != nil {
// handle error default:
glog.V(logger.Detail).Infoln("error downloading:", err) glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
} }
} }
...@@ -286,7 +286,7 @@ func (self *JSRE) loadScript(call otto.FunctionCall) otto.Value { ...@@ -286,7 +286,7 @@ func (self *JSRE) loadScript(call otto.FunctionCall) otto.Value {
// uses the "prettyPrint" JS function to format a value // uses the "prettyPrint" JS function to format a value
func (self *JSRE) PrettyPrint(v interface{}) (val otto.Value, err error) { func (self *JSRE) PrettyPrint(v interface{}) (val otto.Value, err error) {
var method otto.Value var method otto.Value
v, err = self.vm.ToValue(v) v, err = self.ToValue(v)
if err != nil { if err != nil {
return return
} }
...@@ -297,8 +297,23 @@ func (self *JSRE) PrettyPrint(v interface{}) (val otto.Value, err error) { ...@@ -297,8 +297,23 @@ func (self *JSRE) PrettyPrint(v interface{}) (val otto.Value, err error) {
return method.Call(method, v) return method.Call(method, v)
} }
// creates an otto value from a go type // creates an otto value from a go type (serialized version)
func (self *JSRE) ToValue(v interface{}) (otto.Value, error) {
done := make(chan bool)
req := &evalReq{
fn: func(res *evalResult) {
res.result, res.err = self.vm.ToValue(v)
},
done: done,
}
self.evalQueue <- req
<-done
return req.res.result, req.res.err
}
// creates an otto value from a go type (non-serialized version)
func (self *JSRE) ToVal(v interface{}) otto.Value { func (self *JSRE) ToVal(v interface{}) otto.Value {
result, err := self.vm.ToValue(v) result, err := self.vm.ToValue(v)
if err != nil { if err != nil {
fmt.Println("Value unknown:", err) fmt.Println("Value unknown:", err)
......
...@@ -65,26 +65,26 @@ type protoHandshake struct { ...@@ -65,26 +65,26 @@ type protoHandshake struct {
ID discover.NodeID ID discover.NodeID
} }
// setupConn starts a protocol session on the given connection. // setupConn starts a protocol session on the given connection. It
// It runs the encryption handshake and the protocol handshake. // runs the encryption handshake and the protocol handshake. If dial
// If dial is non-nil, the connection the local node is the initiator. // is non-nil, the connection the local node is the initiator. If
// If atcap is true, the connection will be disconnected with DiscTooManyPeers // keepconn returns false, the connection will be disconnected with
// after the key exchange. // DiscTooManyPeers after the key exchange.
func setupConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, atcap bool, trusted map[discover.NodeID]bool) (*conn, error) { func setupConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, keepconn func(discover.NodeID) bool) (*conn, error) {
if dial == nil { if dial == nil {
return setupInboundConn(fd, prv, our, atcap, trusted) return setupInboundConn(fd, prv, our, keepconn)
} else { } else {
return setupOutboundConn(fd, prv, our, dial, atcap, trusted) return setupOutboundConn(fd, prv, our, dial, keepconn)
} }
} }
func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, atcap bool, trusted map[discover.NodeID]bool) (*conn, error) { func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, keepconn func(discover.NodeID) bool) (*conn, error) {
secrets, err := receiverEncHandshake(fd, prv, nil) secrets, err := receiverEncHandshake(fd, prv, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("encryption handshake failed: %v", err) return nil, fmt.Errorf("encryption handshake failed: %v", err)
} }
rw := newRlpxFrameRW(fd, secrets) rw := newRlpxFrameRW(fd, secrets)
if atcap && !trusted[secrets.RemoteID] { if !keepconn(secrets.RemoteID) {
SendItems(rw, discMsg, DiscTooManyPeers) SendItems(rw, discMsg, DiscTooManyPeers)
return nil, errors.New("we have too many peers") return nil, errors.New("we have too many peers")
} }
...@@ -99,13 +99,13 @@ func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, a ...@@ -99,13 +99,13 @@ func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, a
return &conn{rw, rhs}, nil return &conn{rw, rhs}, nil
} }
func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, atcap bool, trusted map[discover.NodeID]bool) (*conn, error) { func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, keepconn func(discover.NodeID) bool) (*conn, error) {
secrets, err := initiatorEncHandshake(fd, prv, dial.ID, nil) secrets, err := initiatorEncHandshake(fd, prv, dial.ID, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("encryption handshake failed: %v", err) return nil, fmt.Errorf("encryption handshake failed: %v", err)
} }
rw := newRlpxFrameRW(fd, secrets) rw := newRlpxFrameRW(fd, secrets)
if atcap && !trusted[secrets.RemoteID] { if !keepconn(secrets.RemoteID) {
SendItems(rw, discMsg, DiscTooManyPeers) SendItems(rw, discMsg, DiscTooManyPeers)
return nil, errors.New("we have too many peers") return nil, errors.New("we have too many peers")
} }
......
...@@ -141,9 +141,10 @@ func TestSetupConn(t *testing.T) { ...@@ -141,9 +141,10 @@ func TestSetupConn(t *testing.T) {
fd0, fd1 := net.Pipe() fd0, fd1 := net.Pipe()
done := make(chan struct{}) done := make(chan struct{})
keepalways := func(discover.NodeID) bool { return true }
go func() { go func() {
defer close(done) defer close(done)
conn0, err := setupConn(fd0, prv0, hs0, node1, false, nil) conn0, err := setupConn(fd0, prv0, hs0, node1, keepalways)
if err != nil { if err != nil {
t.Errorf("outbound side error: %v", err) t.Errorf("outbound side error: %v", err)
return return
...@@ -156,7 +157,7 @@ func TestSetupConn(t *testing.T) { ...@@ -156,7 +157,7 @@ func TestSetupConn(t *testing.T) {
} }
}() }()
conn1, err := setupConn(fd1, prv1, hs1, nil, false, nil) conn1, err := setupConn(fd1, prv1, hs1, nil, keepalways)
if err != nil { if err != nil {
t.Fatalf("inbound side error: %v", err) t.Fatalf("inbound side error: %v", err)
} }
......
...@@ -211,6 +211,18 @@ func (p *Peer) handle(msg Msg) error { ...@@ -211,6 +211,18 @@ func (p *Peer) handle(msg Msg) error {
return nil return nil
} }
func countMatchingProtocols(protocols []Protocol, caps []Cap) int {
n := 0
for _, cap := range caps {
for _, proto := range protocols {
if proto.Name == cap.Name && proto.Version == cap.Version {
n++
}
}
}
return n
}
// matchProtocols creates structures for matching named subprotocols. // matchProtocols creates structures for matching named subprotocols.
func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW { func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW {
sort.Sort(capsByName(caps)) sort.Sort(capsByName(caps))
......
...@@ -22,10 +22,11 @@ const ( ...@@ -22,10 +22,11 @@ const (
refreshPeersInterval = 30 * time.Second refreshPeersInterval = 30 * time.Second
staticPeerCheckInterval = 15 * time.Second staticPeerCheckInterval = 15 * time.Second
// This is the maximum number of inbound connection // Maximum number of concurrently handshaking inbound connections.
// that are allowed to linger between 'accepted' and maxAcceptConns = 10
// 'added as peer'.
maxAcceptConns = 50 // Maximum number of concurrently dialing outbound connections.
maxDialingConns = 10
// total timeout for encryption handshake and protocol // total timeout for encryption handshake and protocol
// handshake in both directions. // handshake in both directions.
...@@ -52,6 +53,11 @@ type Server struct { ...@@ -52,6 +53,11 @@ type Server struct {
// connected. It must be greater than zero. // connected. It must be greater than zero.
MaxPeers int MaxPeers int
// MaxPendingPeers is the maximum number of peers that can be pending in the
// handshake phase, counted separately for inbound and outbound connections.
// Zero defaults to preset values.
MaxPendingPeers int
// Name sets the node name of this server. // Name sets the node name of this server.
// Use common.MakeName to create a name that follows existing conventions. // Use common.MakeName to create a name that follows existing conventions.
Name string Name string
...@@ -120,7 +126,7 @@ type Server struct { ...@@ -120,7 +126,7 @@ type Server struct {
peerWG sync.WaitGroup // active peer goroutines peerWG sync.WaitGroup // active peer goroutines
} }
type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, bool, map[discover.NodeID]bool) (*conn, error) type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, func(discover.NodeID) bool) (*conn, error)
type newPeerHook func(*Peer) type newPeerHook func(*Peer)
// Peers returns all connected peers. // Peers returns all connected peers.
...@@ -331,8 +337,12 @@ func (srv *Server) listenLoop() { ...@@ -331,8 +337,12 @@ func (srv *Server) listenLoop() {
// This channel acts as a semaphore limiting // This channel acts as a semaphore limiting
// active inbound connections that are lingering pre-handshake. // active inbound connections that are lingering pre-handshake.
// If all slots are taken, no further connections are accepted. // If all slots are taken, no further connections are accepted.
slots := make(chan struct{}, maxAcceptConns) tokens := maxAcceptConns
for i := 0; i < maxAcceptConns; i++ { if srv.MaxPendingPeers > 0 {
tokens = srv.MaxPendingPeers
}
slots := make(chan struct{}, tokens)
for i := 0; i < tokens; i++ {
slots <- struct{}{} slots <- struct{}{}
} }
...@@ -401,7 +411,15 @@ func (srv *Server) dialLoop() { ...@@ -401,7 +411,15 @@ func (srv *Server) dialLoop() {
defer srv.loopWG.Done() defer srv.loopWG.Done()
defer refresh.Stop() defer refresh.Stop()
// TODO: maybe limit number of active dials // Limit the number of concurrent dials
tokens := maxDialingConns
if srv.MaxPendingPeers > 0 {
tokens = srv.MaxPendingPeers
}
slots := make(chan struct{}, tokens)
for i := 0; i < tokens; i++ {
slots <- struct{}{}
}
dial := func(dest *discover.Node) { dial := func(dest *discover.Node) {
// Don't dial nodes that would fail the checks in addPeer. // Don't dial nodes that would fail the checks in addPeer.
// This is important because the connection handshake is a lot // This is important because the connection handshake is a lot
...@@ -413,11 +431,14 @@ func (srv *Server) dialLoop() { ...@@ -413,11 +431,14 @@ func (srv *Server) dialLoop() {
if !ok || dialing[dest.ID] { if !ok || dialing[dest.ID] {
return return
} }
// Request a dial slot to prevent CPU exhaustion
<-slots
dialing[dest.ID] = true dialing[dest.ID] = true
srv.peerWG.Add(1) srv.peerWG.Add(1)
go func() { go func() {
srv.dialNode(dest) srv.dialNode(dest)
slots <- struct{}{}
dialed <- dest dialed <- dest
}() }()
} }
...@@ -485,17 +506,7 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { ...@@ -485,17 +506,7 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
// the callers of startPeer added the peer to the wait group already. // the callers of startPeer added the peer to the wait group already.
fd.SetDeadline(time.Now().Add(handshakeTimeout)) fd.SetDeadline(time.Now().Add(handshakeTimeout))
// Check capacity, but override for static nodes conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, srv.keepconn)
srv.lock.RLock()
atcap := len(srv.peers) == srv.MaxPeers
if dest != nil {
if _, ok := srv.staticNodes[dest.ID]; ok {
atcap = false
}
}
srv.lock.RUnlock()
conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, atcap, srv.trustedNodes)
if err != nil { if err != nil {
fd.Close() fd.Close()
glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err) glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err)
...@@ -507,7 +518,7 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { ...@@ -507,7 +518,7 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout, conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
} }
p := newPeer(fd, conn, srv.Protocols) p := newPeer(fd, conn, srv.Protocols)
if ok, reason := srv.addPeer(conn.ID, p); !ok { if ok, reason := srv.addPeer(conn, p); !ok {
glog.V(logger.Detail).Infof("Not adding %v (%v)\n", p, reason) glog.V(logger.Detail).Infof("Not adding %v (%v)\n", p, reason)
p.politeDisconnect(reason) p.politeDisconnect(reason)
srv.peerWG.Done() srv.peerWG.Done()
...@@ -518,6 +529,21 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { ...@@ -518,6 +529,21 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
go srv.runPeer(p) go srv.runPeer(p)
} }
// preflight checks whether a connection should be kept. it runs
// after the encryption handshake, as soon as the remote identity is
// known.
func (srv *Server) keepconn(id discover.NodeID) bool {
srv.lock.RLock()
defer srv.lock.RUnlock()
if _, ok := srv.staticNodes[id]; ok {
return true // static nodes are always allowed
}
if _, ok := srv.trustedNodes[id]; ok {
return true // trusted nodes are always allowed
}
return len(srv.peers) < srv.MaxPeers
}
func (srv *Server) runPeer(p *Peer) { func (srv *Server) runPeer(p *Peer) {
glog.V(logger.Debug).Infof("Added %v\n", p) glog.V(logger.Debug).Infof("Added %v\n", p)
srvjslog.LogJson(&logger.P2PConnected{ srvjslog.LogJson(&logger.P2PConnected{
...@@ -538,13 +564,18 @@ func (srv *Server) runPeer(p *Peer) { ...@@ -538,13 +564,18 @@ func (srv *Server) runPeer(p *Peer) {
}) })
} }
func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) { func (srv *Server) addPeer(conn *conn, p *Peer) (bool, DiscReason) {
// drop connections with no matching protocols.
if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, conn.protoHandshake.Caps) == 0 {
return false, DiscUselessPeer
}
// add the peer if it passes the other checks.
srv.lock.Lock() srv.lock.Lock()
defer srv.lock.Unlock() defer srv.lock.Unlock()
if ok, reason := srv.checkPeer(id); !ok { if ok, reason := srv.checkPeer(conn.ID); !ok {
return false, reason return false, reason
} }
srv.peers[id] = p srv.peers[conn.ID] = p
return true, 0 return true, 0
} }
......
...@@ -22,8 +22,11 @@ func startTestServer(t *testing.T, pf newPeerHook) *Server { ...@@ -22,8 +22,11 @@ func startTestServer(t *testing.T, pf newPeerHook) *Server {
ListenAddr: "127.0.0.1:0", ListenAddr: "127.0.0.1:0",
PrivateKey: newkey(), PrivateKey: newkey(),
newPeerHook: pf, newPeerHook: pf,
setupFunc: func(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, atcap bool, trusted map[discover.NodeID]bool) (*conn, error) { setupFunc: func(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, keepconn func(discover.NodeID) bool) (*conn, error) {
id := randomID() id := randomID()
if !keepconn(id) {
return nil, DiscAlreadyConnected
}
rw := newRlpxFrameRW(fd, secrets{ rw := newRlpxFrameRW(fd, secrets{
MAC: zero16, MAC: zero16,
AES: zero16, AES: zero16,
...@@ -200,7 +203,7 @@ func TestServerDisconnectAtCap(t *testing.T) { ...@@ -200,7 +203,7 @@ func TestServerDisconnectAtCap(t *testing.T) {
// Run the handshakes just like a real peer would. // Run the handshakes just like a real peer would.
key := newkey() key := newkey()
hs := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)} hs := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
_, err = setupConn(conn, key, hs, srv.Self(), false, srv.trustedNodes) _, err = setupConn(conn, key, hs, srv.Self(), keepalways)
if i == nconns-1 { if i == nconns-1 {
// When handling the last connection, the server should // When handling the last connection, the server should
// disconnect immediately instead of running the protocol // disconnect immediately instead of running the protocol
...@@ -250,7 +253,7 @@ func TestServerStaticPeers(t *testing.T) { ...@@ -250,7 +253,7 @@ func TestServerStaticPeers(t *testing.T) {
// Run the handshakes just like a real peer would, and wait for completion // Run the handshakes just like a real peer would, and wait for completion
key := newkey() key := newkey()
shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)} shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
if _, err = setupConn(conn, key, shake, server.Self(), false, server.trustedNodes); err != nil { if _, err = setupConn(conn, key, shake, server.Self(), keepalways); err != nil {
t.Fatalf("conn %d: unexpected error: %v", i, err) t.Fatalf("conn %d: unexpected error: %v", i, err)
} }
<-started <-started
...@@ -344,7 +347,7 @@ func TestServerTrustedPeers(t *testing.T) { ...@@ -344,7 +347,7 @@ func TestServerTrustedPeers(t *testing.T) {
// Run the handshakes just like a real peer would, and wait for completion // Run the handshakes just like a real peer would, and wait for completion
key := newkey() key := newkey()
shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)} shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
if _, err = setupConn(conn, key, shake, server.Self(), false, server.trustedNodes); err != nil { if _, err = setupConn(conn, key, shake, server.Self(), keepalways); err != nil {
t.Fatalf("conn %d: unexpected error: %v", i, err) t.Fatalf("conn %d: unexpected error: %v", i, err)
} }
<-started <-started
...@@ -357,7 +360,7 @@ func TestServerTrustedPeers(t *testing.T) { ...@@ -357,7 +360,7 @@ func TestServerTrustedPeers(t *testing.T) {
defer conn.Close() defer conn.Close()
shake := &protoHandshake{Version: baseProtocolVersion, ID: trusted.ID} shake := &protoHandshake{Version: baseProtocolVersion, ID: trusted.ID}
if _, err = setupConn(conn, key, shake, server.Self(), false, server.trustedNodes); err != nil { if _, err = setupConn(conn, key, shake, server.Self(), keepalways); err != nil {
t.Fatalf("trusted node: unexpected error: %v", err) t.Fatalf("trusted node: unexpected error: %v", err)
} }
select { select {
...@@ -369,6 +372,136 @@ func TestServerTrustedPeers(t *testing.T) { ...@@ -369,6 +372,136 @@ func TestServerTrustedPeers(t *testing.T) {
} }
} }
// Tests that a failed dial will temporarily throttle a peer.
func TestServerMaxPendingDials(t *testing.T) {
defer testlog(t).detach()
// Start a simple test server
server := &Server{
ListenAddr: "127.0.0.1:0",
PrivateKey: newkey(),
MaxPeers: 10,
MaxPendingPeers: 1,
}
if err := server.Start(); err != nil {
t.Fatal("failed to start test server: %v", err)
}
defer server.Stop()
// Simulate two separate remote peers
peers := make(chan *discover.Node, 2)
conns := make(chan net.Conn, 2)
for i := 0; i < 2; i++ {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listener %d: failed to setup: %v", i, err)
}
defer listener.Close()
addr := listener.Addr().(*net.TCPAddr)
peers <- &discover.Node{
ID: discover.PubkeyID(&newkey().PublicKey),
IP: addr.IP,
TCP: uint16(addr.Port),
}
go func() {
conn, err := listener.Accept()
if err == nil {
conns <- conn
}
}()
}
// Request a dial for both peers
go func() {
for i := 0; i < 2; i++ {
server.staticDial <- <-peers // hack piggybacking the static implementation
}
}()
// Make sure only one outbound connection goes through
var conn net.Conn
select {
case conn = <-conns:
case <-time.After(100 * time.Millisecond):
t.Fatalf("first dial timeout")
}
select {
case conn = <-conns:
t.Fatalf("second dial completed prematurely")
case <-time.After(100 * time.Millisecond):
}
// Finish the first dial, check the second
conn.Close()
select {
case conn = <-conns:
conn.Close()
case <-time.After(100 * time.Millisecond):
t.Fatalf("second dial timeout")
}
}
func TestServerMaxPendingAccepts(t *testing.T) {
defer testlog(t).detach()
// Start a test server and a peer sink for synchronization
started := make(chan *Peer)
server := &Server{
ListenAddr: "127.0.0.1:0",
PrivateKey: newkey(),
MaxPeers: 10,
MaxPendingPeers: 1,
NoDial: true,
newPeerHook: func(p *Peer) { started <- p },
}
if err := server.Start(); err != nil {
t.Fatal("failed to start test server: %v", err)
}
defer server.Stop()
// Try and connect to the server on multiple threads concurrently
conns := make([]net.Conn, 2)
for i := 0; i < 2; i++ {
dialer := &net.Dialer{Deadline: time.Now().Add(3 * time.Second)}
conn, err := dialer.Dial("tcp", server.ListenAddr)
if err != nil {
t.Fatalf("failed to dial server: %v", err)
}
conns[i] = conn
}
// Check that a handshake on the second doesn't pass
go func() {
key := newkey()
shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
if _, err := setupConn(conns[1], key, shake, server.Self(), keepalways); err != nil {
t.Fatalf("failed to run handshake: %v", err)
}
}()
select {
case <-started:
t.Fatalf("handshake on second connection accepted")
case <-time.After(time.Second):
}
// Shake on first, check that both go through
go func() {
key := newkey()
shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
if _, err := setupConn(conns[0], key, shake, server.Self(), keepalways); err != nil {
t.Fatalf("failed to run handshake: %v", err)
}
}()
for i := 0; i < 2; i++ {
select {
case <-started:
case <-time.After(time.Second):
t.Fatalf("peer %d: handshake timeout", i)
}
}
}
func newkey() *ecdsa.PrivateKey { func newkey() *ecdsa.PrivateKey {
key, err := crypto.GenerateKey() key, err := crypto.GenerateKey()
if err != nil { if err != nil {
...@@ -383,3 +516,7 @@ func randomID() (id discover.NodeID) { ...@@ -383,3 +516,7 @@ func randomID() (id discover.NodeID) {
} }
return id return id
} }
func keepalways(id discover.NodeID) bool {
return true
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment