Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
8244825b
Commit
8244825b
authored
Apr 18, 2015
by
obscuren
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
downloader: reset the queue if a peer response with an empty hash set
parent
eef4776b
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
37 additions
and
15 deletions
+37
-15
downloader.go
eth/downloader/downloader.go
+25
-14
queue.go
eth/downloader/queue.go
+11
-0
handler.go
eth/handler.go
+1
-1
No files found.
eth/downloader/downloader.go
View file @
8244825b
...
@@ -17,8 +17,9 @@ import (
...
@@ -17,8 +17,9 @@ import (
)
)
const
(
const
(
maxBlockFetch
=
256
// Amount of max blocks to be fetched per chunk
maxBlockFetch
=
256
// Amount of max blocks to be fetched per chunk
minDesiredPeerCount
=
3
// Amount of peers desired to start syncing
minDesiredPeerCount
=
3
// Amount of peers desired to start syncing
blockTtl
=
15
*
time
.
Second
// The amount of time it takes for a request to time out
)
)
var
(
var
(
...
@@ -96,7 +97,7 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH
...
@@ -96,7 +97,7 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH
// add peer to our peer set
// add peer to our peer set
d
.
peers
[
id
]
=
peer
d
.
peers
[
id
]
=
peer
// broadcast new peer
// broadcast new peer
//
d.newPeerCh <- peer
d
.
newPeerCh
<-
peer
return
nil
return
nil
}
}
...
@@ -265,6 +266,9 @@ out:
...
@@ -265,6 +266,9 @@ out:
// XXX Make synchronous
// XXX Make synchronous
func
(
d
*
Downloader
)
startFetchingHashes
(
p
*
peer
,
hash
common
.
Hash
,
ignoreInitial
bool
)
error
{
func
(
d
*
Downloader
)
startFetchingHashes
(
p
*
peer
,
hash
common
.
Hash
,
ignoreInitial
bool
)
error
{
atomic
.
StoreInt32
(
&
d
.
fetchingHashes
,
1
)
defer
atomic
.
StoreInt32
(
&
d
.
fetchingHashes
,
0
)
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Downloading hashes (%x) from %s"
,
hash
.
Bytes
()[
:
4
],
p
.
id
)
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Downloading hashes (%x) from %s"
,
hash
.
Bytes
()[
:
4
],
p
.
id
)
start
:=
time
.
Now
()
start
:=
time
.
Now
()
...
@@ -275,10 +279,8 @@ func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitia
...
@@ -275,10 +279,8 @@ func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitia
// Add the hash to the queue first
// Add the hash to the queue first
d
.
queue
.
hashPool
.
Add
(
hash
)
d
.
queue
.
hashPool
.
Add
(
hash
)
}
}
// Get the first batch of hashes
// Get the first batch of hashes
p
.
getHashes
(
hash
)
p
.
getHashes
(
hash
)
atomic
.
StoreInt32
(
&
d
.
fetchingHashes
,
1
)
out
:
out
:
for
{
for
{
...
@@ -299,14 +301,16 @@ out:
...
@@ -299,14 +301,16 @@ out:
d
.
queue
.
put
(
hashSet
)
d
.
queue
.
put
(
hashSet
)
// Add hashes to the chunk set
// Add hashes to the chunk set
// Check if we're done fetching
if
len
(
hashes
)
==
0
{
// Make sure the peer actually gave you something valid
if
!
done
&&
len
(
hashes
)
>
0
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer (%s) responded with empty hash set
\n
"
,
p
.
id
)
d
.
queue
.
reset
()
break
out
}
else
if
!
done
{
// Check if we're done fetching
//fmt.Println("re-fetch. current =", d.queue.hashPool.Size())
//fmt.Println("re-fetch. current =", d.queue.hashPool.Size())
// Get the next set of hashes
// Get the next set of hashes
p
.
getHashes
(
hashes
[
len
(
hashes
)
-
1
])
p
.
getHashes
(
hashes
[
len
(
hashes
)
-
1
])
atomic
.
StoreInt32
(
&
d
.
fetchingHashes
,
1
)
}
else
{
// we're done
}
else
{
atomic
.
StoreInt32
(
&
d
.
fetchingHashes
,
0
)
break
out
break
out
}
}
}
}
...
@@ -319,6 +323,7 @@ out:
...
@@ -319,6 +323,7 @@ out:
func
(
d
*
Downloader
)
startFetchingBlocks
(
p
*
peer
)
error
{
func
(
d
*
Downloader
)
startFetchingBlocks
(
p
*
peer
)
error
{
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"Downloading"
,
d
.
queue
.
hashPool
.
Size
(),
"blocks"
)
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"Downloading"
,
d
.
queue
.
hashPool
.
Size
(),
"blocks"
)
atomic
.
StoreInt32
(
&
d
.
downloadingBlocks
,
1
)
atomic
.
StoreInt32
(
&
d
.
downloadingBlocks
,
1
)
defer
atomic
.
StoreInt32
(
&
d
.
downloadingBlocks
,
0
)
start
:=
time
.
Now
()
start
:=
time
.
Now
()
...
@@ -364,8 +369,6 @@ out:
...
@@ -364,8 +369,6 @@ out:
// When there are no more queue and no more `fetching`. We can
// When there are no more queue and no more `fetching`. We can
// safely assume we're done. Another part of the process will check
// safely assume we're done. Another part of the process will check
// for parent errors and will re-request anything that's missing
// for parent errors and will re-request anything that's missing
atomic
.
StoreInt32
(
&
d
.
downloadingBlocks
,
0
)
// Break out so that we can process with processing blocks
break
out
break
out
}
else
{
}
else
{
// Check for bad peers. Bad peers may indicate a peer not responding
// Check for bad peers. Bad peers may indicate a peer not responding
...
@@ -376,7 +379,7 @@ out:
...
@@ -376,7 +379,7 @@ out:
d
.
queue
.
mu
.
Lock
()
d
.
queue
.
mu
.
Lock
()
var
badPeers
[]
string
var
badPeers
[]
string
for
pid
,
chunk
:=
range
d
.
queue
.
fetching
{
for
pid
,
chunk
:=
range
d
.
queue
.
fetching
{
if
time
.
Since
(
chunk
.
itime
)
>
5
*
time
.
Second
{
if
time
.
Since
(
chunk
.
itime
)
>
blockTtl
{
badPeers
=
append
(
badPeers
,
pid
)
badPeers
=
append
(
badPeers
,
pid
)
// remove peer as good peer from peer list
// remove peer as good peer from peer list
d
.
UnregisterPeer
(
pid
)
d
.
UnregisterPeer
(
pid
)
...
@@ -466,8 +469,11 @@ func (d *Downloader) process() error {
...
@@ -466,8 +469,11 @@ func (d *Downloader) process() error {
// to a seperate goroutine where it periodically checks for linked pieces.
// to a seperate goroutine where it periodically checks for linked pieces.
types
.
BlockBy
(
types
.
Number
)
.
Sort
(
d
.
queue
.
blocks
)
types
.
BlockBy
(
types
.
Number
)
.
Sort
(
d
.
queue
.
blocks
)
blocks
:=
d
.
queue
.
blocks
blocks
:=
d
.
queue
.
blocks
if
len
(
blocks
)
==
0
{
return
nil
}
glog
.
V
(
logger
.
Debug
)
.
Info
ln
(
"Inserting chain with"
,
len
(
blocks
),
"blocks"
)
glog
.
V
(
logger
.
Debug
)
.
Info
f
(
"Inserting chain with %d blocks (#%v - #%v)
\n
"
,
len
(
blocks
),
blocks
[
0
]
.
Number
(),
blocks
[
len
(
blocks
)
-
1
]
.
Number
()
)
var
err
error
var
err
error
// Loop untill we're out of blocks
// Loop untill we're out of blocks
...
@@ -491,6 +497,11 @@ func (d *Downloader) process() error {
...
@@ -491,6 +497,11 @@ func (d *Downloader) process() error {
}
}
}
}
break
break
}
else
if
err
!=
nil
{
// Reset chain completely. This needs much, much improvement.
// instead: check all blocks leading down to this block false block and remove it
blocks
=
nil
break
}
}
blocks
=
blocks
[
max
:
]
blocks
=
blocks
[
max
:
]
}
}
...
...
eth/downloader/queue.go
View file @
8244825b
...
@@ -31,6 +31,17 @@ func newqueue() *queue {
...
@@ -31,6 +31,17 @@ func newqueue() *queue {
}
}
}
}
func
(
c
*
queue
)
reset
()
{
c
.
mu
.
Lock
()
defer
c
.
mu
.
Unlock
()
c
.
hashPool
.
Clear
()
c
.
fetchPool
.
Clear
()
c
.
blockHashes
.
Clear
()
c
.
blocks
=
nil
c
.
fetching
=
make
(
map
[
string
]
*
chunk
)
}
// reserve a `max` set of hashes for `p` peer.
// reserve a `max` set of hashes for `p` peer.
func
(
c
*
queue
)
get
(
p
*
peer
,
max
int
)
*
chunk
{
func
(
c
*
queue
)
get
(
p
*
peer
,
max
int
)
*
chunk
{
c
.
mu
.
Lock
()
c
.
mu
.
Lock
()
...
...
eth/handler.go
View file @
8244825b
...
@@ -95,7 +95,7 @@ func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman
...
@@ -95,7 +95,7 @@ func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman
Run
:
func
(
p
*
p2p
.
Peer
,
rw
p2p
.
MsgReadWriter
)
error
{
Run
:
func
(
p
*
p2p
.
Peer
,
rw
p2p
.
MsgReadWriter
)
error
{
peer
:=
manager
.
newPeer
(
protocolVersion
,
networkId
,
p
,
rw
)
peer
:=
manager
.
newPeer
(
protocolVersion
,
networkId
,
p
,
rw
)
err
:=
manager
.
handle
(
peer
)
err
:=
manager
.
handle
(
peer
)
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"[%s]: %v
\n
"
,
peer
.
id
,
err
)
//
glog.V(logger.Detail).Infof("[%s]: %v\n", peer.id, err)
return
err
return
err
},
},
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment