Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
026ee406
Commit
026ee406
authored
Jun 16, 2015
by
Péter Szilágyi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
eth/fetcher: deduplicate future blocks
parent
11c8f83a
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
71 additions
and
25 deletions
+71
-25
fetcher.go
eth/fetcher/fetcher.go
+35
-25
fetcher_test.go
eth/fetcher/fetcher_test.go
+36
-0
No files found.
eth/fetcher/fetcher.go
View file @
026ee406
...
...
@@ -153,11 +153,28 @@ func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks {
func
(
f
*
Fetcher
)
loop
()
{
announced
:=
make
(
map
[
common
.
Hash
][]
*
announce
)
fetching
:=
make
(
map
[
common
.
Hash
]
*
announce
)
queued
:=
prque
.
New
()
// Create the priority queue and a matching presence set
queue
:=
prque
.
New
()
queued
:=
make
(
map
[
common
.
Hash
]
struct
{})
enqueue
:=
func
(
peer
string
,
block
*
types
.
Block
)
{
// Make sure the block isn't in some weird place
if
f
.
chainHeight
()
+
maxQueueDist
<
block
.
NumberU64
()
{
return
}
// If not, schedule the block for future import
hash
:=
block
.
Hash
()
if
_
,
ok
:=
queued
[
hash
];
!
ok
{
queued
[
hash
]
=
struct
{}{}
queue
.
Push
(
&
inject
{
origin
:
peer
,
block
:
block
},
-
float32
(
block
.
NumberU64
()))
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Peer %s: queued block %x, total %v"
,
peer
,
hash
.
Bytes
()[
:
4
],
queue
.
Size
())
}
}
// Iterate the block fetching until a quit is requested
fetch
:=
time
.
NewTimer
(
0
)
done
:=
make
(
chan
common
.
Hash
)
// Iterate the block fetching until a quit is requested
for
{
// Clean up any expired block fetches
for
hash
,
announce
:=
range
fetching
{
...
...
@@ -168,24 +185,26 @@ func (f *Fetcher) loop() {
}
// Import any queued blocks that could potentially fit
height
:=
f
.
chainHeight
()
for
!
queued
.
Empty
()
{
// Fetch the next block, and skip if already known
op
:=
queued
.
PopItem
()
.
(
*
inject
)
if
f
.
hasBlock
(
op
.
block
.
Hash
())
{
continue
}
// If unknown, but too high up the chain, continue later
for
!
queue
.
Empty
()
{
// If too high up the chain, continue later
op
:=
queue
.
PopItem
()
.
(
*
inject
)
if
number
:=
op
.
block
.
NumberU64
();
number
>
height
+
1
{
queue
d
.
Push
(
op
,
-
float32
(
op
.
block
.
NumberU64
()))
queue
.
Push
(
op
,
-
float32
(
op
.
block
.
NumberU64
()))
break
}
// Otherwise if not known yet, try and import
hash
:=
op
.
block
.
Hash
()
delete
(
queued
,
hash
)
if
f
.
hasBlock
(
hash
)
{
continue
}
// Block may just fit, try to import it
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: importing block %x"
,
op
.
origin
,
op
.
block
.
Hash
()
.
Bytes
()[
:
4
])
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: importing block %x"
,
op
.
origin
,
hash
.
Bytes
()[
:
4
])
go
func
()
{
defer
func
()
{
done
<-
op
.
block
.
Hash
()
}()
defer
func
()
{
done
<-
hash
}()
if
err
:=
f
.
importBlock
(
op
.
origin
,
op
.
block
);
err
!=
nil
{
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Peer %s: block %x import failed: %v"
,
op
.
origin
,
op
.
block
.
Hash
()
.
Bytes
()[
:
4
],
err
)
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Peer %s: block %x import failed: %v"
,
op
.
origin
,
hash
.
Bytes
()[
:
4
],
err
)
return
}
}()
...
...
@@ -210,8 +229,7 @@ func (f *Fetcher) loop() {
case
op
:=
<-
f
.
insert
:
// A direct block insertion was requested, try and fill any pending gaps
queued
.
Push
(
op
,
-
float32
(
op
.
block
.
NumberU64
()))
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Peer %s: filled block %x, total %v"
,
op
.
origin
,
op
.
block
.
Hash
()
.
Bytes
()[
:
4
],
queued
.
Size
())
enqueue
(
op
.
origin
,
op
.
block
)
case
hash
:=
<-
done
:
// A pending import finished, remove all traces of the notification
...
...
@@ -281,17 +299,9 @@ func (f *Fetcher) loop() {
return
}
// Schedule the retrieved blocks for ordered import
height
:=
f
.
chainHeight
()
for
_
,
block
:=
range
explicit
{
// Skip any blocks too far into the future
if
height
+
maxQueueDist
<
block
.
NumberU64
()
{
continue
}
// Otherwise if the announce is still pending, schedule
hash
:=
block
.
Hash
()
if
announce
:=
fetching
[
hash
];
announce
!=
nil
{
queued
.
Push
(
&
inject
{
origin
:
announce
.
origin
,
block
:
block
},
-
float32
(
block
.
NumberU64
()))
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Peer %s: scheduled block %x, total %v"
,
announce
.
origin
,
hash
[
:
4
],
queued
.
Size
())
if
announce
:=
fetching
[
block
.
Hash
()];
announce
!=
nil
{
enqueue
(
announce
.
origin
,
block
)
}
}
}
...
...
eth/fetcher/fetcher_test.go
View file @
026ee406
...
...
@@ -299,3 +299,39 @@ func TestQueueGapFill(t *testing.T) {
t
.
Fatalf
(
"synchronised block mismatch: have %v, want %v"
,
imported
,
targetBlocks
+
1
)
}
}
// Tests that blocks arriving from various sources (multiple propagations, hash
// announces, etc) do not get scheduled for import multiple times.
func
TestImportDeduplication
(
t
*
testing
.
T
)
{
// Create two blocks to import (one for duplication, the other for stalling)
hashes
:=
createHashes
(
2
,
knownHash
)
blocks
:=
createBlocksFromHashes
(
hashes
)
// Create the tester and wrap the importer with a counter
tester
:=
newTester
()
fetcher
:=
tester
.
makeFetcher
(
blocks
)
counter
:=
uint32
(
0
)
tester
.
fetcher
.
importBlock
=
func
(
peer
string
,
block
*
types
.
Block
)
error
{
atomic
.
AddUint32
(
&
counter
,
1
)
return
tester
.
importBlock
(
peer
,
block
)
}
// Announce the duplicating block, wait for retrieval, and also propagate directly
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
0
],
time
.
Now
()
.
Add
(
-
arriveTimeout
),
fetcher
)
time
.
Sleep
(
50
*
time
.
Millisecond
)
tester
.
fetcher
.
Enqueue
(
"valid"
,
blocks
[
hashes
[
0
]])
tester
.
fetcher
.
Enqueue
(
"valid"
,
blocks
[
hashes
[
0
]])
tester
.
fetcher
.
Enqueue
(
"valid"
,
blocks
[
hashes
[
0
]])
// Fill the missing block directly as if propagated, and check import uniqueness
tester
.
fetcher
.
Enqueue
(
"valid"
,
blocks
[
hashes
[
1
]])
time
.
Sleep
(
50
*
time
.
Millisecond
)
if
imported
:=
len
(
tester
.
ownBlocks
);
imported
!=
3
{
t
.
Fatalf
(
"synchronised block mismatch: have %v, want %v"
,
imported
,
3
)
}
if
counter
!=
2
{
t
.
Fatalf
(
"import invocation count mismatch: have %v, want %v"
,
counter
,
2
)
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment