Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
1989d149
Commit
1989d149
authored
Jun 22, 2015
by
Péter Szilágyi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
eth/fetcher: handle and (crude) test block memory DOS
parent
d36c25bc
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
105 additions
and
23 deletions
+105
-23
fetcher.go
eth/fetcher/fetcher.go
+45
-16
fetcher_test.go
eth/fetcher/fetcher_test.go
+60
-7
No files found.
eth/fetcher/fetcher.go
View file @
1989d149
...
@@ -20,7 +20,8 @@ const (
...
@@ -20,7 +20,8 @@ const (
fetchTimeout
=
5
*
time
.
Second
// Maximum alloted time to return an explicitly requested block
fetchTimeout
=
5
*
time
.
Second
// Maximum alloted time to return an explicitly requested block
maxUncleDist
=
7
// Maximum allowed backward distance from the chain head
maxUncleDist
=
7
// Maximum allowed backward distance from the chain head
maxQueueDist
=
32
// Maximum allowed distance from the chain head to queue
maxQueueDist
=
32
// Maximum allowed distance from the chain head to queue
announceLimit
=
256
// Maximum number of unique blocks a peer may have announced
hashLimit
=
256
// Maximum number of unique blocks a peer may have announced
blockLimit
=
64
// Maximum number of unique blocks a per may have delivered
)
)
var
(
var
(
...
@@ -80,8 +81,9 @@ type Fetcher struct {
...
@@ -80,8 +81,9 @@ type Fetcher struct {
fetching
map
[
common
.
Hash
]
*
announce
// Announced blocks, currently fetching
fetching
map
[
common
.
Hash
]
*
announce
// Announced blocks, currently fetching
// Block cache
// Block cache
queue
*
prque
.
Prque
// Queue containing the import operations (block number sorted)
queue
*
prque
.
Prque
// Queue containing the import operations (block number sorted)
queued
map
[
common
.
Hash
]
struct
{}
// Presence set of already queued blocks (to dedup imports)
queues
map
[
string
]
int
// Per peer block counts to prevent memory exhaustion
queued
map
[
common
.
Hash
]
*
inject
// Set of already queued blocks (to dedup imports)
// Callbacks
// Callbacks
getBlock
blockRetrievalFn
// Retrieves a block from the local chain
getBlock
blockRetrievalFn
// Retrieves a block from the local chain
...
@@ -104,7 +106,8 @@ func New(getBlock blockRetrievalFn, validateBlock blockValidatorFn, broadcastBlo
...
@@ -104,7 +106,8 @@ func New(getBlock blockRetrievalFn, validateBlock blockValidatorFn, broadcastBlo
announced
:
make
(
map
[
common
.
Hash
][]
*
announce
),
announced
:
make
(
map
[
common
.
Hash
][]
*
announce
),
fetching
:
make
(
map
[
common
.
Hash
]
*
announce
),
fetching
:
make
(
map
[
common
.
Hash
]
*
announce
),
queue
:
prque
.
New
(),
queue
:
prque
.
New
(),
queued
:
make
(
map
[
common
.
Hash
]
struct
{}),
queues
:
make
(
map
[
string
]
int
),
queued
:
make
(
map
[
common
.
Hash
]
*
inject
),
getBlock
:
getBlock
,
getBlock
:
getBlock
,
validateBlock
:
validateBlock
,
validateBlock
:
validateBlock
,
broadcastBlock
:
broadcastBlock
,
broadcastBlock
:
broadcastBlock
,
...
@@ -192,22 +195,24 @@ func (f *Fetcher) loop() {
...
@@ -192,22 +195,24 @@ func (f *Fetcher) loop() {
// Clean up any expired block fetches
// Clean up any expired block fetches
for
hash
,
announce
:=
range
f
.
fetching
{
for
hash
,
announce
:=
range
f
.
fetching
{
if
time
.
Since
(
announce
.
time
)
>
fetchTimeout
{
if
time
.
Since
(
announce
.
time
)
>
fetchTimeout
{
f
.
forget
Block
(
hash
)
f
.
forget
Hash
(
hash
)
}
}
}
}
// Import any queued blocks that could potentially fit
// Import any queued blocks that could potentially fit
height
:=
f
.
chainHeight
()
height
:=
f
.
chainHeight
()
for
!
f
.
queue
.
Empty
()
{
for
!
f
.
queue
.
Empty
()
{
op
:=
f
.
queue
.
PopItem
()
.
(
*
inject
)
op
:=
f
.
queue
.
PopItem
()
.
(
*
inject
)
number
:=
op
.
block
.
NumberU64
()
// If too high up the chain or phase, continue later
// If too high up the chain or phase, continue later
number
:=
op
.
block
.
NumberU64
()
if
number
>
height
+
1
{
if
number
>
height
+
1
{
f
.
queue
.
Push
(
op
,
-
float32
(
op
.
block
.
NumberU64
()))
f
.
queue
.
Push
(
op
,
-
float32
(
op
.
block
.
NumberU64
()))
break
break
}
}
// Otherwise if fresh and still unknown, try and import
// Otherwise if fresh and still unknown, try and import
if
number
+
maxUncleDist
<
height
||
f
.
getBlock
(
op
.
block
.
Hash
())
!=
nil
{
hash
:=
op
.
block
.
Hash
()
if
number
+
maxUncleDist
<
height
||
f
.
getBlock
(
hash
)
!=
nil
{
f
.
forgetBlock
(
hash
)
continue
continue
}
}
f
.
insert
(
op
.
origin
,
op
.
block
)
f
.
insert
(
op
.
origin
,
op
.
block
)
...
@@ -221,8 +226,8 @@ func (f *Fetcher) loop() {
...
@@ -221,8 +226,8 @@ func (f *Fetcher) loop() {
case
notification
:=
<-
f
.
notify
:
case
notification
:=
<-
f
.
notify
:
// A block was announced, make sure the peer isn't DOSing us
// A block was announced, make sure the peer isn't DOSing us
count
:=
f
.
announces
[
notification
.
origin
]
+
1
count
:=
f
.
announces
[
notification
.
origin
]
+
1
if
count
>
announce
Limit
{
if
count
>
hash
Limit
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: exceeded outstanding announces (%d)"
,
notification
.
origin
,
announce
Limit
)
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: exceeded outstanding announces (%d)"
,
notification
.
origin
,
hash
Limit
)
break
break
}
}
// All is well, schedule the announce if block's not yet downloading
// All is well, schedule the announce if block's not yet downloading
...
@@ -241,8 +246,8 @@ func (f *Fetcher) loop() {
...
@@ -241,8 +246,8 @@ func (f *Fetcher) loop() {
case
hash
:=
<-
f
.
done
:
case
hash
:=
<-
f
.
done
:
// A pending import finished, remove all traces of the notification
// A pending import finished, remove all traces of the notification
f
.
forgetHash
(
hash
)
f
.
forgetBlock
(
hash
)
f
.
forgetBlock
(
hash
)
delete
(
f
.
queued
,
hash
)
case
<-
fetch
.
C
:
case
<-
fetch
.
C
:
// At least one block's timer ran out, check for needing retrieval
// At least one block's timer ran out, check for needing retrieval
...
@@ -252,7 +257,7 @@ func (f *Fetcher) loop() {
...
@@ -252,7 +257,7 @@ func (f *Fetcher) loop() {
if
time
.
Since
(
announces
[
0
]
.
time
)
>
arriveTimeout
-
gatherSlack
{
if
time
.
Since
(
announces
[
0
]
.
time
)
>
arriveTimeout
-
gatherSlack
{
// Pick a random peer to retrieve from, reset all others
// Pick a random peer to retrieve from, reset all others
announce
:=
announces
[
rand
.
Intn
(
len
(
announces
))]
announce
:=
announces
[
rand
.
Intn
(
len
(
announces
))]
f
.
forget
Block
(
hash
)
f
.
forget
Hash
(
hash
)
// If the block still didn't arrive, queue for fetching
// If the block still didn't arrive, queue for fetching
if
f
.
getBlock
(
hash
)
==
nil
{
if
f
.
getBlock
(
hash
)
==
nil
{
...
@@ -296,7 +301,7 @@ func (f *Fetcher) loop() {
...
@@ -296,7 +301,7 @@ func (f *Fetcher) loop() {
if
f
.
getBlock
(
hash
)
==
nil
{
if
f
.
getBlock
(
hash
)
==
nil
{
explicit
=
append
(
explicit
,
block
)
explicit
=
append
(
explicit
,
block
)
}
else
{
}
else
{
f
.
forget
Block
(
hash
)
f
.
forget
Hash
(
hash
)
}
}
}
else
{
}
else
{
download
=
append
(
download
,
block
)
download
=
append
(
download
,
block
)
...
@@ -339,6 +344,12 @@ func (f *Fetcher) reschedule(fetch *time.Timer) {
...
@@ -339,6 +344,12 @@ func (f *Fetcher) reschedule(fetch *time.Timer) {
func
(
f
*
Fetcher
)
enqueue
(
peer
string
,
block
*
types
.
Block
)
{
func
(
f
*
Fetcher
)
enqueue
(
peer
string
,
block
*
types
.
Block
)
{
hash
:=
block
.
Hash
()
hash
:=
block
.
Hash
()
// Ensure the peer isn't DOSing us
count
:=
f
.
queues
[
peer
]
+
1
if
count
>
blockLimit
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: discarded block #%d [%x], exceeded allowance (%d)"
,
peer
,
block
.
NumberU64
(),
hash
.
Bytes
()[
:
4
],
blockLimit
)
return
}
// Discard any past or too distant blocks
// Discard any past or too distant blocks
if
dist
:=
int64
(
block
.
NumberU64
())
-
int64
(
f
.
chainHeight
());
dist
<
-
maxUncleDist
||
dist
>
maxQueueDist
{
if
dist
:=
int64
(
block
.
NumberU64
())
-
int64
(
f
.
chainHeight
());
dist
<
-
maxUncleDist
||
dist
>
maxQueueDist
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: discarded block #%d [%x], distance %d"
,
peer
,
block
.
NumberU64
(),
hash
.
Bytes
()[
:
4
],
dist
)
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: discarded block #%d [%x], distance %d"
,
peer
,
block
.
NumberU64
(),
hash
.
Bytes
()[
:
4
],
dist
)
...
@@ -346,8 +357,13 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
...
@@ -346,8 +357,13 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
}
}
// Schedule the block for future importing
// Schedule the block for future importing
if
_
,
ok
:=
f
.
queued
[
hash
];
!
ok
{
if
_
,
ok
:=
f
.
queued
[
hash
];
!
ok
{
f
.
queued
[
hash
]
=
struct
{}{}
op
:=
&
inject
{
f
.
queue
.
Push
(
&
inject
{
origin
:
peer
,
block
:
block
},
-
float32
(
block
.
NumberU64
()))
origin
:
peer
,
block
:
block
,
}
f
.
queues
[
peer
]
=
count
f
.
queued
[
hash
]
=
op
f
.
queue
.
Push
(
op
,
-
float32
(
block
.
NumberU64
()))
if
glog
.
V
(
logger
.
Debug
)
{
if
glog
.
V
(
logger
.
Debug
)
{
glog
.
Infof
(
"Peer %s: queued block #%d [%x], total %v"
,
peer
,
block
.
NumberU64
(),
hash
.
Bytes
()[
:
4
],
f
.
queue
.
Size
())
glog
.
Infof
(
"Peer %s: queued block #%d [%x], total %v"
,
peer
,
block
.
NumberU64
(),
hash
.
Bytes
()[
:
4
],
f
.
queue
.
Size
())
...
@@ -389,8 +405,9 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
...
@@ -389,8 +405,9 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
}()
}()
}
}
// forgetBlock removes all traces of a block from the fetcher's internal state.
// forgetHash removes all traces of a block announcement from the fetcher's
func
(
f
*
Fetcher
)
forgetBlock
(
hash
common
.
Hash
)
{
// internal state.
func
(
f
*
Fetcher
)
forgetHash
(
hash
common
.
Hash
)
{
// Remove all pending announces and decrement DOS counters
// Remove all pending announces and decrement DOS counters
for
_
,
announce
:=
range
f
.
announced
[
hash
]
{
for
_
,
announce
:=
range
f
.
announced
[
hash
]
{
f
.
announces
[
announce
.
origin
]
--
f
.
announces
[
announce
.
origin
]
--
...
@@ -409,3 +426,15 @@ func (f *Fetcher) forgetBlock(hash common.Hash) {
...
@@ -409,3 +426,15 @@ func (f *Fetcher) forgetBlock(hash common.Hash) {
delete
(
f
.
fetching
,
hash
)
delete
(
f
.
fetching
,
hash
)
}
}
}
}
// forgetBlock removes all traces of a queued block frmo the fetcher's internal
// state.
func
(
f
*
Fetcher
)
forgetBlock
(
hash
common
.
Hash
)
{
if
insert
:=
f
.
queued
[
hash
];
insert
!=
nil
{
f
.
queues
[
insert
.
origin
]
--
if
f
.
queues
[
insert
.
origin
]
==
0
{
delete
(
f
.
queues
,
insert
.
origin
)
}
delete
(
f
.
queued
,
hash
)
}
}
eth/fetcher/fetcher_test.go
View file @
1989d149
...
@@ -399,15 +399,15 @@ func TestDistantDiscarding(t *testing.T) {
...
@@ -399,15 +399,15 @@ func TestDistantDiscarding(t *testing.T) {
// Tests that a peer is unable to use unbounded memory with sending infinite
// Tests that a peer is unable to use unbounded memory with sending infinite
// block announcements to a node, but that even in the face of such an attack,
// block announcements to a node, but that even in the face of such an attack,
// the fetcher remains operational.
// the fetcher remains operational.
func
Test
Announce
MemoryExhaustionAttack
(
t
*
testing
.
T
)
{
func
Test
Hash
MemoryExhaustionAttack
(
t
*
testing
.
T
)
{
tester
:=
newTester
()
tester
:=
newTester
()
// Create a valid chain and an infinite junk chain
// Create a valid chain and an infinite junk chain
hashes
:=
createHashes
(
announce
Limit
+
2
*
maxQueueDist
,
knownHash
)
hashes
:=
createHashes
(
hash
Limit
+
2
*
maxQueueDist
,
knownHash
)
blocks
:=
createBlocksFromHashes
(
hashes
)
blocks
:=
createBlocksFromHashes
(
hashes
)
valid
:=
tester
.
makeFetcher
(
blocks
)
valid
:=
tester
.
makeFetcher
(
blocks
)
attack
:=
createHashes
(
announce
Limit
+
2
*
maxQueueDist
,
unknownHash
)
attack
:=
createHashes
(
hash
Limit
+
2
*
maxQueueDist
,
unknownHash
)
attacker
:=
tester
.
makeFetcher
(
nil
)
attacker
:=
tester
.
makeFetcher
(
nil
)
// Feed the tester a huge hashset from the attacker, and a limited from the valid peer
// Feed the tester a huge hashset from the attacker, and a limited from the valid peer
...
@@ -417,8 +417,8 @@ func TestAnnounceMemoryExhaustionAttack(t *testing.T) {
...
@@ -417,8 +417,8 @@ func TestAnnounceMemoryExhaustionAttack(t *testing.T) {
}
}
tester
.
fetcher
.
Notify
(
"attacker"
,
attack
[
i
],
time
.
Now
()
.
Add
(
arriveTimeout
/
2
),
attacker
)
tester
.
fetcher
.
Notify
(
"attacker"
,
attack
[
i
],
time
.
Now
()
.
Add
(
arriveTimeout
/
2
),
attacker
)
}
}
if
len
(
tester
.
fetcher
.
announced
)
!=
announce
Limit
+
maxQueueDist
{
if
len
(
tester
.
fetcher
.
announced
)
!=
hash
Limit
+
maxQueueDist
{
t
.
Fatalf
(
"queued announce count mismatch: have %d, want %d"
,
len
(
tester
.
fetcher
.
announced
),
announce
Limit
+
maxQueueDist
)
t
.
Fatalf
(
"queued announce count mismatch: have %d, want %d"
,
len
(
tester
.
fetcher
.
announced
),
hash
Limit
+
maxQueueDist
)
}
}
// Wait for synchronisation to complete and check success for the valid peer
// Wait for synchronisation to complete and check success for the valid peer
time
.
Sleep
(
2
*
arriveTimeout
)
time
.
Sleep
(
2
*
arriveTimeout
)
...
@@ -431,10 +431,63 @@ func TestAnnounceMemoryExhaustionAttack(t *testing.T) {
...
@@ -431,10 +431,63 @@ func TestAnnounceMemoryExhaustionAttack(t *testing.T) {
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
i
],
time
.
Now
()
.
Add
(
time
.
Millisecond
),
valid
)
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
i
],
time
.
Now
()
.
Add
(
time
.
Millisecond
),
valid
)
i
--
i
--
}
}
time
.
Sleep
(
256
*
time
.
Millisecond
)
time
.
Sleep
(
500
*
time
.
Millisecond
)
}
}
time
.
Sleep
(
256
*
time
.
Millisecond
)
time
.
Sleep
(
500
*
time
.
Millisecond
)
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
len
(
hashes
)
{
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
len
(
hashes
)
{
t
.
Fatalf
(
"fully synchronised block mismatch: have %v, want %v"
,
imported
,
len
(
hashes
))
t
.
Fatalf
(
"fully synchronised block mismatch: have %v, want %v"
,
imported
,
len
(
hashes
))
}
}
}
}
// Tests that blocks sent to the fetcher (either through propagation or via hash
// announces and retrievals) don't pile up indefinitely, exhausting available
// system memory.
func
TestBlockMemoryExhaustionAttack
(
t
*
testing
.
T
)
{
tester
:=
newTester
()
// Create a valid chain and a batch of dangling (but in range) blocks
hashes
:=
createHashes
(
blockLimit
,
knownHash
)
blocks
:=
createBlocksFromHashes
(
hashes
)
attack
:=
make
(
map
[
common
.
Hash
]
*
types
.
Block
)
for
i
:=
0
;
i
<
16
;
i
++
{
hashes
:=
createHashes
(
maxQueueDist
-
1
,
unknownHash
)
blocks
:=
createBlocksFromHashes
(
hashes
)
for
_
,
hash
:=
range
hashes
[
:
maxQueueDist
-
2
]
{
attack
[
hash
]
=
blocks
[
hash
]
}
}
// Try to feed all the attacker blocks make sure only a limited batch is accepted
for
_
,
block
:=
range
attack
{
tester
.
fetcher
.
Enqueue
(
"attacker"
,
block
)
}
time
.
Sleep
(
100
*
time
.
Millisecond
)
if
queued
:=
tester
.
fetcher
.
queue
.
Size
();
queued
!=
blockLimit
{
t
.
Fatalf
(
"queued block count mismatch: have %d, want %d"
,
queued
,
blockLimit
)
}
// Queue up a batch of valid blocks, and check that a new peer is allowed to do so
for
i
:=
0
;
i
<
maxQueueDist
-
1
;
i
++
{
tester
.
fetcher
.
Enqueue
(
"valid"
,
blocks
[
hashes
[
len
(
hashes
)
-
3
-
i
]])
}
time
.
Sleep
(
100
*
time
.
Millisecond
)
if
queued
:=
tester
.
fetcher
.
queue
.
Size
();
queued
!=
blockLimit
+
maxQueueDist
-
1
{
t
.
Fatalf
(
"queued block count mismatch: have %d, want %d"
,
queued
,
blockLimit
+
maxQueueDist
-
1
)
}
// Insert the missing piece (and sanity check the import)
tester
.
fetcher
.
Enqueue
(
"valid"
,
blocks
[
hashes
[
len
(
hashes
)
-
2
]])
time
.
Sleep
(
500
*
time
.
Millisecond
)
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
maxQueueDist
+
1
{
t
.
Fatalf
(
"synchronised block mismatch: have %v, want %v"
,
imported
,
maxQueueDist
+
1
)
}
// Insert the remaining blocks in chunks to ensure clean DOS protection
for
i
:=
maxQueueDist
;
i
<
len
(
hashes
)
-
1
;
i
++
{
tester
.
fetcher
.
Enqueue
(
"valid"
,
blocks
[
hashes
[
len
(
hashes
)
-
2
-
i
]])
if
i
%
maxQueueDist
==
0
{
time
.
Sleep
(
500
*
time
.
Millisecond
)
}
}
time
.
Sleep
(
500
*
time
.
Millisecond
)
if
imported
:=
len
(
tester
.
blocks
);
imported
!=
len
(
hashes
)
{
t
.
Fatalf
(
"synchronised block mismatch: have %v, want %v"
,
imported
,
len
(
hashes
))
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment