张蕾 / Geth-Modification · Commits

Commit b40dc8a1, authored Feb 25, 2016 by Péter Szilágyi
Parent: fe532a98

eth/downloader: implement concurrent header downloads

Showing 4 changed files with 584 additions and 211 deletions (+584 -211)
eth/downloader/downloader.go       +299  -181
eth/downloader/downloader_test.go   +24   -23
eth/downloader/peer.go              +61    -5
eth/downloader/queue.go            +200    -2
eth/downloader/downloader.go
@@ -42,6 +42,7 @@ var (
 	MaxHashFetch    = 512 // Amount of hashes to be fetched per retrieval request
 	MaxBlockFetch   = 128 // Amount of blocks to be fetched per retrieval request
 	MaxHeaderFetch  = 192 // Amount of block headers to be fetched per retrieval request
+	MaxSkeletonSize = 128 // Number of header fetches needed for a skeleton assembly
 	MaxBodyFetch    = 128 // Amount of block bodies to be fetched per retrieval request
 	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
 	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request
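Note on the new MaxSkeletonSize constant: one skeleton request asks the origin peer for MaxSkeletonSize anchor headers spaced MaxHeaderFetch apart, so a fully filled skeleton round covers MaxSkeletonSize × MaxHeaderFetch headers. A minimal standalone sketch (only the two constants are taken from the diff, the rest is illustrative):

package main

import "fmt"

const (
	MaxHeaderFetch  = 192 // headers per individual retrieval request
	MaxSkeletonSize = 128 // skeleton anchors requested per round
)

func main() {
	// Each skeleton anchor is the last header of a MaxHeaderFetch-sized
	// batch, so one fully filled round covers:
	fmt.Println(MaxSkeletonSize * MaxHeaderFetch) // 24576 headers
}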
@@ -52,7 +53,8 @@ var (
 	blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
 	blockTTL       = 3 * blockTargetRTT  // [eth/61] Maximum time allowance before a block request is considered expired

-	headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
+	headerTargetRTT = time.Second     // [eth/62] Target time for completing a header retrieval request (only for measurements for now)
+	headerTTL       = 2 * time.Second // [eth/62] Time it takes for a header request to time out

 	bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
 	bodyTTL       = 3 * bodyTargetRTT   // [eth/62] Maximum time allowance before a block body request is considered expired

 	receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
@@ -60,9 +62,10 @@ var (
 	stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
 	stateTTL       = 3 * stateTargetRTT  // [eth/63] Maximum time allowance before a node data request is considered expired

-	maxQueuedHashes   = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
-	maxQueuedHeaders  = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
-	maxResultsProcess = 256        // Number of download results to import at once into the chain
+	maxQueuedHashes   = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
+	maxQueuedHeaders  = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
+	maxHeadersProcess = 2048      // Number of header download results to import at once into the chain
+	maxResultsProcess = 4096      // Number of content download results to import at once into the chain

 	fsHeaderCheckFrequency = 100  // Verification frequency of the downloaded headers during fast sync
 	fsHeaderSafetyNet      = 2048 // Number of headers to discard in case a chain violation is detected
@@ -72,29 +75,30 @@ var (
 )

 var (
-	errBusy               = errors.New("busy")
-	errUnknownPeer        = errors.New("peer is unknown or unhealthy")
-	errBadPeer            = errors.New("action from bad peer ignored")
-	errStallingPeer       = errors.New("peer is stalling")
-	errNoPeers            = errors.New("no peers to keep download active")
-	errTimeout            = errors.New("timeout")
-	errEmptyHashSet       = errors.New("empty hash set by peer")
-	errEmptyHeaderSet     = errors.New("empty header set by peer")
-	errPeersUnavailable   = errors.New("no peers available or all tried for download")
-	errAlreadyInPool      = errors.New("hash already in pool")
-	errInvalidAncestor    = errors.New("retrieved ancestor is invalid")
-	errInvalidChain       = errors.New("retrieved hash chain is invalid")
-	errInvalidBlock       = errors.New("retrieved block is invalid")
-	errInvalidBody        = errors.New("retrieved block body is invalid")
-	errInvalidReceipt     = errors.New("retrieved receipt is invalid")
-	errCancelHashFetch    = errors.New("hash download canceled (requested)")
-	errCancelBlockFetch   = errors.New("block download canceled (requested)")
-	errCancelHeaderFetch  = errors.New("block header download canceled (requested)")
-	errCancelBodyFetch    = errors.New("block body download canceled (requested)")
-	errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
-	errCancelStateFetch   = errors.New("state data download canceled (requested)")
-	errCancelProcessing   = errors.New("processing canceled (requested)")
-	errNoSyncActive       = errors.New("no sync active")
+	errBusy                    = errors.New("busy")
+	errUnknownPeer             = errors.New("peer is unknown or unhealthy")
+	errBadPeer                 = errors.New("action from bad peer ignored")
+	errStallingPeer            = errors.New("peer is stalling")
+	errNoPeers                 = errors.New("no peers to keep download active")
+	errTimeout                 = errors.New("timeout")
+	errEmptyHashSet            = errors.New("empty hash set by peer")
+	errEmptyHeaderSet          = errors.New("empty header set by peer")
+	errPeersUnavailable        = errors.New("no peers available or all tried for download")
+	errAlreadyInPool           = errors.New("hash already in pool")
+	errInvalidAncestor         = errors.New("retrieved ancestor is invalid")
+	errInvalidChain            = errors.New("retrieved hash chain is invalid")
+	errInvalidBlock            = errors.New("retrieved block is invalid")
+	errInvalidBody             = errors.New("retrieved block body is invalid")
+	errInvalidReceipt          = errors.New("retrieved receipt is invalid")
+	errCancelHashFetch         = errors.New("hash download canceled (requested)")
+	errCancelBlockFetch        = errors.New("block download canceled (requested)")
+	errCancelHeaderFetch       = errors.New("block header download canceled (requested)")
+	errCancelBodyFetch         = errors.New("block body download canceled (requested)")
+	errCancelReceiptFetch      = errors.New("receipt download canceled (requested)")
+	errCancelStateFetch        = errors.New("state data download canceled (requested)")
+	errCancelHeaderProcessing  = errors.New("header processing canceled (requested)")
+	errCancelContentProcessing = errors.New("content processing canceled (requested)")
+	errNoSyncActive            = errors.New("no sync active")
 )

 type Downloader struct {
@@ -137,16 +141,17 @@ type Downloader struct {
 	// Channels
 	newPeerCh chan *peer

-	hashCh        chan dataPack // [eth/61] Channel receiving inbound hashes
-	blockCh       chan dataPack // [eth/61] Channel receiving inbound blocks
-	headerCh      chan dataPack // [eth/62] Channel receiving inbound block headers
-	bodyCh        chan dataPack // [eth/62] Channel receiving inbound block bodies
-	receiptCh     chan dataPack // [eth/63] Channel receiving inbound receipts
-	stateCh       chan dataPack // [eth/63] Channel receiving inbound node state data
-	blockWakeCh   chan bool     // [eth/61] Channel to signal the block fetcher of new tasks
-	bodyWakeCh    chan bool     // [eth/62] Channel to signal the block body fetcher of new tasks
-	receiptWakeCh chan bool     // [eth/63] Channel to signal the receipt fetcher of new tasks
-	stateWakeCh   chan bool     // [eth/63] Channel to signal the state fetcher of new tasks
+	hashCh        chan dataPack        // [eth/61] Channel receiving inbound hashes
+	blockCh       chan dataPack        // [eth/61] Channel receiving inbound blocks
+	headerCh      chan dataPack        // [eth/62] Channel receiving inbound block headers
+	bodyCh        chan dataPack        // [eth/62] Channel receiving inbound block bodies
+	receiptCh     chan dataPack        // [eth/63] Channel receiving inbound receipts
+	stateCh       chan dataPack        // [eth/63] Channel receiving inbound node state data
+	blockWakeCh   chan bool            // [eth/61] Channel to signal the block fetcher of new tasks
+	bodyWakeCh    chan bool            // [eth/62] Channel to signal the block body fetcher of new tasks
+	receiptWakeCh chan bool            // [eth/63] Channel to signal the receipt fetcher of new tasks
+	stateWakeCh   chan bool            // [eth/63] Channel to signal the state fetcher of new tasks
+	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks

 	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
 	cancelLock sync.RWMutex  // Lock to protect the cancel channel in delivers
@@ -194,6 +199,7 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
 		bodyWakeCh:    make(chan bool, 1),
 		receiptWakeCh: make(chan bool, 1),
 		stateWakeCh:   make(chan bool, 1),
+		headerProcCh:  make(chan []*types.Header, 1),
 	}
 }
@@ -308,6 +314,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 		default:
 		}
 	}
+	for empty := false; !empty; {
+		select {
+		case <-d.headerProcCh:
+		default:
+			empty = true
+		}
+	}
 	// Reset any ephemeral sync statistics
 	d.syncStatsLock.Lock()
 	d.syncStatsStateTotal = 0
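The loop added above is a non-blocking channel drain: it empties anything left buffered in headerProcCh by a previous sync without ever blocking. A self-contained sketch of the same pattern (names invented for illustration):

package main

import "fmt"

// drain pulls whatever is buffered on ch, looping until the default case
// fires, i.e. until a receive would block.
func drain(ch chan int) int {
	n := 0
	for empty := false; !empty; {
		select {
		case <-ch:
			n++
		default:
			empty = true
		}
	}
	return n
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	fmt.Println(drain(ch)) // 2
}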
@@ -373,7 +386,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 		if d.syncInitHook != nil {
 			d.syncInitHook(origin, latest)
 		}
-		return d.spawnSync(
+		return d.spawnSync(origin+1,
 			func() error { return d.fetchHashes61(p, td, origin+1) },
 			func() error { return d.fetchBlocks61(origin + 1) },
 		)
@@ -423,11 +436,12 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 		if d.syncInitHook != nil {
 			d.syncInitHook(origin, latest)
 		}
-		return d.spawnSync(
-			func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved
-			func() error { return d.fetchBodies(origin + 1) },       // Bodies are retrieved during normal and fast sync
-			func() error { return d.fetchReceipts(origin + 1) },     // Receipts are retrieved during fast sync
-			func() error { return d.fetchNodeData() },               // Node state data is retrieved during fast sync
+		return d.spawnSync(origin+1,
+			func() error { return d.fetchHeaders(p, origin+1) },    // Headers are always retrieved
+			func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
+			func() error { return d.fetchBodies(origin + 1) },      // Bodies are retrieved during normal and fast sync
+			func() error { return d.fetchReceipts(origin + 1) },    // Receipts are retrieved during fast sync
+			func() error { return d.fetchNodeData() },              // Node state data is retrieved during fast sync
 		)

 	default:
@@ -439,11 +453,11 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 // spawnSync runs d.process and all given fetcher functions to completion in
 // separate goroutines, returning the first error that appears.
-func (d *Downloader) spawnSync(fetchers ...func() error) error {
+func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
 	var wg sync.WaitGroup
 	errc := make(chan error, len(fetchers)+1)
 	wg.Add(len(fetchers) + 1)
-	go func() { defer wg.Done(); errc <- d.process() }()
+	go func() { defer wg.Done(); errc <- d.processContent() }()
 	for _, fn := range fetchers {
 		fn := fn
 		go func() { defer wg.Done(); errc <- fn() }()
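spawnSync fans the fetchers out into goroutines and surfaces the first error any of them reports. A simplified, standalone sketch of that pattern (not the actual implementation, which also wires in the content processor and cancellation):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// spawn runs every fetcher in its own goroutine, waits for all of them, and
// returns the first non-nil error delivered on the (buffered) error channel.
func spawn(fetchers ...func() error) error {
	var wg sync.WaitGroup
	errc := make(chan error, len(fetchers)) // buffered: sends never block
	wg.Add(len(fetchers))
	for _, fn := range fetchers {
		fn := fn // capture the loop variable for the goroutine
		go func() { defer wg.Done(); errc <- fn() }()
	}
	wg.Wait()
	var err error
	for i := 0; i < len(fetchers); i++ {
		if e := <-errc; e != nil && err == nil {
			err = e
		}
	}
	return err
}

func main() {
	err := spawn(
		func() error { return nil },
		func() error { return errors.New("boom") },
	)
	fmt.Println(err) // boom
}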
@@ -1149,55 +1163,38 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
 	return start, nil
 }

-// fetchHeaders keeps retrieving headers from the requested number, until no more
-// are returned, potentially throttling on the way.
-//
-// The queue parameter can be used to switch between queuing headers for block
-// body download too, or directly import as pure header chains.
-func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
-	glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from)
+// fetchHeaders keeps retrieving headers concurrently from the number
+// requested, until no more are returned, potentially throttling on the way. To
+// facilitate concurrency but still protect against malicious nodes sending bad
+// headers, we construct a header chain skeleton using the "origin" peer we are
+// syncing with, and fill in the missing headers using anyone else. Headers from
+// other peers are only accepted if they map cleanly to the skeleton. If no one
+// can fill in the skeleton - not even the origin peer - it's assumed invalid and
+// the origin is dropped.
+func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
+	glog.V(logger.Debug).Infof("%v: directing header downloads from #%d", p, from)
 	defer glog.V(logger.Debug).Infof("%v: header download terminated", p)

-	// Calculate the pivoting point for switching from fast to slow sync
-	pivot := d.queue.FastSyncPivot()
-
-	// Keep a count of uncertain headers to roll back
-	rollback := []*types.Header{}
-	defer func() {
-		if len(rollback) > 0 {
-			// Flatten the headers and roll them back
-			hashes := make([]common.Hash, len(rollback))
-			for i, header := range rollback {
-				hashes[i] = header.Hash()
-			}
-			lh, lfb, lb := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number()
-			d.rollback(hashes)
-			glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
-				len(hashes), lh, d.headHeader().Number, lfb, d.headFastBlock().Number(), lb, d.headBlock().Number())
-
-			// If we're already past the pivot point, this could be an attack, disable fast sync
-			if rollback[len(rollback)-1].Number.Uint64() > pivot {
-				d.noFast = true
-			}
-		}
-	}()
-
-	// Create a timeout timer, and the associated hash fetcher
-	request := time.Now() // time of the last fetch request
+	// Create a timeout timer, and the associated header fetcher
+	skeleton := true            // Skeleton assembly phase or finishing up
+	request := time.Now()       // time of the last skeleton fetch request
 	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
 	<-timeout.C                 // timeout channel should be initially empty
 	defer timeout.Stop()

 	getHeaders := func(from uint64) {
-		glog.V(logger.Detail).Infof("%v: fetching %d headers from #%d", p, MaxHeaderFetch, from)
-		go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
+		if skeleton {
+			glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
+			go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
+		} else {
+			glog.V(logger.Detail).Infof("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from)
+			go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
+		}
 		request = time.Now()
 		timeout.Reset(headerTTL)
 	}
-	// Start pulling headers, until all are exhausted
+	// Start pulling the header chain skeleton until all is done
 	getHeaders(from)
-	gotHeaders := false

 	for {
 		select {
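The skeleton request above (getAbsHeaders(from+MaxHeaderFetch-1, MaxSkeletonSize, MaxHeaderFetch-1, false)) asks for every MaxHeaderFetch-th header starting at the last header of the first batch, so each returned skeleton header is the anchor that a filled batch must hash-link up to. A sketch of the block numbers such a request targets (the helper name is invented for illustration):

package main

import "fmt"

const (
	MaxHeaderFetch  = 192
	MaxSkeletonSize = 128
)

// skeletonAnchors lists the block numbers a skeleton request pins down.
func skeletonAnchors(from uint64) []uint64 {
	anchors := make([]uint64, 0, MaxSkeletonSize)
	next := from + MaxHeaderFetch - 1 // last header of the first batch
	for i := 0; i < MaxSkeletonSize; i++ {
		anchors = append(anchors, next)
		next += MaxHeaderFetch // one anchor per MaxHeaderFetch-sized batch
	}
	return anchors
}

func main() {
	a := skeletonAnchors(1)
	fmt.Println(a[0], a[1], a[len(a)-1]) // 192 384 24576
}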
@@ -1205,115 +1202,44 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
 			return errCancelHeaderFetch

 		case packet := <-d.headerCh:
-			// Make sure the active peer is giving us the headers
+			// Make sure the active peer is giving us the skeleton headers
 			if packet.PeerId() != p.id {
-				glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId())
+				glog.V(logger.Debug).Infof("Received skeleton headers from incorrect peer (%s)", packet.PeerId())
 				break
 			}
 			headerReqTimer.UpdateSince(request)
 			timeout.Stop()

+			// If the skeleton's finished, pull any remaining head headers directly from the origin
+			if packet.Items() == 0 && skeleton {
+				skeleton = false
+				getHeaders(from)
+				continue
+			}
 			// If no more headers are inbound, notify the content fetchers and return
 			if packet.Items() == 0 {
 				glog.V(logger.Debug).Infof("%v: no available headers", p)
-				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
-					select {
-					case ch <- false:
-					case <-d.cancelCh:
-					}
-				}
-				// If no headers were retrieved at all, the peer violated it's TD promise that it had a
-				// better chain compared to ours. The only exception is if it's promised blocks were
-				// already imported by other means (e.g. fetcher):
-				//
-				// R <remote peer>, L <local node>: Both at block 10
-				// R: Mine block 11, and propagate it to L
-				// L: Queue block 11 for import
-				// L: Notice that R's head and TD increased compared to ours, start sync
-				// L: Import of block 11 finishes
-				// L: Sync begins, and finds common ancestor at 11
-				// L: Request new headers up from 11 (R's TD was higher, it must have something)
-				// R: Nothing to give
-				if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
-					return errStallingPeer
-				}
-				// If fast or light syncing, ensure promised headers are indeed delivered. This is
-				// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
-				// of delivering the post-pivot blocks that would flag the invalid content.
-				//
-				// This check cannot be executed "as is" for full imports, since blocks may still be
-				// queued for processing when the header download completes. However, as long as the
-				// peer gave us something useful, we're already happy/progressed (above check).
-				if d.mode == FastSync || d.mode == LightSync {
-					if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
-						return errStallingPeer
-					}
-				}
-				rollback = nil
+				d.headerProcCh <- nil
 				return nil
 			}
-			gotHeaders = true
 			headers := packet.(*headerPack).headers

-			// Otherwise insert all the new headers, aborting in case of junk
-			glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
-			if d.mode == FastSync || d.mode == LightSync {
-				// Collect the yet unknown headers to mark them as uncertain
-				unknown := make([]*types.Header, 0, len(headers))
-				for _, header := range headers {
-					if !d.hasHeader(header.Hash()) {
-						unknown = append(unknown, header)
-					}
-				}
-				// If we're importing pure headers, verify based on their recentness
-				frequency := fsHeaderCheckFrequency
-				if headers[len(headers)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
-					frequency = 1
-				}
-				if n, err := d.insertHeaders(headers, frequency); err != nil {
-					// If some headers were inserted, add them too to the rollback list
-					if n > 0 {
-						rollback = append(rollback, headers[:n]...)
-					}
-					glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err)
-					return errInvalidChain
-				}
-				// All verifications passed, store newly found uncertain headers
-				rollback = append(rollback, unknown...)
-				if len(rollback) > fsHeaderSafetyNet {
-					rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
-				}
-			}
-			if d.mode == FullSync || d.mode == FastSync {
-				inserts := d.queue.Schedule(headers, from)
-				if len(inserts) != len(headers) {
-					glog.V(logger.Debug).Infof("%v: stale headers", p)
-					return errBadPeer
-				}
-			}
-			// Notify the content fetchers of new headers, but stop if queue is full
-			cont := d.queue.PendingBlocks() < maxQueuedHeaders && d.queue.PendingReceipts() < maxQueuedHeaders
-			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
-				if cont {
-					// We still have headers to fetch, send continuation wake signal (potential)
-					select {
-					case ch <- true:
-					default:
-					}
-				} else {
-					// Header limit reached, send a termination wake signal (enforced)
-					select {
-					case ch <- false:
-					case <-d.cancelCh:
-					}
-				}
-			}
-			if !cont {
-				return nil
-			}
-			// Queue not yet full, fetch the next batch
+			// If we received a skeleton batch, resolve internals concurrently
+			if skeleton {
+				filled, err := d.fillHeaderSkeleton(from, headers)
+				if err != nil {
+					glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err)
+					return errInvalidChain
+				}
+				headers = filled
+			}
+			// Insert all the new headers and fetch the next batch
+			glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
+			select {
+			case d.headerProcCh <- headers:
+			case <-d.cancelCh:
+				return errCancelHeaderFetch
+			}
 			from += uint64(len(headers))
 			getHeaders(from)
@@ -1330,7 +1256,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
 				case <-d.cancelCh:
 				}
 			}
-			return nil
+			select {
+			case d.headerProcCh <- nil:
+			case <-d.cancelCh:
+			}
+			return errBadPeer

 		case <-d.hashCh:
 		case <-d.blockCh:
@@ -1340,6 +1270,34 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
 	}
 }

+// fillHeaderSkeleton concurrently retrieves headers from all our available peers
+// and maps them to the provided skeleton header chain.
+func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, error) {
+	glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from)
+	d.queue.ScheduleSkeleton(from, skeleton)
+
+	var (
+		deliver = func(packet dataPack) (int, error) {
+			pack := packet.(*headerPack)
+			return d.queue.DeliverHeaders(pack.peerId, pack.headers)
+		}
+		expire   = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) }
+		throttle = func() bool { return false }
+		reserve  = func(p *peer, count int) (*fetchRequest, bool, error) {
+			return d.queue.ReserveHeaders(p, count), false, nil
+		}
+		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
+		capacity = func(p *peer) int { return p.HeaderCapacity() }
+		setIdle  = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
+	)
+	err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
+		d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve, nil, fetch,
+		d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header")
+
+	glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err)
+	return d.queue.RetrieveHeaders(), err
+}
+
 // fetchBodies iteratively downloads the scheduled block bodies, taking any
 // available peers, reserving a chunk of blocks for each, waiting for delivery
 // and also periodically checking for timeouts.
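fillHeaderSkeleton reuses the generic fetchParts loop by handing it type-specific closures (deliver, expire, reserve, fetch, capacity, setIdle), the same way bodies, receipts and node data do. A toy illustration of that closure-driven design (all names here are invented, not the fetchParts API):

package main

import "fmt"

type part struct{ id int }

// fetchLoop knows nothing about the concrete data type; the behavior is
// injected through closures, so one scheduler serves many payload kinds.
func fetchLoop(reserve func() *part, fetch func(*part) error) error {
	for {
		req := reserve() // pick the next unit of work, nil when drained
		if req == nil {
			return nil
		}
		if err := fetch(req); err != nil {
			return err
		}
	}
}

func main() {
	pending := []*part{{1}, {2}, {3}}
	err := fetchLoop(
		func() *part {
			if len(pending) == 0 {
				return nil
			}
			p := pending[0]
			pending = pending[1:]
			return p
		},
		func(p *part) error { fmt.Println("fetched part", p.id); return nil },
	)
	fmt.Println("done:", err)
}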
@@ -1398,6 +1356,11 @@ func (d *Downloader) fetchNodeData() error {
 		deliver = func(packet dataPack) (int, error) {
 			start := time.Now()
 			return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
+				// If the peer gave us nothing, stalling fast sync, drop
+				if delivered == 0 {
+					glog.V(logger.Debug).Infof("peer %s: stalling state delivery, dropping", packet.PeerId())
+					d.dropPeer(packet.PeerId())
+				}
 				if err != nil {
 					// If the node data processing failed, the root hash is very wrong, abort
 					glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err)
@@ -1554,7 +1517,9 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 				continue
 			}
 			if glog.V(logger.Detail) {
-				if len(request.Headers) > 0 {
+				if request.From > 0 {
+					glog.Infof("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From)
+				} else if len(request.Headers) > 0 {
 					glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
 				} else {
 					glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
@@ -1588,9 +1553,162 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 	}
 }

-// process takes fetch results from the queue and tries to import them into the
-// chain. The type of import operation will depend on the result contents.
-func (d *Downloader) process() error {
+// processHeaders takes batches of retrieved headers from an input channel and
+// keeps processing and scheduling them into the header chain and downloader's
+// queue until the stream ends or a failure occurs.
+func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
+	// Calculate the pivoting point for switching from fast to slow sync
+	pivot := d.queue.FastSyncPivot()
+
+	// Keep a count of uncertain headers to roll back
+	rollback := []*types.Header{}
+	defer func() {
+		if len(rollback) > 0 {
+			// Flatten the headers and roll them back
+			hashes := make([]common.Hash, len(rollback))
+			for i, header := range rollback {
+				hashes[i] = header.Hash()
+			}
+			lh, lfb, lb := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number()
+			d.rollback(hashes)
+			glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
+				len(hashes), lh, d.headHeader().Number, lfb, d.headFastBlock().Number(), lb, d.headBlock().Number())
+
+			// If we're already past the pivot point, this could be an attack, disable fast sync
+			if rollback[len(rollback)-1].Number.Uint64() > pivot {
+				d.noFast = true
+			}
+		}
+	}()
+
+	// Wait for batches of headers to process
+	gotHeaders := false
+
+	for {
+		select {
+		case <-d.cancelCh:
+			return errCancelHeaderProcessing
+
+		case headers := <-d.headerProcCh:
+			// Terminate header processing if we synced up
+			if len(headers) == 0 {
+				// Notify everyone that headers are fully processed
+				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+					select {
+					case ch <- false:
+					case <-d.cancelCh:
+					}
+				}
+				// If no headers were retrieved at all, the peer violated its TD promise that it had a
+				// better chain compared to ours. The only exception is if its promised blocks were
+				// already imported by other means (e.g. fetcher):
+				//
+				// R <remote peer>, L <local node>: Both at block 10
+				// R: Mine block 11, and propagate it to L
+				// L: Queue block 11 for import
+				// L: Notice that R's head and TD increased compared to ours, start sync
+				// L: Import of block 11 finishes
+				// L: Sync begins, and finds common ancestor at 11
+				// L: Request new headers up from 11 (R's TD was higher, it must have something)
+				// R: Nothing to give
+				if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
+					return errStallingPeer
+				}
+				// If fast or light syncing, ensure promised headers are indeed delivered. This is
+				// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
+				// of delivering the post-pivot blocks that would flag the invalid content.
+				//
+				// This check cannot be executed "as is" for full imports, since blocks may still be
+				// queued for processing when the header download completes. However, as long as the
+				// peer gave us something useful, we're already happy/progressed (above check).
+				if d.mode == FastSync || d.mode == LightSync {
+					if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
+						return errStallingPeer
+					}
+				}
+				// Disable any rollback and return
+				rollback = nil
+				return nil
+			}
+			// Otherwise split the chunk of headers into batches and process them
+			gotHeaders = true
+
+			for len(headers) > 0 {
+				// Terminate if something failed in between processing chunks
+				select {
+				case <-d.cancelCh:
+					return errCancelHeaderProcessing
+				default:
+				}
+				// Select the next chunk of headers to import
+				limit := maxHeadersProcess
+				if limit > len(headers) {
+					limit = len(headers)
+				}
+				chunk := headers[:limit]
+
+				// In case of header only syncing, validate the chunk immediately
+				if d.mode == FastSync || d.mode == LightSync {
+					// Collect the yet unknown headers to mark them as uncertain
+					unknown := make([]*types.Header, 0, len(headers))
+					for _, header := range chunk {
+						if !d.hasHeader(header.Hash()) {
+							unknown = append(unknown, header)
+						}
+					}
+					// If we're importing pure headers, verify based on their recentness
+					frequency := fsHeaderCheckFrequency
+					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
+						frequency = 1
+					}
+					if n, err := d.insertHeaders(chunk, frequency); err != nil {
+						// If some headers were inserted, add them too to the rollback list
+						if n > 0 {
+							rollback = append(rollback, chunk[:n]...)
+						}
+						glog.V(logger.Debug).Infof("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err)
+						return errInvalidChain
+					}
+					// All verifications passed, store newly found uncertain headers
+					rollback = append(rollback, unknown...)
+					if len(rollback) > fsHeaderSafetyNet {
+						rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
+					}
+				}
+				// Unless we're doing light chains, schedule the headers for associated content retrieval
+				if d.mode == FullSync || d.mode == FastSync {
+					// If we've reached the allowed number of pending headers, stall a bit
+					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
+						select {
+						case <-d.cancelCh:
+							return errCancelHeaderProcessing
+						case <-time.After(time.Second):
+						}
+					}
+					// Otherwise insert the headers for content retrieval
+					inserts := d.queue.Schedule(chunk, origin)
+					if len(inserts) != len(chunk) {
+						glog.V(logger.Debug).Infof("stale headers")
+						return errBadPeer
+					}
+				}
+				headers = headers[limit:]
+				origin += uint64(limit)
+			}
+			// Signal the content downloaders of the availability of new tasks
+			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
+				select {
+				case ch <- true:
+				default:
+				}
+			}
+		}
+	}
+}
+
+// processContent takes fetch results from the queue and tries to import them
+// into the chain. The type of import operation will depend on the result contents.
+func (d *Downloader) processContent() error {
 	pivot := d.queue.FastSyncPivot()
 	for {
 		results := d.queue.WaitResults()
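processHeaders consumes each delivery in bounded chunks of maxHeadersProcess, so a single huge batch can neither monopolize the importer nor delay cancellation checks. A standalone sketch of the chunking pattern (only the constant value is taken from the diff):

package main

import "fmt"

const maxHeadersProcess = 2048

// processInChunks hands headers to handle in slices of at most
// maxHeadersProcess elements, advancing through the batch.
func processInChunks(headers []int, handle func([]int)) {
	for len(headers) > 0 {
		limit := maxHeadersProcess
		if limit > len(headers) {
			limit = len(headers)
		}
		handle(headers[:limit]) // import one bounded chunk
		headers = headers[limit:]
	}
}

func main() {
	batch := make([]int, 5000)
	count := 0
	processInChunks(batch, func(chunk []int) { count++ })
	fmt.Println(count) // 3 chunks: 2048 + 2048 + 904
}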
@@ -1608,7 +1726,7 @@ func (d *Downloader) process() error {
 		for len(results) != 0 {
 			// Check for any termination requests
 			if atomic.LoadInt32(&d.interrupt) == 1 {
-				return errCancelProcessing
+				return errCancelContentProcessing
 			}
 			// Retrieve the next batch of results to import
 			var (
eth/downloader/downloader_test.go
@@ -560,8 +560,8 @@ func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) fu
 		hashes := dl.peerHashes[id]
 		headers := dl.peerHeaders[id]
 		result := make([]*types.Header, 0, amount)
-		for i := 0; i < amount && len(hashes)-int(origin)-1-i >= 0; i++ {
-			if header, ok := headers[hashes[len(hashes)-int(origin)-1-i]]; ok {
+		for i := 0; i < amount && len(hashes)-int(origin)-1-i*(skip+1) >= 0; i++ {
+			if header, ok := headers[hashes[len(hashes)-int(origin)-1-i*(skip+1)]]; ok {
 				result = append(result, header)
 			}
 		}
@@ -1348,27 +1348,28 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
 		result error
 		drop   bool
 	}{
-		{nil, false},                   // Sync succeeded, all is well
-		{errBusy, false},               // Sync is already in progress, no problem
-		{errUnknownPeer, false},        // Peer is unknown, was already dropped, don't double drop
-		{errBadPeer, true},             // Peer was deemed bad for some reason, drop it
-		{errStallingPeer, true},        // Peer was detected to be stalling, drop it
-		{errNoPeers, false},            // No peers to download from, soft race, no issue
-		{errTimeout, true},             // No hashes received in due time, drop the peer
-		{errEmptyHashSet, true},        // No hashes were returned as a response, drop as it's a dead end
-		{errEmptyHeaderSet, true},      // No headers were returned as a response, drop as it's a dead end
-		{errPeersUnavailable, true},    // Nobody had the advertised blocks, drop the advertiser
-		{errInvalidAncestor, true},     // Agreed upon ancestor is not acceptable, drop the chain rewriter
-		{errInvalidChain, true},        // Hash chain was detected as invalid, definitely drop
-		{errInvalidBlock, false},       // A bad peer was detected, but not the sync origin
-		{errInvalidBody, false},        // A bad peer was detected, but not the sync origin
-		{errInvalidReceipt, false},     // A bad peer was detected, but not the sync origin
-		{errCancelHashFetch, false},    // Synchronisation was canceled, origin may be innocent, don't drop
-		{errCancelBlockFetch, false},   // Synchronisation was canceled, origin may be innocent, don't drop
-		{errCancelHeaderFetch, false},  // Synchronisation was canceled, origin may be innocent, don't drop
-		{errCancelBodyFetch, false},    // Synchronisation was canceled, origin may be innocent, don't drop
-		{errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
-		{errCancelProcessing, false},   // Synchronisation was canceled, origin may be innocent, don't drop
+		{nil, false},                        // Sync succeeded, all is well
+		{errBusy, false},                    // Sync is already in progress, no problem
+		{errUnknownPeer, false},             // Peer is unknown, was already dropped, don't double drop
+		{errBadPeer, true},                  // Peer was deemed bad for some reason, drop it
+		{errStallingPeer, true},             // Peer was detected to be stalling, drop it
+		{errNoPeers, false},                 // No peers to download from, soft race, no issue
+		{errTimeout, true},                  // No hashes received in due time, drop the peer
+		{errEmptyHashSet, true},             // No hashes were returned as a response, drop as it's a dead end
+		{errEmptyHeaderSet, true},           // No headers were returned as a response, drop as it's a dead end
+		{errPeersUnavailable, true},         // Nobody had the advertised blocks, drop the advertiser
+		{errInvalidAncestor, true},          // Agreed upon ancestor is not acceptable, drop the chain rewriter
+		{errInvalidChain, true},             // Hash chain was detected as invalid, definitely drop
+		{errInvalidBlock, false},            // A bad peer was detected, but not the sync origin
+		{errInvalidBody, false},             // A bad peer was detected, but not the sync origin
+		{errInvalidReceipt, false},          // A bad peer was detected, but not the sync origin
+		{errCancelHashFetch, false},         // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelBlockFetch, false},        // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelHeaderFetch, false},       // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelBodyFetch, false},         // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelReceiptFetch, false},      // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelHeaderProcessing, false},  // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
 	}
 	// Run the tests and check disconnection status
 	tester := newTester()
eth/downloader/peer.go
@@ -58,15 +58,18 @@ type peer struct {
 	id   string      // Unique identifier of the peer
 	head common.Hash // Hash of the peers latest known block

+	headerIdle  int32 // Current header activity state of the peer (idle = 0, active = 1)
 	blockIdle   int32 // Current block activity state of the peer (idle = 0, active = 1)
 	receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
 	stateIdle   int32 // Current node data activity state of the peer (idle = 0, active = 1)

+	headerThroughput  float64 // Number of headers measured to be retrievable per second
 	blockThroughput   float64 // Number of blocks (bodies) measured to be retrievable per second
 	receiptThroughput float64 // Number of receipts measured to be retrievable per second
 	stateThroughput   float64 // Number of node data pieces measured to be retrievable per second

-	blockStarted   time.Time // Time instance when the last block (body)fetch was started
+	headerStarted  time.Time // Time instance when the last header fetch was started
+	blockStarted   time.Time // Time instance when the last block (body) fetch was started
 	receiptStarted time.Time // Time instance when the last receipt fetch was started
 	stateStarted   time.Time // Time instance when the last node data fetch was started
@@ -118,10 +121,12 @@ func (p *peer) Reset() {
 	p.lock.Lock()
 	defer p.lock.Unlock()

+	atomic.StoreInt32(&p.headerIdle, 0)
 	atomic.StoreInt32(&p.blockIdle, 0)
 	atomic.StoreInt32(&p.receiptIdle, 0)
 	atomic.StoreInt32(&p.stateIdle, 0)

+	p.headerThroughput = 0
 	p.blockThroughput = 0
 	p.receiptThroughput = 0
 	p.stateThroughput = 0
@@ -151,6 +156,24 @@ func (p *peer) Fetch61(request *fetchRequest) error {
 	return nil
 }

+// FetchHeaders sends a header retrieval request to the remote peer.
+func (p *peer) FetchHeaders(from uint64, count int) error {
+	// Sanity check the protocol version
+	if p.version < 62 {
+		panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version))
+	}
+	// Short circuit if the peer is already fetching
+	if !atomic.CompareAndSwapInt32(&p.headerIdle, 0, 1) {
+		return errAlreadyFetching
+	}
+	p.headerStarted = time.Now()
+
+	// Issue the header retrieval request (absolute upwards without gaps)
+	go p.getAbsHeaders(from, count, 0, false)
+
+	return nil
+}
+
 // FetchBodies sends a block body retrieval request to the remote peer.
 func (p *peer) FetchBodies(request *fetchRequest) error {
 	// Sanity check the protocol version
@@ -217,6 +240,13 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
 	return nil
 }

+// SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval
+// requests. Its estimated header retrieval throughput is updated with that measured
+// just now.
+func (p *peer) SetHeadersIdle(delivered int) {
+	p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle)
+}
+
 // SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval
 // requests. Its estimated block retrieval throughput is updated with that measured
 // just now.
@@ -264,6 +294,15 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id
 	*throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured
 }

+// HeaderCapacity retrieves the peers header download allowance based on its
+// previously discovered throughput.
+func (p *peer) HeaderCapacity() int {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	return int(math.Max(1, math.Min(p.headerThroughput*float64(headerTargetRTT)/float64(time.Second), float64(MaxHeaderFetch))))
+}
+
 // BlockCapacity retrieves the peers block download allowance based on its
 // previously discovered throughput.
 func (p *peer) BlockCapacity() int {
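HeaderCapacity converts the peer's estimated throughput into a per-request allowance: throughput × headerTargetRTT, clamped to the range [1, MaxHeaderFetch]. A standalone sketch of the estimate-and-clamp math, assuming throughputImpact is the EMA weight that setIdle applies (the weight value here is invented):

package main

import (
	"fmt"
	"math"
	"time"
)

const (
	throughputImpact = 0.1 // assumed EMA weight of a single new measurement
	headerTargetRTT  = time.Second
	MaxHeaderFetch   = 192
)

func main() {
	throughput := 100.0 // headers/sec estimated so far

	// A request delivered 192 headers in 1.5s => 128 headers/sec measured;
	// blend it into the running estimate.
	measured := 192 / 1.5
	throughput = (1-throughputImpact)*throughput + throughputImpact*measured

	// Ask for only as many headers as the peer can serve within the target
	// round-trip time, never more than the protocol limit.
	capacity := int(math.Max(1, math.Min(
		throughput*float64(headerTargetRTT)/float64(time.Second),
		float64(MaxHeaderFetch))))

	fmt.Printf("throughput %.1f/s, capacity %d\n", throughput, capacity)
}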
@@ -323,14 +362,15 @@ func (p *peer) String() string {
 	defer p.lock.RUnlock()

 	return fmt.Sprintf("Peer %s [%s]", p.id,
-		fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
+		fmt.Sprintf("headers %3.2f/s, ", p.headerThroughput)+
+			fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
 			fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+
 			fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+
 			fmt.Sprintf("lacking %4d", len(p.lacking)),
 	)
 }

-// peerSet represents the collection of active peers participating in the block
+// peerSet represents the collection of active peers participating in the chain
 // download procedure.
 type peerSet struct {
 	peers map[string]*peer
@@ -359,7 +399,7 @@ func (ps *peerSet) Reset() {
 // peer is already known.
 //
 // The method also sets the starting throughput values of the new peer to the
-// average of all existing peers, to give it a realistic change of being used
+// average of all existing peers, to give it a realistic chance of being used
 // for data retrievals.
 func (ps *peerSet) Register(p *peer) error {
 	ps.lock.Lock()
@@ -369,15 +409,17 @@ func (ps *peerSet) Register(p *peer) error {
 		return errAlreadyRegistered
 	}
 	if len(ps.peers) > 0 {
-		p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0
+		p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0

 		for _, peer := range ps.peers {
 			peer.lock.RLock()
+			p.headerThroughput += peer.headerThroughput
 			p.blockThroughput += peer.blockThroughput
 			p.receiptThroughput += peer.receiptThroughput
 			p.stateThroughput += peer.stateThroughput
 			peer.lock.RUnlock()
 		}
+		p.headerThroughput /= float64(len(ps.peers))
 		p.blockThroughput /= float64(len(ps.peers))
 		p.receiptThroughput /= float64(len(ps.peers))
 		p.stateThroughput /= float64(len(ps.peers))
@@ -441,6 +483,20 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
 	return ps.idlePeers(61, 61, idle, throughput)
 }

+// HeaderIdlePeers retrieves a flat list of all the currently header-idle peers
+// within the active peer set, ordered by their reputation.
+func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) {
+	idle := func(p *peer) bool {
+		return atomic.LoadInt32(&p.headerIdle) == 0
+	}
+	throughput := func(p *peer) float64 {
+		p.lock.RLock()
+		defer p.lock.RUnlock()
+		return p.headerThroughput
+	}
+	return ps.idlePeers(62, 64, idle, throughput)
+}
+
 // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
 // the active peer set, ordered by their reputation.
 func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
eth/downloader/queue.go
@@ -39,8 +39,8 @@ import (
 )

 var (
-	blockCacheLimit   = 8192 // Maximum number of blocks to cache before throttling the download
-	maxInFlightStates = 4096 // Maximum number of state downloads to allow concurrently
+	blockCacheLimit   = 16384 // Maximum number of blocks to cache before throttling the download
+	maxInFlightStates = 8192  // Maximum number of state downloads to allow concurrently
 )

 var (
@@ -52,6 +52,7 @@ var (
 // fetchRequest is a currently running data retrieval operation.
 type fetchRequest struct {
 	Peer    *peer               // Peer to which the request was sent
+	From    uint64              // [eth/62] Requested chain element index (used for skeleton fills only)
 	Hashes  map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
 	Headers []*types.Header     // [eth/62] Requested headers, sorted by request order
 	Time    time.Time           // Time when the request was made
@@ -79,6 +80,17 @@ type queue struct {
 	headerHead common.Hash // [eth/62] Hash of the last queued header to verify order

+	// Headers are "special", they download in batches, supported by a skeleton chain
+	headerTaskPool  map[uint64]*types.Header       // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
+	headerTaskQueue *prque.Prque                   // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
+	headerPeerMiss  map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
+	headerPendPool  map[string]*fetchRequest       // [eth/62] Currently pending header retrieval operations
+	headerDonePool  map[uint64]struct{}            // [eth/62] Set of the completed header fetches
+	headerResults   []*types.Header                // [eth/62] Result cache accumulating the completed headers
+	headerOffset    uint64                         // [eth/62] Number of the first header in the result cache
+	headerContCh    chan bool                      // [eth/62] Channel to notify when header download finishes
+
+	// All data retrievals below are based on an already assembled header chain
 	blockTaskPool  map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
 	blockTaskQueue *prque.Prque                  // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
 	blockPendPool  map[string]*fetchRequest      // [eth/62] Currently pending block (body) retrieval operations
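The flat headerResults cache works because every skeleton slot starts at a fixed multiple of MaxHeaderFetch above headerOffset, so a delivery for request.From lands at index request.From - headerOffset with no searching. A sketch of that offset math (all values invented):

package main

import "fmt"

const MaxHeaderFetch = 192

func main() {
	headerOffset := uint64(1001)                // number of the first cached header
	results := make([]string, 3*MaxHeaderFetch) // room for 3 skeleton slots

	// A filled batch for the second slot starts at headerOffset+MaxHeaderFetch
	requestFrom := headerOffset + MaxHeaderFetch
	copy(results[requestFrom-headerOffset:], []string{"h1193", "h1194"})

	fmt.Println(results[192], results[193]) // h1193 h1194
}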
@@ -113,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue {
 	return &queue{
 		hashPool:       make(map[common.Hash]int),
 		hashQueue:      prque.New(),
+		headerPendPool: make(map[string]*fetchRequest),
+		headerContCh:   make(chan bool),
 		blockTaskPool:  make(map[common.Hash]*types.Header),
 		blockTaskQueue: prque.New(),
 		blockPendPool:  make(map[string]*fetchRequest),
@@ -149,6 +163,8 @@ func (q *queue) Reset() {
 	q.headerHead = common.Hash{}

+	q.headerPendPool = make(map[string]*fetchRequest)
+
 	q.blockTaskPool = make(map[common.Hash]*types.Header)
 	q.blockTaskQueue.Reset()
 	q.blockPendPool = make(map[string]*fetchRequest)
@@ -178,6 +194,14 @@ func (q *queue) Close() {
 	q.active.Broadcast()
 }

+// PendingHeaders retrieves the number of header requests pending for retrieval.
+func (q *queue) PendingHeaders() int {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return q.headerTaskQueue.Size()
+}
+
 // PendingBlocks retrieves the number of block (body) requests pending for retrieval.
 func (q *queue) PendingBlocks() int {
 	q.lock.Lock()
@@ -205,6 +229,15 @@ func (q *queue) PendingNodeData() int {
 	return 0
 }

+// InFlightHeaders retrieves whether there are header fetch requests currently
+// in flight.
+func (q *queue) InFlightHeaders() bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return len(q.headerPendPool) > 0
+}
+
 // InFlightBlocks retrieves whether there are block fetch requests currently in
 // flight.
 func (q *queue) InFlightBlocks() bool {
@@ -317,6 +350,44 @@ func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash {
 	return inserts
 }

+// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
+// up an already retrieved header skeleton.
+func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// No skeleton retrieval can be in progress, fail hard if so (huge implementation bug)
+	if q.headerResults != nil {
+		panic("skeleton assembly already in progress")
+	}
+	// Schedule all the header retrieval tasks for the skeleton assembly
+	q.headerTaskPool = make(map[uint64]*types.Header)
+	q.headerTaskQueue = prque.New()
+	q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
+	q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
+	q.headerOffset = from
+	q.headerContCh = make(chan bool, 1)
+
+	for i, header := range skeleton {
+		index := from + uint64(i*MaxHeaderFetch)
+
+		q.headerTaskPool[index] = header
+		q.headerTaskQueue.Push(index, -float32(index))
+	}
+}
+
+// RetrieveHeaders retrieves the header chain assembled based on the scheduled
+// skeleton.
+func (q *queue) RetrieveHeaders() []*types.Header {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	headers := q.headerResults
+	q.headerResults = nil
+
+	return headers
+}
+
 // Schedule adds a set of headers for the download queue for scheduling, returning
 // the new headers encountered.
 func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
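ScheduleSkeleton pushes each batch index with priority -float32(index); the prque used in go-ethereum pops the highest priority first, so negating the index makes the lowest pending batch come out first. A sketch of the same ordering using the standard library heap (this is a hypothetical stand-in, not the prque API):

package main

import (
	"container/heap"
	"fmt"
)

// maxHeap pops the largest priority first, like prque.
type item struct {
	index    uint64
	priority float32
}
type maxHeap []item

func (h maxHeap) Len() int            { return len(h) }
func (h maxHeap) Less(i, j int) bool  { return h[i].priority > h[j].priority }
func (h maxHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *maxHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *maxHeap) Pop() interface{} {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

func main() {
	h := &maxHeap{}
	heap.Init(h)
	for _, idx := range []uint64{385, 1, 193} {
		heap.Push(h, item{idx, -float32(idx)}) // negate: lowest index wins
	}
	for h.Len() > 0 {
		fmt.Print(heap.Pop(h).(item).index, " ") // 1 193 385
	}
}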
@@ -437,6 +508,46 @@ func (q *queue) countProcessableItems() int {
 	return len(q.resultCache)
 }

+// ReserveHeaders reserves a set of headers for the given peer, skipping any
+// previously failed batches.
+func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// Short circuit if the peer's already downloading something (sanity check to
+	// not corrupt state)
+	if _, ok := q.headerPendPool[p.id]; ok {
+		return nil
+	}
+	// Retrieve a batch of hashes, skipping previously failed ones
+	send, skip := uint64(0), []uint64{}
+	for send == 0 && !q.headerTaskQueue.Empty() {
+		from, _ := q.headerTaskQueue.Pop()
+		if q.headerPeerMiss[p.id] != nil {
+			if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
+				skip = append(skip, from.(uint64))
+				continue
+			}
+		}
+		send = from.(uint64)
+	}
+	// Merge all the skipped batches back
+	for _, from := range skip {
+		q.headerTaskQueue.Push(from, -float32(from))
+	}
+	// Assemble and return the block download request
+	if send == 0 {
+		return nil
+	}
+	request := &fetchRequest{
+		Peer: p,
+		From: send,
+		Time: time.Now(),
+	}
+	q.headerPendPool[p.id] = request
+	return request
+}
+
 // ReserveBlocks reserves a set of block hashes for the given peer, skipping any
 // previously failed download.
 func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest {
@@ -635,6 +746,11 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
 	return request, progress, nil
 }

+// CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
+func (q *queue) CancelHeaders(request *fetchRequest) {
+	q.cancel(request, q.headerTaskQueue, q.headerPendPool)
+}
+
 // CancelBlocks aborts a fetch request, returning all pending hashes to the queue.
 func (q *queue) CancelBlocks(request *fetchRequest) {
 	q.cancel(request, q.hashQueue, q.blockPendPool)
@@ -663,6 +779,9 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m
 	q.lock.Lock()
 	defer q.lock.Unlock()

+	if request.From > 0 {
+		taskQueue.Push(request.From, -float32(request.From))
+	}
 	for hash, index := range request.Hashes {
 		taskQueue.Push(hash, float32(index))
 	}
@@ -702,6 +821,15 @@ func (q *queue) Revoke(peerId string) {
 	}
 }

+// ExpireHeaders checks for in flight requests that exceeded a timeout allowance,
+// canceling them and returning the responsible peers for penalisation.
+func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
+}
+
 // ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
 // canceling them and returning the responsible peers for penalisation.
 func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
@@ -753,6 +881,9 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
 		timeoutMeter.Mark(1)

 		// Return any non satisfied requests to the pool
+		if request.From > 0 {
+			taskQueue.Push(request.From, -float32(request.From))
+		}
 		for hash, index := range request.Hashes {
 			taskQueue.Push(hash, float32(index))
 		}
@@ -842,6 +973,73 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
 	}
 }

+// DeliverHeaders injects a header retrieval response into the header results
+// cache. This method either accepts all headers it received, or none of them
+// if they do not map correctly to the skeleton.
+func (q *queue) DeliverHeaders(id string, headers []*types.Header) (int, error) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// Short circuit if the data was never requested
+	request := q.headerPendPool[id]
+	if request == nil {
+		return 0, errNoFetchesPending
+	}
+	headerReqTimer.UpdateSince(request.Time)
+	delete(q.headerPendPool, id)
+
+	// Ensure headers can be mapped onto the skeleton chain
+	target := q.headerTaskPool[request.From].Hash()
+
+	accepted := len(headers) == MaxHeaderFetch
+	if accepted {
+		if headers[0].Number.Uint64() != request.From {
+			glog.V(logger.Detail).Infof("Peer %s: first header #%v [%x] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From)
+			accepted = false
+		} else if headers[len(headers)-1].Hash() != target {
+			glog.V(logger.Detail).Infof("Peer %s: last header #%v [%x] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4])
+			accepted = false
+		}
+	}
+	if accepted {
+		for i, header := range headers[1:] {
+			hash := header.Hash()
+			if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
+				glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ordering, expected %d", id, header.Number, hash[:4], want)
+				accepted = false
+				break
+			}
+			if headers[i].Hash() != header.ParentHash {
+				glog.V(logger.Warn).Infof("Peer %s: header #%v [%x] broke chain ancestry", id, header.Number, hash[:4])
+				accepted = false
+				break
+			}
+		}
+	}
+	// If the batch of headers wasn't accepted, mark as unavailable
+	if !accepted {
+		glog.V(logger.Detail).Infof("Peer %s: skeleton filling from header #%d not accepted", id, request.From)
+
+		miss := q.headerPeerMiss[id]
+		if miss == nil {
+			q.headerPeerMiss[id] = make(map[uint64]struct{})
+			miss = q.headerPeerMiss[id]
+		}
+		miss[request.From] = struct{}{}
+
+		q.headerTaskQueue.Push(request.From, -float32(request.From))
+		return 0, errors.New("delivery not accepted")
+	}
+	// Clean up a successful fetch, check for termination and return
+	copy(q.headerResults[request.From-q.headerOffset:], headers)
+	delete(q.headerTaskPool, request.From)
+
+	if len(q.headerTaskPool) == 0 {
+		q.headerContCh <- false
+	}
+	return len(headers), nil
+}
+
 // DeliverBodies injects a block body retrieval response into the results queue.
 // The method returns the number of blocks bodies accepted from the delivery and
 // also wakes any threads waiting for data delivery.
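DeliverHeaders accepts a filled batch only if it is exactly MaxHeaderFetch long, starts at request.From, is consecutively numbered and parent-linked, and ends in the skeleton anchor. A minimal sketch of those acceptance rules (the types and helper are invented for illustration):

package main

import "fmt"

type header struct {
	number uint64
	hash   string
	parent string
}

// accepted mirrors the all-or-nothing batch validation: length, start,
// anchor, numbering and ancestry must all check out.
func accepted(batch []header, from uint64, batchSize int, anchor string) bool {
	if len(batch) != batchSize || batch[0].number != from {
		return false
	}
	if batch[len(batch)-1].hash != anchor {
		return false // does not map onto the skeleton
	}
	for i := 1; i < len(batch); i++ {
		if batch[i].number != from+uint64(i) || batch[i].parent != batch[i-1].hash {
			return false // ordering or ancestry broken
		}
	}
	return true
}

func main() {
	batch := []header{
		{1, "a", ""},
		{2, "b", "a"},
		{3, "c", "b"},
	}
	fmt.Println(accepted(batch, 1, 3, "c")) // true
	fmt.Println(accepted(batch, 1, 3, "x")) // false: wrong anchor
}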