Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
b53f701c
Commit
b53f701c
authored
Jun 22, 2015
by
Péter Szilágyi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
eth/fetcher: remove test sleeps (15s -> 2.8s)
parent
1989d149
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
167 additions
and
59 deletions
+167
-59
fetcher.go
eth/fetcher/fetcher.go
+16
-1
fetcher_test.go
eth/fetcher/fetcher_test.go
+151
-58
No files found.
eth/fetcher/fetcher.go
View file @
b53f701c
...
...
@@ -92,6 +92,10 @@ type Fetcher struct {
chainHeight
chainHeightFn
// Retrieves the current chain's height
insertChain
chainInsertFn
// Injects a batch of blocks into the chain
dropPeer
peerDropFn
// Drops a peer for misbehaving
// Testing hooks
fetchingHook
func
([]
common
.
Hash
)
// Method to call upon starting a block fetch
importedHook
func
(
*
types
.
Block
)
// Method to call upon successful block import
}
// New creates a block fetcher to retrieve blocks based on hash announcements.
...
...
@@ -277,7 +281,13 @@ func (f *Fetcher) loop() {
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Peer %s: fetching %s"
,
peer
,
list
)
}
go
f
.
fetching
[
hashes
[
0
]]
.
fetch
(
hashes
)
hashes
:=
hashes
// closure!
go
func
()
{
if
f
.
fetchingHook
!=
nil
{
f
.
fetchingHook
(
hashes
)
}
f
.
fetching
[
hashes
[
0
]]
.
fetch
(
hashes
)
}()
}
// Schedule the next fetch if blocks are still pending
f
.
reschedule
(
fetch
)
...
...
@@ -402,6 +412,11 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
}
// If import succeeded, broadcast the block
go
f
.
broadcastBlock
(
block
,
false
)
// Invoke the testing hook if needed
if
f
.
importedHook
!=
nil
{
f
.
importedHook
(
block
)
}
}()
}
...
...
eth/fetcher/fetcher_test.go
View file @
b53f701c
...
...
@@ -163,7 +163,7 @@ func (f *fetcherTester) makeFetcher(blocks map[common.Hash]*types.Block) blockRe
// them, successfully importing into the local chain.
func
TestSequentialAnnouncements
(
t
*
testing
.
T
)
{
// Create a chain of blocks to import
targetBlocks
:=
24
targetBlocks
:=
4
*
hashLimit
hashes
:=
createHashes
(
targetBlocks
,
knownHash
)
blocks
:=
createBlocksFromHashes
(
hashes
)
...
...
@@ -171,12 +171,22 @@ func TestSequentialAnnouncements(t *testing.T) {
fetcher
:=
tester
.
makeFetcher
(
blocks
)
// Iteratively announce blocks until all are imported
for
i
:=
len
(
hashes
)
-
1
;
i
>=
0
;
i
--
{
imported
:=
make
(
chan
*
types
.
Block
)
tester
.
fetcher
.
importedHook
=
func
(
block
*
types
.
Block
)
{
imported
<-
block
}
for
i
:=
len
(
hashes
)
-
2
;
i
>=
0
;
i
--
{
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
i
],
time
.
Now
()
.
Add
(
-
arriveTimeout
),
fetcher
)
time
.
Sleep
(
50
*
time
.
Millisecond
)
select
{
case
<-
imported
:
case
<-
time
.
After
(
time
.
Second
)
:
t
.
Fatalf
(
"block %d: import timeout"
,
len
(
hashes
)
-
i
)
}
}
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
targetBlocks
+
1
{
t
.
Fatalf
(
"synchronised block mismatch: have %v, want %v"
,
imported
,
targetBlocks
+
1
)
select
{
case
<-
imported
:
t
.
Fatalf
(
"extra block imported"
)
case
<-
time
.
After
(
50
*
time
.
Millisecond
)
:
}
}
...
...
@@ -184,7 +194,7 @@ func TestSequentialAnnouncements(t *testing.T) {
// peer), they will only get downloaded at most once.
func
TestConcurrentAnnouncements
(
t
*
testing
.
T
)
{
// Create a chain of blocks to import
targetBlocks
:=
24
targetBlocks
:=
4
*
hashLimit
hashes
:=
createHashes
(
targetBlocks
,
knownHash
)
blocks
:=
createBlocksFromHashes
(
hashes
)
...
...
@@ -198,15 +208,24 @@ func TestConcurrentAnnouncements(t *testing.T) {
return
fetcher
(
hashes
)
}
// Iteratively announce blocks until all are imported
for
i
:=
len
(
hashes
)
-
1
;
i
>=
0
;
i
--
{
imported
:=
make
(
chan
*
types
.
Block
)
tester
.
fetcher
.
importedHook
=
func
(
block
*
types
.
Block
)
{
imported
<-
block
}
for
i
:=
len
(
hashes
)
-
2
;
i
>=
0
;
i
--
{
tester
.
fetcher
.
Notify
(
"first"
,
hashes
[
i
],
time
.
Now
()
.
Add
(
-
arriveTimeout
),
wrapper
)
tester
.
fetcher
.
Notify
(
"second"
,
hashes
[
i
],
time
.
Now
()
.
Add
(
-
arriveTimeout
+
time
.
Millisecond
),
wrapper
)
tester
.
fetcher
.
Notify
(
"second"
,
hashes
[
i
],
time
.
Now
()
.
Add
(
-
arriveTimeout
-
time
.
Millisecond
),
wrapper
)
time
.
Sleep
(
50
*
time
.
Millisecond
)
select
{
case
<-
imported
:
case
<-
time
.
After
(
time
.
Second
)
:
t
.
Fatalf
(
"block %d: import timeout"
,
len
(
hashes
)
-
i
)
}
}
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
targetBlocks
+
1
{
t
.
Fatalf
(
"synchronised block mismatch: have %v, want %v"
,
imported
,
targetBlocks
+
1
)
select
{
case
<-
imported
:
t
.
Fatalf
(
"extra block imported"
)
case
<-
time
.
After
(
50
*
time
.
Millisecond
)
:
}
// Make sure no blocks were retrieved twice
if
int
(
counter
)
!=
targetBlocks
{
...
...
@@ -218,7 +237,7 @@ func TestConcurrentAnnouncements(t *testing.T) {
// results in a valid import.
func
TestOverlappingAnnouncements
(
t
*
testing
.
T
)
{
// Create a chain of blocks to import
targetBlocks
:=
24
targetBlocks
:=
4
*
hashLimit
hashes
:=
createHashes
(
targetBlocks
,
knownHash
)
blocks
:=
createBlocksFromHashes
(
hashes
)
...
...
@@ -226,15 +245,31 @@ func TestOverlappingAnnouncements(t *testing.T) {
fetcher
:=
tester
.
makeFetcher
(
blocks
)
// Iteratively announce blocks, but overlap them continuously
delay
,
overlap
:=
50
*
time
.
Millisecond
,
time
.
Duration
(
5
)
for
i
:=
len
(
hashes
)
-
1
;
i
>=
0
;
i
--
{
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
i
],
time
.
Now
()
.
Add
(
-
arriveTimeout
+
overlap
*
delay
),
fetcher
)
time
.
Sleep
(
delay
)
}
time
.
Sleep
(
overlap
*
delay
)
fetching
:=
make
(
chan
[]
common
.
Hash
)
imported
:=
make
(
chan
*
types
.
Block
,
len
(
hashes
)
-
1
)
tester
.
fetcher
.
fetchingHook
=
func
(
hashes
[]
common
.
Hash
)
{
fetching
<-
hashes
}
tester
.
fetcher
.
importedHook
=
func
(
block
*
types
.
Block
)
{
imported
<-
block
}
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
targetBlocks
+
1
{
t
.
Fatalf
(
"synchronised block mismatch: have %v, want %v"
,
imported
,
targetBlocks
+
1
)
for
i
:=
len
(
hashes
)
-
2
;
i
>=
0
;
i
--
{
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
i
],
time
.
Now
()
.
Add
(
-
arriveTimeout
),
fetcher
)
select
{
case
<-
fetching
:
case
<-
time
.
After
(
time
.
Second
)
:
t
.
Fatalf
(
"hash %d: announce timeout"
,
len
(
hashes
)
-
i
)
}
}
// Wait for all the imports to complete and check count
for
i
:=
0
;
i
<
len
(
hashes
)
-
1
;
i
++
{
select
{
case
<-
imported
:
case
<-
time
.
After
(
time
.
Second
)
:
t
.
Fatalf
(
"block %d: import timeout"
,
i
)
}
}
select
{
case
<-
imported
:
t
.
Fatalf
(
"extra block imported"
)
case
<-
time
.
After
(
50
*
time
.
Millisecond
)
:
}
}
...
...
@@ -280,27 +315,37 @@ func TestPendingDeduplication(t *testing.T) {
// imported when all the gaps are filled in.
func
TestRandomArrivalImport
(
t
*
testing
.
T
)
{
// Create a chain of blocks to import, and choose one to delay
targetBlocks
:=
24
hashes
:=
createHashes
(
targetBlocks
,
knownHash
)
hashes
:=
createHashes
(
maxQueueDist
,
knownHash
)
blocks
:=
createBlocksFromHashes
(
hashes
)
skip
:=
targetBlocks
/
2
skip
:=
maxQueueDist
/
2
tester
:=
newTester
()
fetcher
:=
tester
.
makeFetcher
(
blocks
)
// Iteratively announce blocks, skipping one entry
imported
:=
make
(
chan
*
types
.
Block
,
len
(
hashes
)
-
1
)
tester
.
fetcher
.
importedHook
=
func
(
block
*
types
.
Block
)
{
imported
<-
block
}
for
i
:=
len
(
hashes
)
-
1
;
i
>=
0
;
i
--
{
if
i
!=
skip
{
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
i
],
time
.
Now
()
.
Add
(
-
arriveTimeout
),
fetcher
)
time
.
Sleep
(
50
*
time
.
Millisecond
)
time
.
Sleep
(
time
.
Millisecond
)
}
}
// Finally announce the skipped entry and check full import
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
skip
],
time
.
Now
()
.
Add
(
-
arriveTimeout
),
fetcher
)
time
.
Sleep
(
50
*
time
.
Millisecond
)
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
targetBlocks
+
1
{
t
.
Fatalf
(
"synchronised block mismatch: have %v, want %v"
,
imported
,
targetBlocks
+
1
)
for
i
:=
0
;
i
<
len
(
hashes
)
-
1
;
i
++
{
select
{
case
<-
imported
:
case
<-
time
.
After
(
time
.
Second
)
:
t
.
Fatalf
(
"block %d: import timeout"
,
i
)
}
}
select
{
case
<-
imported
:
t
.
Fatalf
(
"extra block imported"
)
case
<-
time
.
After
(
50
*
time
.
Millisecond
)
:
}
}
...
...
@@ -308,27 +353,37 @@ func TestRandomArrivalImport(t *testing.T) {
// are correctly schedule, filling and import queue gaps.
func
TestQueueGapFill
(
t
*
testing
.
T
)
{
// Create a chain of blocks to import, and choose one to not announce at all
targetBlocks
:=
24
hashes
:=
createHashes
(
targetBlocks
,
knownHash
)
hashes
:=
createHashes
(
maxQueueDist
,
knownHash
)
blocks
:=
createBlocksFromHashes
(
hashes
)
skip
:=
targetBlocks
/
2
skip
:=
maxQueueDist
/
2
tester
:=
newTester
()
fetcher
:=
tester
.
makeFetcher
(
blocks
)
// Iteratively announce blocks, skipping one entry
imported
:=
make
(
chan
*
types
.
Block
,
len
(
hashes
)
-
1
)
tester
.
fetcher
.
importedHook
=
func
(
block
*
types
.
Block
)
{
imported
<-
block
}
for
i
:=
len
(
hashes
)
-
1
;
i
>=
0
;
i
--
{
if
i
!=
skip
{
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
i
],
time
.
Now
()
.
Add
(
-
arriveTimeout
),
fetcher
)
time
.
Sleep
(
50
*
time
.
Millisecond
)
time
.
Sleep
(
time
.
Millisecond
)
}
}
// Fill the missing block directly as if propagated
tester
.
fetcher
.
Enqueue
(
"valid"
,
blocks
[
hashes
[
skip
]])
time
.
Sleep
(
50
*
time
.
Millisecond
)
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
targetBlocks
+
1
{
t
.
Fatalf
(
"synchronised block mismatch: have %v, want %v"
,
imported
,
targetBlocks
+
1
)
for
i
:=
0
;
i
<
len
(
hashes
)
-
1
;
i
++
{
select
{
case
<-
imported
:
case
<-
time
.
After
(
time
.
Second
)
:
t
.
Fatalf
(
"block %d: import timeout"
,
i
)
}
}
select
{
case
<-
imported
:
t
.
Fatalf
(
"extra block imported"
)
case
<-
time
.
After
(
50
*
time
.
Millisecond
)
:
}
}
...
...
@@ -348,9 +403,15 @@ func TestImportDeduplication(t *testing.T) {
atomic
.
AddUint32
(
&
counter
,
uint32
(
len
(
blocks
)))
return
tester
.
insertChain
(
blocks
)
}
// Instrument the fetching and imported events
fetching
:=
make
(
chan
[]
common
.
Hash
)
imported
:=
make
(
chan
*
types
.
Block
,
len
(
hashes
)
-
1
)
tester
.
fetcher
.
fetchingHook
=
func
(
hashes
[]
common
.
Hash
)
{
fetching
<-
hashes
}
tester
.
fetcher
.
importedHook
=
func
(
block
*
types
.
Block
)
{
imported
<-
block
}
// Announce the duplicating block, wait for retrieval, and also propagate directly
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
0
],
time
.
Now
()
.
Add
(
-
arriveTimeout
),
fetcher
)
time
.
Sleep
(
50
*
time
.
Millisecond
)
<-
fetching
tester
.
fetcher
.
Enqueue
(
"valid"
,
blocks
[
hashes
[
0
]])
tester
.
fetcher
.
Enqueue
(
"valid"
,
blocks
[
hashes
[
0
]])
...
...
@@ -358,8 +419,13 @@ func TestImportDeduplication(t *testing.T) {
// Fill the missing block directly as if propagated, and check import uniqueness
tester
.
fetcher
.
Enqueue
(
"valid"
,
blocks
[
hashes
[
1
]])
time
.
Sleep
(
50
*
time
.
Millisecond
)
for
done
:=
false
;
!
done
;
{
select
{
case
<-
imported
:
case
<-
time
.
After
(
50
*
time
.
Millisecond
)
:
done
=
true
}
}
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
3
{
t
.
Fatalf
(
"synchronised block mismatch: have %v, want %v"
,
imported
,
3
)
}
...
...
@@ -400,8 +466,12 @@ func TestDistantDiscarding(t *testing.T) {
// block announcements to a node, but that even in the face of such an attack,
// the fetcher remains operational.
func
TestHashMemoryExhaustionAttack
(
t
*
testing
.
T
)
{
// Create a tester with instrumented import hooks
tester
:=
newTester
()
imported
:=
make
(
chan
*
types
.
Block
)
tester
.
fetcher
.
importedHook
=
func
(
block
*
types
.
Block
)
{
imported
<-
block
}
// Create a valid chain and an infinite junk chain
hashes
:=
createHashes
(
hashLimit
+
2
*
maxQueueDist
,
knownHash
)
blocks
:=
createBlocksFromHashes
(
hashes
)
...
...
@@ -413,29 +483,39 @@ func TestHashMemoryExhaustionAttack(t *testing.T) {
// Feed the tester a huge hashset from the attacker, and a limited from the valid peer
for
i
:=
0
;
i
<
len
(
attack
);
i
++
{
if
i
<
maxQueueDist
{
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
len
(
hashes
)
-
1
-
i
],
time
.
Now
()
.
Add
(
arriveTimeout
/
2
),
valid
)
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
len
(
hashes
)
-
2
-
i
],
time
.
Now
(
),
valid
)
}
tester
.
fetcher
.
Notify
(
"attacker"
,
attack
[
i
],
time
.
Now
()
.
Add
(
arriveTimeout
/
2
)
,
attacker
)
tester
.
fetcher
.
Notify
(
"attacker"
,
attack
[
i
],
time
.
Now
(),
attacker
)
}
if
len
(
tester
.
fetcher
.
announced
)
!=
hashLimit
+
maxQueueDist
{
t
.
Fatalf
(
"queued announce count mismatch: have %d, want %d"
,
len
(
tester
.
fetcher
.
announced
),
hashLimit
+
maxQueueDist
)
}
// Wait for synchronisation to complete and check success for the valid peer
time
.
Sleep
(
2
*
arriveTimeout
)
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
maxQueueDist
{
t
.
Fatalf
(
"partial synchronised block mismatch: have %v, want %v"
,
imported
,
maxQueueDist
)
// Wait for fetches to complete
for
i
:=
0
;
i
<
maxQueueDist
;
i
++
{
select
{
case
<-
imported
:
case
<-
time
.
After
(
time
.
Second
)
:
t
.
Fatalf
(
"block %d: import timeout"
,
i
)
}
}
select
{
case
<-
imported
:
t
.
Fatalf
(
"extra block imported"
)
case
<-
time
.
After
(
50
*
time
.
Millisecond
)
:
}
// Feed the remaining valid hashes to ensure DOS protection state remains clean
for
i
:=
len
(
hashes
)
-
maxQueueDist
;
i
>=
0
;
{
for
j
:=
0
;
j
<
maxQueueDist
&&
i
>=
0
;
j
++
{
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
i
],
time
.
Now
()
.
Add
(
time
.
Millisecond
),
valid
)
i
--
for
i
:=
len
(
hashes
)
-
maxQueueDist
-
2
;
i
>=
0
;
i
--
{
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
i
],
time
.
Now
()
.
Add
(
-
arriveTimeout
),
valid
)
select
{
case
<-
imported
:
case
<-
time
.
After
(
time
.
Second
)
:
t
.
Fatalf
(
"block %d: import timeout"
,
len
(
hashes
)
-
i
)
}
time
.
Sleep
(
500
*
time
.
Millisecond
)
}
time
.
Sleep
(
500
*
time
.
Millisecond
)
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
len
(
hashes
)
{
t
.
Fatalf
(
"fully synchronised block mismatch: have %v, want %v"
,
imported
,
len
(
hashes
))
select
{
case
<-
imported
:
t
.
Fatalf
(
"extra block imported"
)
case
<-
time
.
After
(
50
*
time
.
Millisecond
)
:
}
}
...
...
@@ -443,14 +523,18 @@ func TestHashMemoryExhaustionAttack(t *testing.T) {
// announces and retrievals) don't pile up indefinitely, exhausting available
// system memory.
func
TestBlockMemoryExhaustionAttack
(
t
*
testing
.
T
)
{
// Create a tester with instrumented import hooks
tester
:=
newTester
()
imported
:=
make
(
chan
*
types
.
Block
)
tester
.
fetcher
.
importedHook
=
func
(
block
*
types
.
Block
)
{
imported
<-
block
}
// Create a valid chain and a batch of dangling (but in range) blocks
hashes
:=
createHashes
(
blockLimit
,
knownHash
)
hashes
:=
createHashes
(
blockLimit
+
2
*
maxQueueDist
,
knownHash
)
blocks
:=
createBlocksFromHashes
(
hashes
)
attack
:=
make
(
map
[
common
.
Hash
]
*
types
.
Block
)
for
i
:=
0
;
i
<
16
;
i
++
{
for
len
(
attack
)
<
blockLimit
+
2
*
maxQueueDist
{
hashes
:=
createHashes
(
maxQueueDist
-
1
,
unknownHash
)
blocks
:=
createBlocksFromHashes
(
hashes
)
for
_
,
hash
:=
range
hashes
[
:
maxQueueDist
-
2
]
{
...
...
@@ -475,18 +559,27 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
}
// Insert the missing piece (and sanity check the import)
tester
.
fetcher
.
Enqueue
(
"valid"
,
blocks
[
hashes
[
len
(
hashes
)
-
2
]])
time
.
Sleep
(
500
*
time
.
Millisecond
)
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
maxQueueDist
+
1
{
t
.
Fatalf
(
"synchronised block mismatch: have %v, want %v"
,
imported
,
maxQueueDist
+
1
)
for
i
:=
0
;
i
<
maxQueueDist
;
i
++
{
select
{
case
<-
imported
:
case
<-
time
.
After
(
time
.
Second
)
:
t
.
Fatalf
(
"block %d: import timeout"
,
i
)
}
}
select
{
case
<-
imported
:
t
.
Fatalf
(
"extra block imported"
)
case
<-
time
.
After
(
50
*
time
.
Millisecond
)
:
}
// Insert the remaining blocks in chunks to ensure clean DOS protection
for
i
:=
maxQueueDist
;
i
<
len
(
hashes
)
-
1
;
i
++
{
tester
.
fetcher
.
Enqueue
(
"valid"
,
blocks
[
hashes
[
len
(
hashes
)
-
2
-
i
]])
if
i
%
maxQueueDist
==
0
{
time
.
Sleep
(
500
*
time
.
Millisecond
)
select
{
case
<-
imported
:
case
<-
time
.
After
(
time
.
Second
)
:
t
.
Fatalf
(
"block %d: import timeout"
,
len
(
hashes
)
-
i
)
}
}
time
.
Sleep
(
500
*
time
.
Millisecond
)
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
len
(
hashes
)
{
t
.
Fatalf
(
"synchronised block mismatch: have %v, want %v"
,
imported
,
len
(
hashes
))
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment