Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
5ec6ecc5
Commit
5ec6ecc5
authored
Jun 17, 2015
by
Péter Szilágyi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
eth, eth/fetcher: move propagated block import into fetcher
parent
a9ada0b5
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
106 additions
and
123 deletions
+106
-123
downloader.go
eth/downloader/downloader.go
+1
-1
fetcher.go
eth/fetcher/fetcher.go
+36
-19
fetcher_test.go
eth/fetcher/fetcher_test.go
+36
-26
handler.go
eth/handler.go
+33
-77
No files found.
eth/downloader/downloader.go
View file @
5ec6ecc5
...
...
@@ -99,7 +99,7 @@ type Downloader struct {
hasBlock
hashCheckFn
// Checks if a block is present in the chain
getBlock
blockRetrievalFn
// Retrieves a block from the chain
insertChain
chainInsertFn
// Injects a batch of blocks into the chain
dropPeer
peerDropFn
//
Retrieved the TD of our own chain
dropPeer
peerDropFn
//
Drops a peer for misbehaving
// Status
synchroniseMock
func
(
id
string
,
hash
common
.
Hash
)
error
// Replacement for synchronise during testing
...
...
eth/fetcher/fetcher.go
View file @
5ec6ecc5
...
...
@@ -29,12 +29,18 @@ type hashCheckFn func(common.Hash) bool
// blockRequesterFn is a callback type for sending a block retrieval request.
type
blockRequesterFn
func
([]
common
.
Hash
)
error
// block
ImporterFn is a callback type for trying to inject a block into the local chain
.
type
block
ImporterFn
func
(
peer
string
,
block
*
types
.
Block
)
error
// block
BroadcasterFn is a callback type for broadcasting a block to connected peers
.
type
block
BroadcasterFn
func
(
block
*
types
.
Block
)
// chainHeightFn is a callback type to retrieve the current chain height.
type
chainHeightFn
func
()
uint64
// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type
chainInsertFn
func
(
types
.
Blocks
)
(
int
,
error
)
// peerDropFn is a callback type for dropping a peer detected as malicious.
type
peerDropFn
func
(
id
string
)
// announce is the hash notification of the availability of a new block in the
// network.
type
announce
struct
{
...
...
@@ -70,26 +76,30 @@ type Fetcher struct {
queued
map
[
common
.
Hash
]
struct
{}
// Presence set of already queued blocks (to dedup imports)
// Callbacks
hasBlock
hashCheckFn
// Checks if a block is present in the chain
importBlock
blockImporterFn
// Injects a block from an origin peer into the chain
chainHeight
chainHeightFn
// Retrieves the current chain's height
hasBlock
hashCheckFn
// Checks if a block is present in the chain
broadcastBlock
blockBroadcasterFn
// Broadcasts a block to connected peers
chainHeight
chainHeightFn
// Retrieves the current chain's height
insertChain
chainInsertFn
// Injects a batch of blocks into the chain
dropPeer
peerDropFn
// Drops a peer for misbehaving
}
// New creates a block fetcher to retrieve blocks based on hash announcements.
func
New
(
hasBlock
hashCheckFn
,
importBlock
blockImporterFn
,
chainHeight
chainHeight
Fn
)
*
Fetcher
{
func
New
(
hasBlock
hashCheckFn
,
broadcastBlock
blockBroadcasterFn
,
chainHeight
chainHeightFn
,
insertChain
chainInsertFn
,
dropPeer
peerDrop
Fn
)
*
Fetcher
{
return
&
Fetcher
{
notify
:
make
(
chan
*
announce
),
inject
:
make
(
chan
*
inject
),
filter
:
make
(
chan
chan
[]
*
types
.
Block
),
done
:
make
(
chan
common
.
Hash
),
quit
:
make
(
chan
struct
{}),
announced
:
make
(
map
[
common
.
Hash
][]
*
announce
),
fetching
:
make
(
map
[
common
.
Hash
]
*
announce
),
queue
:
prque
.
New
(),
queued
:
make
(
map
[
common
.
Hash
]
struct
{}),
hasBlock
:
hasBlock
,
importBlock
:
importBlock
,
chainHeight
:
chainHeight
,
notify
:
make
(
chan
*
announce
),
inject
:
make
(
chan
*
inject
),
filter
:
make
(
chan
chan
[]
*
types
.
Block
),
done
:
make
(
chan
common
.
Hash
),
quit
:
make
(
chan
struct
{}),
announced
:
make
(
map
[
common
.
Hash
][]
*
announce
),
fetching
:
make
(
map
[
common
.
Hash
]
*
announce
),
queue
:
prque
.
New
(),
queued
:
make
(
map
[
common
.
Hash
]
struct
{}),
hasBlock
:
hasBlock
,
broadcastBlock
:
broadcastBlock
,
chainHeight
:
chainHeight
,
insertChain
:
insertChain
,
dropPeer
:
dropPeer
,
}
}
...
...
@@ -328,10 +338,17 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
go
func
()
{
defer
func
()
{
f
.
done
<-
hash
}()
// If the parent's unknown, abort insertion
if
!
f
.
hasBlock
(
block
.
ParentHash
())
{
return
}
// Run the actual import and log any issues
if
err
:=
f
.
importBlock
(
peer
,
block
);
err
!=
nil
{
if
_
,
err
:=
f
.
insertChain
(
types
.
Blocks
{
block
}
);
err
!=
nil
{
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Peer %s: block #%d [%x] import failed: %v"
,
peer
,
block
.
NumberU64
(),
hash
[
:
4
],
err
)
f
.
dropPeer
(
peer
)
return
}
// If import succeeded, broadcast the block
go
f
.
broadcastBlock
(
block
)
}()
}
eth/fetcher/fetcher_test.go
View file @
5ec6ecc5
...
...
@@ -80,7 +80,7 @@ func newTester() *fetcherTester {
hashes
:
[]
common
.
Hash
{
knownHash
},
blocks
:
map
[
common
.
Hash
]
*
types
.
Block
{
knownHash
:
genesis
},
}
tester
.
fetcher
=
New
(
tester
.
hasBlock
,
tester
.
importBlock
,
tester
.
chainHeight
)
tester
.
fetcher
=
New
(
tester
.
hasBlock
,
tester
.
broadcastBlock
,
tester
.
chainHeight
,
tester
.
insertChain
,
tester
.
dropPeer
)
tester
.
fetcher
.
Start
()
return
tester
...
...
@@ -95,23 +95,8 @@ func (f *fetcherTester) hasBlock(hash common.Hash) bool {
return
ok
}
// importBlock injects a new blocks into the simulated chain.
func
(
f
*
fetcherTester
)
importBlock
(
peer
string
,
block
*
types
.
Block
)
error
{
f
.
lock
.
Lock
()
defer
f
.
lock
.
Unlock
()
// Make sure the parent in known
if
_
,
ok
:=
f
.
blocks
[
block
.
ParentHash
()];
!
ok
{
return
errors
.
New
(
"unknown parent"
)
}
// Discard any new blocks if the same height already exists
if
block
.
NumberU64
()
<=
f
.
blocks
[
f
.
hashes
[
len
(
f
.
hashes
)
-
1
]]
.
NumberU64
()
{
return
nil
}
// Otherwise build our current chain
f
.
hashes
=
append
(
f
.
hashes
,
block
.
Hash
())
f
.
blocks
[
block
.
Hash
()]
=
block
return
nil
// broadcastBlock is a nop placeholder for the block broadcasting.
func
(
f
*
fetcherTester
)
broadcastBlock
(
block
*
types
.
Block
)
{
}
// chainHeight retrieves the current height (block number) of the chain.
...
...
@@ -122,6 +107,31 @@ func (f *fetcherTester) chainHeight() uint64 {
return
f
.
blocks
[
f
.
hashes
[
len
(
f
.
hashes
)
-
1
]]
.
NumberU64
()
}
// insertChain injects a new blocks into the simulated chain.
func
(
f
*
fetcherTester
)
insertChain
(
blocks
types
.
Blocks
)
(
int
,
error
)
{
f
.
lock
.
Lock
()
defer
f
.
lock
.
Unlock
()
for
i
,
block
:=
range
blocks
{
// Make sure the parent in known
if
_
,
ok
:=
f
.
blocks
[
block
.
ParentHash
()];
!
ok
{
return
i
,
errors
.
New
(
"unknown parent"
)
}
// Discard any new blocks if the same height already exists
if
block
.
NumberU64
()
<=
f
.
blocks
[
f
.
hashes
[
len
(
f
.
hashes
)
-
1
]]
.
NumberU64
()
{
return
i
,
nil
}
// Otherwise build our current chain
f
.
hashes
=
append
(
f
.
hashes
,
block
.
Hash
())
f
.
blocks
[
block
.
Hash
()]
=
block
}
return
0
,
nil
}
// dropPeer is a nop placeholder for the peer removal.
func
(
f
*
fetcherTester
)
dropPeer
(
peer
string
)
{
}
// peerFetcher retrieves a fetcher associated with a simulated peer.
func
(
f
*
fetcherTester
)
makeFetcher
(
blocks
map
[
common
.
Hash
]
*
types
.
Block
)
blockRequesterFn
{
// Copy all the blocks to ensure they are not tampered with
...
...
@@ -330,9 +340,9 @@ func TestImportDeduplication(t *testing.T) {
fetcher
:=
tester
.
makeFetcher
(
blocks
)
counter
:=
uint32
(
0
)
tester
.
fetcher
.
i
mportBlock
=
func
(
peer
string
,
block
*
types
.
Block
)
error
{
atomic
.
AddUint32
(
&
counter
,
1
)
return
tester
.
i
mportBlock
(
peer
,
block
)
tester
.
fetcher
.
i
nsertChain
=
func
(
blocks
types
.
Blocks
)
(
int
,
error
)
{
atomic
.
AddUint32
(
&
counter
,
uint32
(
len
(
blocks
))
)
return
tester
.
i
nsertChain
(
blocks
)
}
// Announce the duplicating block, wait for retrieval, and also propagate directly
tester
.
fetcher
.
Notify
(
"valid"
,
hashes
[
0
],
time
.
Now
()
.
Add
(
-
arriveTimeout
),
fetcher
)
...
...
@@ -400,18 +410,18 @@ func TestCompetingImports(t *testing.T) {
first
:=
int32
(
1
)
height
:=
uint64
(
1
)
tester
.
fetcher
.
i
mportBlock
=
func
(
peer
string
,
block
*
types
.
Block
)
error
{
tester
.
fetcher
.
i
nsertChain
=
func
(
blocks
types
.
Blocks
)
(
int
,
error
)
{
// Check for any phase reordering
if
prev
:=
atomic
.
LoadUint64
(
&
height
);
block
.
NumberU64
()
<
prev
{
t
.
Errorf
(
"phase reversal: have %v, want %v"
,
block
.
NumberU64
(),
prev
)
if
prev
:=
atomic
.
LoadUint64
(
&
height
);
block
s
[
0
]
.
NumberU64
()
<
prev
{
t
.
Errorf
(
"phase reversal: have %v, want %v"
,
block
s
[
0
]
.
NumberU64
(),
prev
)
}
atomic
.
StoreUint64
(
&
height
,
block
.
NumberU64
())
atomic
.
StoreUint64
(
&
height
,
block
s
[
0
]
.
NumberU64
())
// Sleep a bit on the first import not to race with the enqueues
if
atomic
.
CompareAndSwapInt32
(
&
first
,
1
,
0
)
{
time
.
Sleep
(
50
*
time
.
Millisecond
)
}
return
tester
.
i
mportBlock
(
peer
,
block
)
return
tester
.
i
nsertChain
(
blocks
)
}
// Queue up everything but with a missing link
for
i
:=
0
;
i
<
len
(
hashesA
)
-
2
;
i
++
{
...
...
eth/handler.go
View file @
5ec6ecc5
package
eth
import
(
"errors"
"fmt"
"math"
"math/big"
"sync"
"time"
...
...
@@ -93,16 +91,10 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
// Construct the different synchronisation mechanisms
manager
.
downloader
=
downloader
.
New
(
manager
.
eventMux
,
manager
.
chainman
.
HasBlock
,
manager
.
chainman
.
GetBlock
,
manager
.
chainman
.
InsertChain
,
manager
.
removePeer
)
importer
:=
func
(
peer
string
,
block
*
types
.
Block
)
error
{
if
p
:=
manager
.
peers
.
Peer
(
peer
);
p
!=
nil
{
return
manager
.
importBlock
(
manager
.
peers
.
Peer
(
peer
),
block
,
nil
)
}
return
errors
.
New
(
"unknown peer"
)
}
heighter
:=
func
()
uint64
{
return
manager
.
chainman
.
CurrentBlock
()
.
NumberU64
()
}
manager
.
fetcher
=
fetcher
.
New
(
manager
.
chainman
.
HasBlock
,
importer
,
height
er
)
manager
.
fetcher
=
fetcher
.
New
(
manager
.
chainman
.
HasBlock
,
manager
.
BroadcastBlock
,
heighter
,
manager
.
chainman
.
InsertChain
,
manager
.
removePe
er
)
return
manager
}
...
...
@@ -194,7 +186,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
return
nil
}
func
(
self
*
ProtocolManager
)
handleMsg
(
p
*
peer
)
error
{
func
(
pm
*
ProtocolManager
)
handleMsg
(
p
*
peer
)
error
{
msg
,
err
:=
p
.
rw
.
ReadMsg
()
if
err
!=
nil
{
return
err
...
...
@@ -224,7 +216,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
RemoteId
:
p
.
ID
()
.
String
(),
})
}
self
.
txpool
.
AddTransactions
(
txs
)
pm
.
txpool
.
AddTransactions
(
txs
)
case
GetBlockHashesMsg
:
var
request
getBlockHashesMsgData
...
...
@@ -236,7 +228,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
request
.
Amount
=
uint64
(
downloader
.
MaxHashFetch
)
}
hashes
:=
self
.
chainman
.
GetBlockHashesFromHash
(
request
.
Hash
,
request
.
Amount
)
hashes
:=
pm
.
chainman
.
GetBlockHashesFromHash
(
request
.
Hash
,
request
.
Amount
)
if
glog
.
V
(
logger
.
Debug
)
{
if
len
(
hashes
)
==
0
{
...
...
@@ -254,7 +246,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
if
err
:=
msgStream
.
Decode
(
&
hashes
);
err
!=
nil
{
break
}
err
:=
self
.
downloader
.
DeliverHashes
(
p
.
id
,
hashes
)
err
:=
pm
.
downloader
.
DeliverHashes
(
p
.
id
,
hashes
)
if
err
!=
nil
{
glog
.
V
(
logger
.
Debug
)
.
Infoln
(
err
)
}
...
...
@@ -280,7 +272,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
return
errResp
(
ErrDecode
,
"msg %v: %v"
,
msg
,
err
)
}
block
:=
self
.
chainman
.
GetBlock
(
hash
)
block
:=
pm
.
chainman
.
GetBlock
(
hash
)
if
block
!=
nil
{
blocks
=
append
(
blocks
,
block
)
totalsize
+=
block
.
Size
()
...
...
@@ -301,8 +293,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
blocks
=
nil
}
// Filter out any explicitly requested blocks, deliver the rest to the downloader
if
blocks
:=
self
.
fetcher
.
Filter
(
blocks
);
len
(
blocks
)
>
0
{
self
.
downloader
.
DeliverBlocks
(
p
.
id
,
blocks
)
if
blocks
:=
pm
.
fetcher
.
Filter
(
blocks
);
len
(
blocks
)
>
0
{
pm
.
downloader
.
DeliverBlocks
(
p
.
id
,
blocks
)
}
case
NewBlockHashesMsg
:
...
...
@@ -321,15 +313,16 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
// Schedule all the unknown hashes for retrieval
unknown
:=
make
([]
common
.
Hash
,
0
,
len
(
hashes
))
for
_
,
hash
:=
range
hashes
{
if
!
self
.
chainman
.
HasBlock
(
hash
)
{
if
!
pm
.
chainman
.
HasBlock
(
hash
)
{
unknown
=
append
(
unknown
,
hash
)
}
}
for
_
,
hash
:=
range
unknown
{
self
.
fetcher
.
Notify
(
p
.
id
,
hash
,
time
.
Now
(),
p
.
requestBlocks
)
pm
.
fetcher
.
Notify
(
p
.
id
,
hash
,
time
.
Now
(),
p
.
requestBlocks
)
}
case
NewBlockMsg
:
// Retrieve and decode the propagated block
var
request
newBlockMsgData
if
err
:=
msg
.
Decode
(
&
request
);
err
!=
nil
{
return
errResp
(
ErrDecode
,
"%v: %v"
,
msg
,
err
)
...
...
@@ -339,11 +332,24 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
}
request
.
Block
.
ReceivedAt
=
msg
.
ReceivedAt
// Try to import the propagated block, also making it fill any fetcher gaps
self
.
fetcher
.
Enqueue
(
p
.
id
,
request
.
Block
)
if
err
:=
self
.
importBlock
(
p
,
request
.
Block
,
request
.
TD
);
err
!=
nil
{
return
err
}
// Mark the block's arrival for whatever reason
_
,
chainHead
,
_
:=
pm
.
chainman
.
Status
()
jsonlogger
.
LogJson
(
&
logger
.
EthChainReceivedNewBlock
{
BlockHash
:
request
.
Block
.
Hash
()
.
Hex
(),
BlockNumber
:
request
.
Block
.
Number
(),
ChainHeadHash
:
chainHead
.
Hex
(),
BlockPrevHash
:
request
.
Block
.
ParentHash
()
.
Hex
(),
RemoteId
:
p
.
ID
()
.
String
(),
})
// Mark the peer as owning the block and schedule it for import
p
.
blockHashes
.
Add
(
request
.
Block
.
Hash
())
p
.
SetHead
(
request
.
Block
.
Hash
())
pm
.
fetcher
.
Enqueue
(
p
.
id
,
request
.
Block
)
// TODO: Schedule a sync to cover potential gaps (this needs proto update)
p
.
SetTd
(
request
.
TD
)
go
pm
.
synchronise
(
p
)
default
:
return
errResp
(
ErrInvalidMsgCode
,
"%v"
,
msg
.
Code
)
...
...
@@ -351,61 +357,11 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
return
nil
}
// importBlocks injects a new block retrieved from the given peer into the chain
// manager.
func
(
pm
*
ProtocolManager
)
importBlock
(
p
*
peer
,
block
*
types
.
Block
,
td
*
big
.
Int
)
error
{
hash
:=
block
.
Hash
()
// Mark the block as present at the remote node (don't duplicate already held data)
p
.
blockHashes
.
Add
(
hash
)
p
.
SetHead
(
hash
)
if
td
!=
nil
{
p
.
SetTd
(
td
)
}
// Log the block's arrival
_
,
chainHead
,
_
:=
pm
.
chainman
.
Status
()
jsonlogger
.
LogJson
(
&
logger
.
EthChainReceivedNewBlock
{
BlockHash
:
hash
.
Hex
(),
BlockNumber
:
block
.
Number
(),
ChainHeadHash
:
chainHead
.
Hex
(),
BlockPrevHash
:
block
.
ParentHash
()
.
Hex
(),
RemoteId
:
p
.
ID
()
.
String
(),
})
// If the block's already known or its difficulty is lower than ours, drop
if
pm
.
chainman
.
HasBlock
(
hash
)
{
p
.
SetTd
(
pm
.
chainman
.
GetBlock
(
hash
)
.
Td
)
// update the peer's TD to the real value
return
nil
}
if
td
!=
nil
&&
pm
.
chainman
.
Td
()
.
Cmp
(
td
)
>
0
&&
new
(
big
.
Int
)
.
Add
(
block
.
Number
(),
big
.
NewInt
(
7
))
.
Cmp
(
pm
.
chainman
.
CurrentBlock
()
.
Number
())
<
0
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"[%s] dropped block %v due to low TD %v
\n
"
,
p
.
id
,
block
.
Number
(),
td
)
return
nil
}
// Attempt to insert the newly received block and propagate to our peers
if
pm
.
chainman
.
HasBlock
(
block
.
ParentHash
())
{
if
_
,
err
:=
pm
.
chainman
.
InsertChain
(
types
.
Blocks
{
block
});
err
!=
nil
{
glog
.
V
(
logger
.
Error
)
.
Infoln
(
"removed peer ("
,
p
.
id
,
") due to block error"
,
err
)
return
err
}
if
td
!=
nil
&&
block
.
Td
.
Cmp
(
td
)
!=
0
{
err
:=
fmt
.
Errorf
(
"invalid TD on block(%v) from peer(%s): block.td=%v, request.td=%v"
,
block
.
Number
(),
p
.
id
,
block
.
Td
,
td
)
glog
.
V
(
logger
.
Error
)
.
Infoln
(
err
)
return
err
}
pm
.
BroadcastBlock
(
hash
,
block
)
return
nil
}
// Parent of the block is unknown, try to sync with this peer if it seems to be good
if
td
!=
nil
{
go
pm
.
synchronise
(
p
)
}
return
nil
}
// BroadcastBlock will propagate the block to a subset of its connected peers,
// only notifying the rest of the block's appearance.
func
(
pm
*
ProtocolManager
)
BroadcastBlock
(
hash
common
.
Hash
,
block
*
types
.
Block
)
{
func
(
pm
*
ProtocolManager
)
BroadcastBlock
(
block
*
types
.
Block
)
{
// Retrieve all the target peers and split between full broadcast or only notification
peers
:=
pm
.
peers
.
PeersWithoutBlock
(
hash
)
peers
:=
pm
.
peers
.
PeersWithoutBlock
(
block
.
Hash
()
)
split
:=
int
(
math
.
Sqrt
(
float64
(
len
(
peers
))))
transfer
:=
peers
[
:
split
]
...
...
@@ -413,7 +369,7 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block)
// Send out the data transfers and the notifications
for
_
,
peer
:=
range
notify
{
peer
.
sendNewBlockHashes
([]
common
.
Hash
{
hash
})
peer
.
sendNewBlockHashes
([]
common
.
Hash
{
block
.
Hash
()
})
}
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"broadcast hash to"
,
len
(
notify
),
"peers."
)
...
...
@@ -442,7 +398,7 @@ func (self *ProtocolManager) minedBroadcastLoop() {
for
obj
:=
range
self
.
minedBlockSub
.
Chan
()
{
switch
ev
:=
obj
.
(
type
)
{
case
core
.
NewMinedBlockEvent
:
self
.
BroadcastBlock
(
ev
.
Block
.
Hash
(),
ev
.
Block
)
self
.
BroadcastBlock
(
ev
.
Block
)
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment