Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
acf8452c
Commit
acf8452c
authored
Apr 12, 2015
by
obscuren
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
downloader: implemented new downloader
parent
61db7a71
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
602 additions
and
0 deletions
+602
-0
chunk.go
eth/downloader/chunk.go
+98
-0
downloader.go
eth/downloader/downloader.go
+328
-0
downloader_test.go
eth/downloader/downloader_test.go
+128
-0
peer.go
eth/downloader/peer.go
+48
-0
No files found.
eth/downloader/chunk.go
0 → 100644
View file @
acf8452c
package
downloader
import
(
"math"
"sync"
"time"
"github.com/ethereum/go-ethereum/core/types"
"gopkg.in/fatih/set.v0"
)
// queue represents hashes that are either need fetching or are being fetched
type
queue
struct
{
hashPool
*
set
.
Set
mu
sync
.
Mutex
fetching
map
[
string
]
*
chunk
blocks
[]
*
types
.
Block
}
func
newqueue
()
*
queue
{
return
&
queue
{
hashPool
:
set
.
New
(),
fetching
:
make
(
map
[
string
]
*
chunk
),
}
}
// reserve a `max` set of hashes for `p` peer.
func
(
c
*
queue
)
get
(
p
*
peer
,
max
int
)
*
chunk
{
c
.
mu
.
Lock
()
defer
c
.
mu
.
Unlock
()
// return nothing if the pool has been depleted
if
c
.
hashPool
.
Size
()
==
0
{
return
nil
}
limit
:=
int
(
math
.
Min
(
float64
(
max
),
float64
(
c
.
hashPool
.
Size
())))
// Create a new set of hashes
hashes
,
i
:=
set
.
New
(),
0
c
.
hashPool
.
Each
(
func
(
v
interface
{})
bool
{
if
i
==
limit
{
return
false
}
hashes
.
Add
(
v
)
i
++
return
true
})
// remove the fetchable hashes from hash pool
c
.
hashPool
.
Separate
(
hashes
)
// Create a new chunk for the seperated hashes. The time is being used
// to reset the chunk (timeout)
chunk
:=
&
chunk
{
hashes
,
time
.
Now
()}
// register as 'fetching' state
c
.
fetching
[
p
.
id
]
=
chunk
// create new chunk for peer
return
chunk
}
func
(
c
*
queue
)
deliver
(
id
string
,
blocks
[]
*
types
.
Block
)
{
c
.
mu
.
Lock
()
defer
c
.
mu
.
Unlock
()
chunk
:=
c
.
fetching
[
id
]
// If the chunk was never requested simply ignore it
if
chunk
!=
nil
{
delete
(
c
.
fetching
,
id
)
// seperate the blocks and the hashes
chunk
.
seperate
(
blocks
)
// Add the blocks
c
.
blocks
=
append
(
c
.
blocks
,
blocks
...
)
// Add back whatever couldn't be delivered
c
.
hashPool
.
Merge
(
chunk
.
hashes
)
}
}
func
(
c
*
queue
)
put
(
hashes
*
set
.
Set
)
{
c
.
mu
.
Lock
()
defer
c
.
mu
.
Unlock
()
c
.
hashPool
.
Merge
(
hashes
)
}
type
chunk
struct
{
hashes
*
set
.
Set
itime
time
.
Time
}
func
(
ch
*
chunk
)
seperate
(
blocks
[]
*
types
.
Block
)
{
for
_
,
block
:=
range
blocks
{
ch
.
hashes
.
Remove
(
block
.
Hash
())
}
}
eth/downloader/downloader.go
0 → 100644
View file @
acf8452c
package
downloader
import
(
"math"
"math/big"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"gopkg.in/fatih/set.v0"
)
const
maxBlockFetch
=
256
type
hashFetcherFn
func
(
common
.
Hash
)
error
type
blockFetcherFn
func
([]
common
.
Hash
)
error
type
hashCheckFn
func
(
common
.
Hash
)
bool
type
chainInsertFn
func
(
types
.
Blocks
)
error
type
hashIterFn
func
()
(
common
.
Hash
,
error
)
// XXX make threadsafe!!!!
type
peers
map
[
string
]
*
peer
func
(
p
peers
)
get
(
state
int
)
[]
*
peer
{
var
peers
[]
*
peer
for
_
,
peer
:=
range
p
{
peer
.
mu
.
RLock
()
if
peer
.
state
==
state
{
peers
=
append
(
peers
,
peer
)
}
peer
.
mu
.
RUnlock
()
}
return
peers
}
func
(
p
peers
)
setState
(
id
string
,
state
int
)
{
if
peer
,
exist
:=
p
[
id
];
exist
{
peer
.
mu
.
Lock
()
defer
peer
.
mu
.
Unlock
()
peer
.
state
=
state
}
}
type
Downloader
struct
{
queue
*
queue
hasBlock
hashCheckFn
insertChain
chainInsertFn
mu
sync
.
RWMutex
peers
peers
currentPeer
*
peer
fetchingHashes
int32
downloadingBlocks
int32
newPeerCh
chan
*
peer
selectPeerCh
chan
*
peer
HashCh
chan
[]
common
.
Hash
blockCh
chan
blockPack
quit
chan
struct
{}
}
type
blockPack
struct
{
peerId
string
blocks
[]
*
types
.
Block
}
func
New
(
hasBlock
hashCheckFn
,
insertChain
chainInsertFn
)
*
Downloader
{
downloader
:=
&
Downloader
{
queue
:
newqueue
(),
peers
:
make
(
peers
),
hasBlock
:
hasBlock
,
insertChain
:
insertChain
,
newPeerCh
:
make
(
chan
*
peer
,
1
),
selectPeerCh
:
make
(
chan
*
peer
,
1
),
HashCh
:
make
(
chan
[]
common
.
Hash
,
1
),
blockCh
:
make
(
chan
blockPack
,
1
),
quit
:
make
(
chan
struct
{}),
}
go
downloader
.
peerHandler
()
go
downloader
.
update
()
return
downloader
}
func
(
d
*
Downloader
)
RegisterPeer
(
id
string
,
td
*
big
.
Int
,
hash
common
.
Hash
,
getHashes
hashFetcherFn
,
getBlocks
blockFetcherFn
)
error
{
d
.
mu
.
Lock
()
defer
d
.
mu
.
Unlock
()
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"Register peer"
,
id
)
// Create a new peer and add it to the list of known peers
peer
:=
newPeer
(
id
,
td
,
hash
,
getHashes
,
getBlocks
)
// add peer to our peer set
d
.
peers
[
id
]
=
peer
// broadcast new peer
d
.
newPeerCh
<-
peer
return
nil
}
func
(
d
*
Downloader
)
UnregisterPeer
(
id
string
)
{
d
.
mu
.
Lock
()
defer
d
.
mu
.
Unlock
()
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"Unregister peer"
,
id
)
delete
(
d
.
peers
,
id
)
}
func
(
d
*
Downloader
)
peerHandler
()
{
// Fields defined here so we can reduce the amount of locking
// that needs to be done
var
highestTd
=
new
(
big
.
Int
)
out
:
for
{
select
{
case
newPeer
:=
<-
d
.
newPeerCh
:
// Check if TD of peer is higher than our current
if
newPeer
.
td
.
Cmp
(
highestTd
)
>
0
{
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"New peer with highest TD ="
,
newPeer
.
td
)
highestTd
.
Set
(
newPeer
.
td
)
// select the peer for downloading
d
.
selectPeerCh
<-
newPeer
}
case
<-
d
.
quit
:
break
out
}
}
}
func
(
d
*
Downloader
)
update
()
{
out
:
for
{
select
{
case
selectedPeer
:=
<-
d
.
selectPeerCh
:
// Make sure it's doing neither. Once done we can restart the
// downloading process if the TD is higher. For now just get on
// with whatever is going on. This prevents unecessary switching.
if
!
(
d
.
isFetchingHashes
()
||
d
.
isDownloadingBlocks
())
{
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"Selected new peer"
,
selectedPeer
.
id
)
// Start the fetcher. This will block the update entirely
// interupts need to be send to the appropriate channels
// respectively.
if
err
:=
d
.
startFetchingHashes
(
selectedPeer
);
err
!=
nil
{
// handle error
glog
.
V
(
logger
.
Debug
)
.
Infoln
(
"Error fetching hashes:"
,
err
)
// Reset
break
}
// Start fetching blocks in paralel. The strategy is simple
// take any available peers, seserve a chunk for each peer available,
// let the peer deliver the chunkn and periodically check if a peer
// has timedout. When done downloading, process blocks.
if
err
:=
d
.
startFetchingBlocks
(
selectedPeer
);
err
!=
nil
{
glog
.
V
(
logger
.
Debug
)
.
Infoln
(
"Error downloading blocks:"
,
err
)
// reset
break
}
// XXX this will move when optimised
// Sort the blocks by number. This bit needs much improvement. Right now
// it assumes full honesty form peers (i.e. it's not checked when the blocks
// link). We should at least check whihc queue match. This code could move
// to a seperate goroutine where it periodically checks for linked pieces.
types
.
BlockBy
(
types
.
Number
)
.
Sort
(
d
.
queue
.
blocks
)
blocks
:=
d
.
queue
.
blocks
glog
.
V
(
logger
.
Debug
)
.
Infoln
(
"Inserting chain with"
,
len
(
blocks
),
"blocks"
)
// Loop untill we're out of queue
for
len
(
blocks
)
!=
0
{
max
:=
int
(
math
.
Min
(
float64
(
len
(
blocks
)),
256
))
// TODO check for parent error. When there's a parent error we should stop
// processing and start requesting the `block.hash` so that it's parent and
// grandparents can be requested and queued.
d
.
insertChain
(
blocks
[
:
max
])
blocks
=
blocks
[
max
:
]
}
}
case
<-
d
.
quit
:
break
out
}
}
}
func
(
d
*
Downloader
)
startFetchingHashes
(
p
*
peer
)
error
{
glog
.
V
(
logger
.
Debug
)
.
Infoln
(
"Downloading hashes"
)
start
:=
time
.
Now
()
// Get the first batch of hashes
p
.
getHashes
(
p
.
recentHash
)
atomic
.
StoreInt32
(
&
d
.
fetchingHashes
,
1
)
out
:
for
{
select
{
case
hashes
:=
<-
d
.
HashCh
:
var
done
bool
// determines whether we're done fetching hashes (i.e. common hash found)
hashSet
:=
set
.
New
()
for
_
,
hash
:=
range
hashes
{
if
d
.
hasBlock
(
hash
)
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Found common hash %x
\n
"
,
hash
)
done
=
true
break
}
hashSet
.
Add
(
hash
)
}
d
.
queue
.
put
(
hashSet
)
// Add hashes to the chunk set
// Check if we're done fetching
if
!
done
{
//fmt.Println("re-fetch. current =", d.queue.hashPool.Size())
// Get the next set of hashes
p
.
getHashes
(
hashes
[
len
(
hashes
)
-
1
])
atomic
.
StoreInt32
(
&
d
.
fetchingHashes
,
1
)
}
else
{
atomic
.
StoreInt32
(
&
d
.
fetchingHashes
,
0
)
break
out
}
}
}
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"Download hashes: done. Took"
,
time
.
Since
(
start
))
return
nil
}
func
(
d
*
Downloader
)
DeliverBlocks
(
id
string
,
block
[]
*
types
.
Block
)
{
d
.
blockCh
<-
blockPack
{
id
,
block
}
}
func
(
d
*
Downloader
)
startFetchingBlocks
(
p
*
peer
)
error
{
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"Downloading"
,
d
.
queue
.
hashPool
.
Size
(),
"blocks"
)
atomic
.
StoreInt32
(
&
d
.
downloadingBlocks
,
1
)
start
:=
time
.
Now
()
// default ticker for re-fetching blocks everynow and then
ticker
:=
time
.
NewTicker
(
20
*
time
.
Millisecond
)
out
:
for
{
select
{
case
blockPack
:=
<-
d
.
blockCh
:
//fmt.Println("get for", blockPack.peerId)
d
.
queue
.
deliver
(
blockPack
.
peerId
,
blockPack
.
blocks
)
d
.
peers
.
setState
(
blockPack
.
peerId
,
idleState
)
case
<-
ticker
.
C
:
// If there are unrequested hashes left start fetching
// from the available peers.
if
d
.
queue
.
hashPool
.
Size
()
>
0
{
availablePeers
:=
d
.
peers
.
get
(
idleState
)
for
_
,
peer
:=
range
availablePeers
{
// Get a possible chunk. If nil is returned no chunk
// could be returned due to no hashes available.
chunk
:=
d
.
queue
.
get
(
peer
,
maxBlockFetch
)
if
chunk
!=
nil
{
//fmt.Println("fetching for", peer.id)
// Fetch the chunk.
peer
.
fetch
(
chunk
)
}
}
atomic
.
StoreInt32
(
&
d
.
downloadingBlocks
,
1
)
}
else
if
len
(
d
.
queue
.
fetching
)
==
0
{
// Whene there are no more queue and no more `fetching`. We can
// safely assume we're done. Another part of the process will check
// for parent errors and will re-request anything that's missing
atomic
.
StoreInt32
(
&
d
.
downloadingBlocks
,
0
)
// Break out so that we can process with processing blocks
break
out
}
else
{
// Check for bad peers. Bad peers may indicate a peer not responding
// to a `getBlocks` message. A timeout of 5 seconds is set. Peers
// that badly or poorly behave are removed from the peer set (not banned).
// Bad peers are excluded from the available peer set and therefor won't be
// reused. XXX We could re-introduce peers after X time.
d
.
queue
.
mu
.
Lock
()
var
badPeers
[]
string
for
pid
,
chunk
:=
range
d
.
queue
.
fetching
{
if
time
.
Since
(
chunk
.
itime
)
>
5
*
time
.
Second
{
badPeers
=
append
(
badPeers
,
pid
)
// remove peer as good peer from peer list
d
.
UnregisterPeer
(
pid
)
}
}
d
.
queue
.
mu
.
Unlock
()
for
_
,
pid
:=
range
badPeers
{
// A nil chunk is delivered so that the chunk's hashes are given
// back to the queue objects. When hashes are put back in the queue
// other (decent) peers can pick them up.
// XXX We could make use of a reputation system here ranking peers
// in their performance
// 1) Time for them to respond;
// 2) Measure their speed;
// 3) Amount and availability.
d
.
queue
.
deliver
(
pid
,
nil
)
}
}
//fmt.Println(d.queue.hashPool.Size(), len(d.queue.fetching))
}
}
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"Download blocks: done. Took"
,
time
.
Since
(
start
))
return
nil
}
func
(
d
*
Downloader
)
isFetchingHashes
()
bool
{
return
atomic
.
LoadInt32
(
&
d
.
fetchingHashes
)
==
1
}
func
(
d
*
Downloader
)
isDownloadingBlocks
()
bool
{
return
atomic
.
LoadInt32
(
&
d
.
downloadingBlocks
)
==
1
}
eth/downloader/downloader_test.go
0 → 100644
View file @
acf8452c
package
downloader
import
(
"encoding/binary"
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
)
var
knownHash
=
common
.
Hash
{
1
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
}
func
createHashes
(
amount
int
)
(
hashes
[]
common
.
Hash
)
{
hashes
=
make
([]
common
.
Hash
,
amount
+
1
)
hashes
[
len
(
hashes
)
-
1
]
=
knownHash
for
i
:=
range
hashes
[
:
len
(
hashes
)
-
1
]
{
binary
.
BigEndian
.
PutUint64
(
hashes
[
i
][
:
8
],
uint64
(
i
+
2
))
}
return
}
func
createBlocksFromHashes
(
hashes
[]
common
.
Hash
)
map
[
common
.
Hash
]
*
types
.
Block
{
blocks
:=
make
(
map
[
common
.
Hash
]
*
types
.
Block
)
for
i
,
hash
:=
range
hashes
{
header
:=
&
types
.
Header
{
Number
:
big
.
NewInt
(
int64
(
i
))}
blocks
[
hash
]
=
types
.
NewBlockWithHeader
(
header
)
blocks
[
hash
]
.
HeaderHash
=
hash
}
return
blocks
}
type
downloadTester
struct
{
downloader
*
Downloader
hashes
[]
common
.
Hash
blocks
map
[
common
.
Hash
]
*
types
.
Block
t
*
testing
.
T
pcount
int
done
chan
bool
insertedBlocks
int
}
func
newTester
(
t
*
testing
.
T
,
hashes
[]
common
.
Hash
,
blocks
map
[
common
.
Hash
]
*
types
.
Block
)
*
downloadTester
{
tester
:=
&
downloadTester
{
t
:
t
,
hashes
:
hashes
,
blocks
:
blocks
,
done
:
make
(
chan
bool
)}
downloader
:=
New
(
tester
.
hasBlock
,
tester
.
insertChain
)
tester
.
downloader
=
downloader
return
tester
}
func
(
dl
*
downloadTester
)
hasBlock
(
hash
common
.
Hash
)
bool
{
if
knownHash
==
hash
{
return
true
}
return
false
}
func
(
dl
*
downloadTester
)
insertChain
(
blocks
types
.
Blocks
)
error
{
dl
.
insertedBlocks
+=
len
(
blocks
)
if
len
(
dl
.
blocks
)
-
1
<=
dl
.
insertedBlocks
{
dl
.
done
<-
true
}
return
nil
}
func
(
dl
*
downloadTester
)
getHashes
(
hash
common
.
Hash
)
error
{
dl
.
downloader
.
HashCh
<-
dl
.
hashes
return
nil
}
func
(
dl
*
downloadTester
)
getBlocks
(
id
string
)
func
([]
common
.
Hash
)
error
{
return
func
(
hashes
[]
common
.
Hash
)
error
{
blocks
:=
make
([]
*
types
.
Block
,
len
(
hashes
))
for
i
,
hash
:=
range
hashes
{
blocks
[
i
]
=
dl
.
blocks
[
hash
]
}
go
dl
.
downloader
.
DeliverBlocks
(
id
,
blocks
)
return
nil
}
}
func
(
dl
*
downloadTester
)
newPeer
(
id
string
,
td
*
big
.
Int
,
hash
common
.
Hash
)
{
dl
.
pcount
++
dl
.
downloader
.
RegisterPeer
(
id
,
td
,
hash
,
dl
.
getHashes
,
dl
.
getBlocks
(
id
))
}
func
(
dl
*
downloadTester
)
badBlocksPeer
(
id
string
,
td
*
big
.
Int
,
hash
common
.
Hash
)
{
dl
.
pcount
++
// This bad peer never returns any blocks
dl
.
downloader
.
RegisterPeer
(
id
,
td
,
hash
,
dl
.
getHashes
,
func
([]
common
.
Hash
)
error
{
return
nil
})
}
func
TestDownload
(
t
*
testing
.
T
)
{
glog
.
SetV
(
logger
.
Detail
)
glog
.
SetToStderr
(
true
)
hashes
:=
createHashes
(
1000
)
blocks
:=
createBlocksFromHashes
(
hashes
)
tester
:=
newTester
(
t
,
hashes
,
blocks
)
tester
.
newPeer
(
"peer1"
,
big
.
NewInt
(
10000
),
hashes
[
len
(
hashes
)
-
1
])
tester
.
newPeer
(
"peer2"
,
big
.
NewInt
(
0
),
common
.
Hash
{})
tester
.
badBlocksPeer
(
"peer3"
,
big
.
NewInt
(
0
),
common
.
Hash
{})
tester
.
badBlocksPeer
(
"peer4"
,
big
.
NewInt
(
0
),
common
.
Hash
{})
success
:
select
{
case
<-
tester
.
done
:
break
success
case
<-
time
.
After
(
10
*
time
.
Second
)
:
// XXX this could actually fail on a slow computer
t
.
Error
(
"timout"
)
}
}
eth/downloader/peer.go
0 → 100644
View file @
acf8452c
package
downloader
import
(
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
)
const
(
workingState
=
2
idleState
=
4
)
// peer represents an active peer
type
peer
struct
{
state
int
mu
sync
.
RWMutex
id
string
td
*
big
.
Int
recentHash
common
.
Hash
getHashes
hashFetcherFn
getBlocks
blockFetcherFn
}
// create a new peer
func
newPeer
(
id
string
,
td
*
big
.
Int
,
hash
common
.
Hash
,
getHashes
hashFetcherFn
,
getBlocks
blockFetcherFn
)
*
peer
{
return
&
peer
{
id
:
id
,
td
:
td
,
recentHash
:
hash
,
getHashes
:
getHashes
,
getBlocks
:
getBlocks
,
state
:
idleState
}
}
// fetch a chunk using the peer
func
(
p
*
peer
)
fetch
(
chunk
*
chunk
)
{
p
.
mu
.
Lock
()
defer
p
.
mu
.
Unlock
()
// set working state
p
.
state
=
workingState
// convert the set to a fetchable slice
hashes
,
i
:=
make
([]
common
.
Hash
,
chunk
.
hashes
.
Size
()),
0
chunk
.
hashes
.
Each
(
func
(
v
interface
{})
bool
{
hashes
[
i
]
=
v
.
(
common
.
Hash
)
i
++
return
true
})
p
.
getBlocks
(
hashes
)
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment