Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
0cd72369
Commit
0cd72369
authored
Jun 03, 2015
by
Jeffrey Wilcke
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #1176 from karalabe/congestion-control
eth/downloader: add a basic block download congestion control
parents
02f785af
3ec159ab
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
72 additions
and
55 deletions
+72
-55
downloader.go
eth/downloader/downloader.go
+8
-6
downloader_test.go
eth/downloader/downloader_test.go
+11
-11
peer.go
eth/downloader/peer.go
+45
-5
queue.go
eth/downloader/queue.go
+8
-2
queue_test.go
eth/downloader/queue_test.go
+0
-31
No files found.
eth/downloader/downloader.go
View file @
0cd72369
...
@@ -27,9 +27,10 @@ const (
...
@@ -27,9 +27,10 @@ const (
)
)
var
(
var
(
blockTTL
=
5
*
time
.
Second
// Time it takes for a block request to time out
blockSoftTTL
=
3
*
time
.
Second
// Request completion threshold for increasing or decreasing a peer's bandwidth
crossCheckCycle
=
time
.
Second
// Period after which to check for expired cross checks
blockHardTTL
=
3
*
blockSoftTTL
// Maximum time allowance before a block request is considered expired
minDesiredPeerCount
=
5
// Amount of peers desired to start syncing
crossCheckCycle
=
time
.
Second
// Period after which to check for expired cross checks
minDesiredPeerCount
=
5
// Amount of peers desired to start syncing
)
)
var
(
var
(
...
@@ -324,7 +325,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
...
@@ -324,7 +325,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Cross checking (%s) with %x/%x"
,
active
.
id
,
origin
,
parent
)
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Cross checking (%s) with %x/%x"
,
active
.
id
,
origin
,
parent
)
d
.
checks
[
origin
]
=
&
crossCheck
{
d
.
checks
[
origin
]
=
&
crossCheck
{
expire
:
time
.
Now
()
.
Add
(
blockTTL
),
expire
:
time
.
Now
()
.
Add
(
block
Soft
TTL
),
parent
:
parent
,
parent
:
parent
,
}
}
active
.
getBlocks
([]
common
.
Hash
{
origin
})
active
.
getBlocks
([]
common
.
Hash
{
origin
})
...
@@ -429,6 +430,7 @@ out:
...
@@ -429,6 +430,7 @@ out:
// Peer did deliver, but some blocks were off, penalize
// Peer did deliver, but some blocks were off, penalize
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Failed delivery for peer %s: %v
\n
"
,
blockPack
.
peerId
,
err
)
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Failed delivery for peer %s: %v
\n
"
,
blockPack
.
peerId
,
err
)
peer
.
Demote
()
peer
.
Demote
()
peer
.
SetIdle
()
break
break
}
}
if
glog
.
V
(
logger
.
Debug
)
&&
len
(
blockPack
.
blocks
)
>
0
{
if
glog
.
V
(
logger
.
Debug
)
&&
len
(
blockPack
.
blocks
)
>
0
{
...
@@ -444,7 +446,7 @@ out:
...
@@ -444,7 +446,7 @@ out:
// that badly or poorly behave are removed from the peer set (not banned).
// that badly or poorly behave are removed from the peer set (not banned).
// Bad peers are excluded from the available peer set and therefor won't be
// Bad peers are excluded from the available peer set and therefor won't be
// reused. XXX We could re-introduce peers after X time.
// reused. XXX We could re-introduce peers after X time.
badPeers
:=
d
.
queue
.
Expire
(
blockTTL
)
badPeers
:=
d
.
queue
.
Expire
(
block
Hard
TTL
)
for
_
,
pid
:=
range
badPeers
{
for
_
,
pid
:=
range
badPeers
{
// XXX We could make use of a reputation system here ranking peers
// XXX We could make use of a reputation system here ranking peers
// in their performance
// in their performance
...
@@ -475,7 +477,7 @@ out:
...
@@ -475,7 +477,7 @@ out:
}
}
// Get a possible chunk. If nil is returned no chunk
// Get a possible chunk. If nil is returned no chunk
// could be returned due to no hashes available.
// could be returned due to no hashes available.
request
:=
d
.
queue
.
Reserve
(
peer
,
MaxBlockFetch
)
request
:=
d
.
queue
.
Reserve
(
peer
)
if
request
==
nil
{
if
request
==
nil
{
continue
continue
}
}
...
...
eth/downloader/downloader_test.go
View file @
0cd72369
...
@@ -191,7 +191,7 @@ func (dl *downloadTester) badBlocksPeer(id string, td *big.Int, hash common.Hash
...
@@ -191,7 +191,7 @@ func (dl *downloadTester) badBlocksPeer(id string, td *big.Int, hash common.Hash
func
TestDownload
(
t
*
testing
.
T
)
{
func
TestDownload
(
t
*
testing
.
T
)
{
minDesiredPeerCount
=
4
minDesiredPeerCount
=
4
blockTTL
=
1
*
time
.
Second
block
Hard
TTL
=
1
*
time
.
Second
targetBlocks
:=
1000
targetBlocks
:=
1000
hashes
:=
createHashes
(
0
,
targetBlocks
)
hashes
:=
createHashes
(
0
,
targetBlocks
)
...
@@ -240,7 +240,7 @@ func TestMissing(t *testing.T) {
...
@@ -240,7 +240,7 @@ func TestMissing(t *testing.T) {
func
TestTaking
(
t
*
testing
.
T
)
{
func
TestTaking
(
t
*
testing
.
T
)
{
minDesiredPeerCount
=
4
minDesiredPeerCount
=
4
blockTTL
=
1
*
time
.
Second
block
Hard
TTL
=
1
*
time
.
Second
targetBlocks
:=
1000
targetBlocks
:=
1000
hashes
:=
createHashes
(
0
,
targetBlocks
)
hashes
:=
createHashes
(
0
,
targetBlocks
)
...
@@ -281,7 +281,7 @@ func TestInactiveDownloader(t *testing.T) {
...
@@ -281,7 +281,7 @@ func TestInactiveDownloader(t *testing.T) {
func
TestCancel
(
t
*
testing
.
T
)
{
func
TestCancel
(
t
*
testing
.
T
)
{
minDesiredPeerCount
=
4
minDesiredPeerCount
=
4
blockTTL
=
1
*
time
.
Second
block
Hard
TTL
=
1
*
time
.
Second
targetBlocks
:=
1000
targetBlocks
:=
1000
hashes
:=
createHashes
(
0
,
targetBlocks
)
hashes
:=
createHashes
(
0
,
targetBlocks
)
...
@@ -307,7 +307,7 @@ func TestCancel(t *testing.T) {
...
@@ -307,7 +307,7 @@ func TestCancel(t *testing.T) {
func
TestThrottling
(
t
*
testing
.
T
)
{
func
TestThrottling
(
t
*
testing
.
T
)
{
minDesiredPeerCount
=
4
minDesiredPeerCount
=
4
blockTTL
=
1
*
time
.
Second
block
Hard
TTL
=
1
*
time
.
Second
targetBlocks
:=
16
*
blockCacheLimit
targetBlocks
:=
16
*
blockCacheLimit
hashes
:=
createHashes
(
0
,
targetBlocks
)
hashes
:=
createHashes
(
0
,
targetBlocks
)
...
@@ -461,7 +461,7 @@ func TestInvalidHashOrderAttack(t *testing.T) {
...
@@ -461,7 +461,7 @@ func TestInvalidHashOrderAttack(t *testing.T) {
// Tests that if a malicious peer makes up a random hash chain and tries to push
// Tests that if a malicious peer makes up a random hash chain and tries to push
// indefinitely, it actually gets caught with it.
// indefinitely, it actually gets caught with it.
func
TestMadeupHashChainAttack
(
t
*
testing
.
T
)
{
func
TestMadeupHashChainAttack
(
t
*
testing
.
T
)
{
blockTTL
=
100
*
time
.
Millisecond
block
Soft
TTL
=
100
*
time
.
Millisecond
crossCheckCycle
=
25
*
time
.
Millisecond
crossCheckCycle
=
25
*
time
.
Millisecond
// Create a long chain of hashes without backing blocks
// Create a long chain of hashes without backing blocks
...
@@ -495,10 +495,10 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) {
...
@@ -495,10 +495,10 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) {
// Tests that if a malicious peer makes up a random block chain, and tried to
// Tests that if a malicious peer makes up a random block chain, and tried to
// push indefinitely, it actually gets caught with it.
// push indefinitely, it actually gets caught with it.
func
TestMadeupBlockChainAttack
(
t
*
testing
.
T
)
{
func
TestMadeupBlockChainAttack
(
t
*
testing
.
T
)
{
defaultBlockTTL
:=
blockTTL
defaultBlockTTL
:=
block
Soft
TTL
defaultCrossCheckCycle
:=
crossCheckCycle
defaultCrossCheckCycle
:=
crossCheckCycle
blockTTL
=
100
*
time
.
Millisecond
block
Soft
TTL
=
100
*
time
.
Millisecond
crossCheckCycle
=
25
*
time
.
Millisecond
crossCheckCycle
=
25
*
time
.
Millisecond
// Create a long chain of blocks and simulate an invalid chain by dropping every second
// Create a long chain of blocks and simulate an invalid chain by dropping every second
...
@@ -516,7 +516,7 @@ func TestMadeupBlockChainAttack(t *testing.T) {
...
@@ -516,7 +516,7 @@ func TestMadeupBlockChainAttack(t *testing.T) {
t
.
Fatalf
(
"synchronisation error mismatch: have %v, want %v"
,
err
,
ErrCrossCheckFailed
)
t
.
Fatalf
(
"synchronisation error mismatch: have %v, want %v"
,
err
,
ErrCrossCheckFailed
)
}
}
// Ensure that a valid chain can still pass sync
// Ensure that a valid chain can still pass sync
blockTTL
=
defaultBlockTTL
block
Soft
TTL
=
defaultBlockTTL
crossCheckCycle
=
defaultCrossCheckCycle
crossCheckCycle
=
defaultCrossCheckCycle
tester
.
hashes
=
hashes
tester
.
hashes
=
hashes
...
@@ -530,10 +530,10 @@ func TestMadeupBlockChainAttack(t *testing.T) {
...
@@ -530,10 +530,10 @@ func TestMadeupBlockChainAttack(t *testing.T) {
// attacker make up a valid hashes for random blocks, but also forges the block
// attacker make up a valid hashes for random blocks, but also forges the block
// parents to point to existing hashes.
// parents to point to existing hashes.
func
TestMadeupParentBlockChainAttack
(
t
*
testing
.
T
)
{
func
TestMadeupParentBlockChainAttack
(
t
*
testing
.
T
)
{
defaultBlockTTL
:=
blockTTL
defaultBlockTTL
:=
block
Soft
TTL
defaultCrossCheckCycle
:=
crossCheckCycle
defaultCrossCheckCycle
:=
crossCheckCycle
blockTTL
=
100
*
time
.
Millisecond
block
Soft
TTL
=
100
*
time
.
Millisecond
crossCheckCycle
=
25
*
time
.
Millisecond
crossCheckCycle
=
25
*
time
.
Millisecond
// Create a long chain of blocks and simulate an invalid chain by dropping every second
// Create a long chain of blocks and simulate an invalid chain by dropping every second
...
@@ -550,7 +550,7 @@ func TestMadeupParentBlockChainAttack(t *testing.T) {
...
@@ -550,7 +550,7 @@ func TestMadeupParentBlockChainAttack(t *testing.T) {
t
.
Fatalf
(
"synchronisation error mismatch: have %v, want %v"
,
err
,
ErrCrossCheckFailed
)
t
.
Fatalf
(
"synchronisation error mismatch: have %v, want %v"
,
err
,
ErrCrossCheckFailed
)
}
}
// Ensure that a valid chain can still pass sync
// Ensure that a valid chain can still pass sync
blockTTL
=
defaultBlockTTL
block
Soft
TTL
=
defaultBlockTTL
crossCheckCycle
=
defaultCrossCheckCycle
crossCheckCycle
=
defaultCrossCheckCycle
tester
.
blocks
=
blocks
tester
.
blocks
=
blocks
...
...
eth/downloader/peer.go
View file @
0cd72369
...
@@ -5,10 +5,14 @@ package downloader
...
@@ -5,10 +5,14 @@ package downloader
import
(
import
(
"errors"
"errors"
"math"
"sync"
"sync"
"sync/atomic"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"gopkg.in/fatih/set.v0"
"gopkg.in/fatih/set.v0"
)
)
...
@@ -27,14 +31,15 @@ type peer struct {
...
@@ -27,14 +31,15 @@ type peer struct {
head
common
.
Hash
// Hash of the peers latest known block
head
common
.
Hash
// Hash of the peers latest known block
idle
int32
// Current activity state of the peer (idle = 0, active = 1)
idle
int32
// Current activity state of the peer (idle = 0, active = 1)
rep
int32
// Simple peer reputation
(not used currently)
rep
int32
// Simple peer reputation
mu
sync
.
RWMutex
capacity
int32
// Number of blocks allowed to fetch per request
started
time
.
Time
// Time instance when the last fetch was started
ignored
*
set
.
Set
ignored
*
set
.
Set
// Set of hashes not to request (didn't have previously)
getHashes
hashFetcherFn
getHashes
hashFetcherFn
// Method to retrieve a batch of hashes (mockable for testing)
getBlocks
blockFetcherFn
getBlocks
blockFetcherFn
// Method to retrieve a batch of blocks (mockable for testing)
}
}
// newPeer create a new downloader peer, with specific hash and block retrieval
// newPeer create a new downloader peer, with specific hash and block retrieval
...
@@ -43,6 +48,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo
...
@@ -43,6 +48,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo
return
&
peer
{
return
&
peer
{
id
:
id
,
id
:
id
,
head
:
head
,
head
:
head
,
capacity
:
1
,
getHashes
:
getHashes
,
getHashes
:
getHashes
,
getBlocks
:
getBlocks
,
getBlocks
:
getBlocks
,
ignored
:
set
.
New
(),
ignored
:
set
.
New
(),
...
@@ -52,6 +58,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo
...
@@ -52,6 +58,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo
// Reset clears the internal state of a peer entity.
// Reset clears the internal state of a peer entity.
func
(
p
*
peer
)
Reset
()
{
func
(
p
*
peer
)
Reset
()
{
atomic
.
StoreInt32
(
&
p
.
idle
,
0
)
atomic
.
StoreInt32
(
&
p
.
idle
,
0
)
atomic
.
StoreInt32
(
&
p
.
capacity
,
1
)
p
.
ignored
.
Clear
()
p
.
ignored
.
Clear
()
}
}
...
@@ -61,6 +68,8 @@ func (p *peer) Fetch(request *fetchRequest) error {
...
@@ -61,6 +68,8 @@ func (p *peer) Fetch(request *fetchRequest) error {
if
!
atomic
.
CompareAndSwapInt32
(
&
p
.
idle
,
0
,
1
)
{
if
!
atomic
.
CompareAndSwapInt32
(
&
p
.
idle
,
0
,
1
)
{
return
errAlreadyFetching
return
errAlreadyFetching
}
}
p
.
started
=
time
.
Now
()
// Convert the hash set to a retrievable slice
// Convert the hash set to a retrievable slice
hashes
:=
make
([]
common
.
Hash
,
0
,
len
(
request
.
Hashes
))
hashes
:=
make
([]
common
.
Hash
,
0
,
len
(
request
.
Hashes
))
for
hash
,
_
:=
range
request
.
Hashes
{
for
hash
,
_
:=
range
request
.
Hashes
{
...
@@ -72,10 +81,41 @@ func (p *peer) Fetch(request *fetchRequest) error {
...
@@ -72,10 +81,41 @@ func (p *peer) Fetch(request *fetchRequest) error {
}
}
// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its block retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
func
(
p
*
peer
)
SetIdle
()
{
func
(
p
*
peer
)
SetIdle
()
{
// Update the peer's download allowance based on previous performance
scale
:=
2.0
if
time
.
Since
(
p
.
started
)
>
blockSoftTTL
{
scale
=
0.5
}
for
{
// Calculate the new download bandwidth allowance
prev
:=
atomic
.
LoadInt32
(
&
p
.
capacity
)
next
:=
int32
(
math
.
Max
(
1
,
math
.
Min
(
MaxBlockFetch
,
float64
(
prev
)
*
scale
)))
// Try to update the old value
if
atomic
.
CompareAndSwapInt32
(
&
p
.
capacity
,
prev
,
next
)
{
// If we're having problems at 1 capacity, try to find better peers
if
next
==
1
{
p
.
Demote
()
}
if
prev
!=
next
{
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"%s: changing block download capacity from %d to %d"
,
p
.
id
,
prev
,
next
)
}
break
}
}
// Set the peer to idle to allow further block requests
atomic
.
StoreInt32
(
&
p
.
idle
,
0
)
atomic
.
StoreInt32
(
&
p
.
idle
,
0
)
}
}
// Capacity retrieves the peers block download allowance based on its previously
// discovered bandwidth capacity.
func
(
p
*
peer
)
Capacity
()
int
{
return
int
(
atomic
.
LoadInt32
(
&
p
.
capacity
))
}
// Promote increases the peer's reputation.
// Promote increases the peer's reputation.
func
(
p
*
peer
)
Promote
()
{
func
(
p
*
peer
)
Promote
()
{
atomic
.
AddInt32
(
&
p
.
rep
,
1
)
atomic
.
AddInt32
(
&
p
.
rep
,
1
)
...
...
eth/downloader/queue.go
View file @
0cd72369
...
@@ -203,7 +203,7 @@ func (q *queue) TakeBlocks() []*Block {
...
@@ -203,7 +203,7 @@ func (q *queue) TakeBlocks() []*Block {
// Reserve reserves a set of hashes for the given peer, skipping any previously
// Reserve reserves a set of hashes for the given peer, skipping any previously
// failed download.
// failed download.
func
(
q
*
queue
)
Reserve
(
p
*
peer
,
max
int
)
*
fetchRequest
{
func
(
q
*
queue
)
Reserve
(
p
*
peer
)
*
fetchRequest
{
q
.
lock
.
Lock
()
q
.
lock
.
Lock
()
defer
q
.
lock
.
Unlock
()
defer
q
.
lock
.
Unlock
()
...
@@ -215,11 +215,17 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest {
...
@@ -215,11 +215,17 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest {
if
_
,
ok
:=
q
.
pendPool
[
p
.
id
];
ok
{
if
_
,
ok
:=
q
.
pendPool
[
p
.
id
];
ok
{
return
nil
return
nil
}
}
// Calculate an upper limit on the hashes we might fetch (i.e. throttling)
space
:=
len
(
q
.
blockCache
)
-
len
(
q
.
blockPool
)
for
_
,
request
:=
range
q
.
pendPool
{
space
-=
len
(
request
.
Hashes
)
}
// Retrieve a batch of hashes, skipping previously failed ones
// Retrieve a batch of hashes, skipping previously failed ones
send
:=
make
(
map
[
common
.
Hash
]
int
)
send
:=
make
(
map
[
common
.
Hash
]
int
)
skip
:=
make
(
map
[
common
.
Hash
]
int
)
skip
:=
make
(
map
[
common
.
Hash
]
int
)
for
len
(
send
)
<
max
&&
!
q
.
hashQueue
.
Empty
()
{
capacity
:=
p
.
Capacity
()
for
len
(
send
)
<
space
&&
len
(
send
)
<
capacity
&&
!
q
.
hashQueue
.
Empty
()
{
hash
,
priority
:=
q
.
hashQueue
.
Pop
()
hash
,
priority
:=
q
.
hashQueue
.
Pop
()
if
p
.
ignored
.
Has
(
hash
)
{
if
p
.
ignored
.
Has
(
hash
)
{
skip
[
hash
.
(
common
.
Hash
)]
=
int
(
priority
)
skip
[
hash
.
(
common
.
Hash
)]
=
int
(
priority
)
...
...
eth/downloader/queue_test.go
View file @
0cd72369
package
downloader
package
downloader
import
(
import
(
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types"
"gopkg.in/fatih/set.v0"
"gopkg.in/fatih/set.v0"
...
@@ -30,32 +28,3 @@ func createBlocksFromHashSet(hashes *set.Set) []*types.Block {
...
@@ -30,32 +28,3 @@ func createBlocksFromHashSet(hashes *set.Set) []*types.Block {
return
blocks
return
blocks
}
}
func
TestChunking
(
t
*
testing
.
T
)
{
queue
:=
newQueue
()
peer1
:=
newPeer
(
"peer1"
,
common
.
Hash
{},
nil
,
nil
)
peer2
:=
newPeer
(
"peer2"
,
common
.
Hash
{},
nil
,
nil
)
// 99 + 1 (1 == known genesis hash)
hashes
:=
createHashes
(
0
,
99
)
queue
.
Insert
(
hashes
)
chunk1
:=
queue
.
Reserve
(
peer1
,
99
)
if
chunk1
==
nil
{
t
.
Errorf
(
"chunk1 is nil"
)
t
.
FailNow
()
}
chunk2
:=
queue
.
Reserve
(
peer2
,
99
)
if
chunk2
==
nil
{
t
.
Errorf
(
"chunk2 is nil"
)
t
.
FailNow
()
}
if
len
(
chunk1
.
Hashes
)
!=
99
{
t
.
Error
(
"expected chunk1 hashes to be 99, got"
,
len
(
chunk1
.
Hashes
))
}
if
len
(
chunk2
.
Hashes
)
!=
1
{
t
.
Error
(
"expected chunk1 hashes to be 1, got"
,
len
(
chunk2
.
Hashes
))
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment