Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
b91b581b
Commit
b91b581b
authored
Jun 18, 2015
by
Péter Szilágyi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
eth, eth/fetcher: propagate after header verify, announce only on insert
parent
629705ad
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
81 additions
and
43 deletions
+81
-43
backend.go
eth/backend.go
+1
-1
fetcher.go
eth/fetcher/fetcher.go
+28
-15
fetcher_test.go
eth/fetcher/fetcher_test.go
+11
-7
handler.go
eth/handler.go
+41
-20
No files found.
eth/backend.go
View file @
b91b581b
...
@@ -313,7 +313,7 @@ func New(config *Config) (*Ethereum, error) {
...
@@ -313,7 +313,7 @@ func New(config *Config) (*Ethereum, error) {
eth
.
blockProcessor
=
core
.
NewBlockProcessor
(
stateDb
,
extraDb
,
eth
.
pow
,
eth
.
chainManager
,
eth
.
EventMux
())
eth
.
blockProcessor
=
core
.
NewBlockProcessor
(
stateDb
,
extraDb
,
eth
.
pow
,
eth
.
chainManager
,
eth
.
EventMux
())
eth
.
chainManager
.
SetProcessor
(
eth
.
blockProcessor
)
eth
.
chainManager
.
SetProcessor
(
eth
.
blockProcessor
)
eth
.
protocolManager
=
NewProtocolManager
(
config
.
ProtocolVersion
,
config
.
NetworkId
,
eth
.
eventMux
,
eth
.
txPool
,
eth
.
chainManager
)
eth
.
protocolManager
=
NewProtocolManager
(
config
.
ProtocolVersion
,
config
.
NetworkId
,
eth
.
eventMux
,
eth
.
txPool
,
eth
.
pow
,
eth
.
chainManager
)
eth
.
miner
=
miner
.
New
(
eth
,
eth
.
EventMux
(),
eth
.
pow
)
eth
.
miner
=
miner
.
New
(
eth
,
eth
.
EventMux
(),
eth
.
pow
)
eth
.
miner
.
SetGasPrice
(
config
.
GasPrice
)
eth
.
miner
.
SetGasPrice
(
config
.
GasPrice
)
...
...
eth/fetcher/fetcher.go
View file @
b91b581b
...
@@ -23,14 +23,17 @@ var (
...
@@ -23,14 +23,17 @@ var (
errTerminated
=
errors
.
New
(
"terminated"
)
errTerminated
=
errors
.
New
(
"terminated"
)
)
)
//
hashCheckFn is a callback type for verifying a hash's presence in
the local chain.
//
blockRetrievalFn is a callback type for retrieving a block from
the local chain.
type
hashCheckFn
func
(
common
.
Hash
)
bool
type
blockRetrievalFn
func
(
common
.
Hash
)
*
types
.
Block
// blockRequesterFn is a callback type for sending a block retrieval request.
// blockRequesterFn is a callback type for sending a block retrieval request.
type
blockRequesterFn
func
([]
common
.
Hash
)
error
type
blockRequesterFn
func
([]
common
.
Hash
)
error
// blockValidatorFn is a callback type to verify a block's header for fast propagation.
type
blockValidatorFn
func
(
block
*
types
.
Block
,
parent
*
types
.
Block
)
error
// blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
// blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
type
blockBroadcasterFn
func
(
block
*
types
.
Block
)
type
blockBroadcasterFn
func
(
block
*
types
.
Block
,
propagate
bool
)
// chainHeightFn is a callback type to retrieve the current chain height.
// chainHeightFn is a callback type to retrieve the current chain height.
type
chainHeightFn
func
()
uint64
type
chainHeightFn
func
()
uint64
...
@@ -76,7 +79,8 @@ type Fetcher struct {
...
@@ -76,7 +79,8 @@ type Fetcher struct {
queued
map
[
common
.
Hash
]
struct
{}
// Presence set of already queued blocks (to dedup imports)
queued
map
[
common
.
Hash
]
struct
{}
// Presence set of already queued blocks (to dedup imports)
// Callbacks
// Callbacks
hasBlock
hashCheckFn
// Checks if a block is present in the chain
getBlock
blockRetrievalFn
// Retrieves a block from the local chain
validateBlock
blockValidatorFn
// Checks if a block's headers have a valid proof of work
broadcastBlock
blockBroadcasterFn
// Broadcasts a block to connected peers
broadcastBlock
blockBroadcasterFn
// Broadcasts a block to connected peers
chainHeight
chainHeightFn
// Retrieves the current chain's height
chainHeight
chainHeightFn
// Retrieves the current chain's height
insertChain
chainInsertFn
// Injects a batch of blocks into the chain
insertChain
chainInsertFn
// Injects a batch of blocks into the chain
...
@@ -84,7 +88,7 @@ type Fetcher struct {
...
@@ -84,7 +88,7 @@ type Fetcher struct {
}
}
// New creates a block fetcher to retrieve blocks based on hash announcements.
// New creates a block fetcher to retrieve blocks based on hash announcements.
func
New
(
hasBlock
hashCheck
Fn
,
broadcastBlock
blockBroadcasterFn
,
chainHeight
chainHeightFn
,
insertChain
chainInsertFn
,
dropPeer
peerDropFn
)
*
Fetcher
{
func
New
(
getBlock
blockRetrievalFn
,
validateBlock
blockValidator
Fn
,
broadcastBlock
blockBroadcasterFn
,
chainHeight
chainHeightFn
,
insertChain
chainInsertFn
,
dropPeer
peerDropFn
)
*
Fetcher
{
return
&
Fetcher
{
return
&
Fetcher
{
notify
:
make
(
chan
*
announce
),
notify
:
make
(
chan
*
announce
),
inject
:
make
(
chan
*
inject
),
inject
:
make
(
chan
*
inject
),
...
@@ -95,7 +99,8 @@ func New(hasBlock hashCheckFn, broadcastBlock blockBroadcasterFn, chainHeight ch
...
@@ -95,7 +99,8 @@ func New(hasBlock hashCheckFn, broadcastBlock blockBroadcasterFn, chainHeight ch
fetching
:
make
(
map
[
common
.
Hash
]
*
announce
),
fetching
:
make
(
map
[
common
.
Hash
]
*
announce
),
queue
:
prque
.
New
(),
queue
:
prque
.
New
(),
queued
:
make
(
map
[
common
.
Hash
]
struct
{}),
queued
:
make
(
map
[
common
.
Hash
]
struct
{}),
hasBlock
:
hasBlock
,
getBlock
:
getBlock
,
validateBlock
:
validateBlock
,
broadcastBlock
:
broadcastBlock
,
broadcastBlock
:
broadcastBlock
,
chainHeight
:
chainHeight
,
chainHeight
:
chainHeight
,
insertChain
:
insertChain
,
insertChain
:
insertChain
,
...
@@ -197,7 +202,7 @@ func (f *Fetcher) loop() {
...
@@ -197,7 +202,7 @@ func (f *Fetcher) loop() {
break
break
}
}
// Otherwise if fresh and still unknown, try and import
// Otherwise if fresh and still unknown, try and import
if
number
<=
height
||
f
.
hasBlock
(
op
.
block
.
Hash
())
{
if
number
<=
height
||
f
.
getBlock
(
op
.
block
.
Hash
())
!=
nil
{
continue
continue
}
}
f
.
insert
(
op
.
origin
,
op
.
block
)
f
.
insert
(
op
.
origin
,
op
.
block
)
...
@@ -235,7 +240,7 @@ func (f *Fetcher) loop() {
...
@@ -235,7 +240,7 @@ func (f *Fetcher) loop() {
for
hash
,
announces
:=
range
f
.
announced
{
for
hash
,
announces
:=
range
f
.
announced
{
if
time
.
Since
(
announces
[
0
]
.
time
)
>
arriveTimeout
{
if
time
.
Since
(
announces
[
0
]
.
time
)
>
arriveTimeout
{
announce
:=
announces
[
rand
.
Intn
(
len
(
announces
))]
announce
:=
announces
[
rand
.
Intn
(
len
(
announces
))]
if
!
f
.
hasBlock
(
hash
)
{
if
f
.
getBlock
(
hash
)
==
nil
{
request
[
announce
.
origin
]
=
append
(
request
[
announce
.
origin
],
hash
)
request
[
announce
.
origin
]
=
append
(
request
[
announce
.
origin
],
hash
)
f
.
fetching
[
hash
]
=
announce
f
.
fetching
[
hash
]
=
announce
}
}
...
@@ -265,7 +270,7 @@ func (f *Fetcher) loop() {
...
@@ -265,7 +270,7 @@ func (f *Fetcher) loop() {
// Filter explicitly requested blocks from hash announcements
// Filter explicitly requested blocks from hash announcements
if
_
,
ok
:=
f
.
fetching
[
hash
];
ok
{
if
_
,
ok
:=
f
.
fetching
[
hash
];
ok
{
// Discard if already imported by other means
// Discard if already imported by other means
if
!
f
.
hasBlock
(
hash
)
{
if
f
.
getBlock
(
hash
)
==
nil
{
explicit
=
append
(
explicit
,
block
)
explicit
=
append
(
explicit
,
block
)
}
else
{
}
else
{
delete
(
f
.
fetching
,
hash
)
delete
(
f
.
fetching
,
hash
)
...
@@ -313,7 +318,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
...
@@ -313,7 +318,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
// Discard any past or too distant blocks
// Discard any past or too distant blocks
if
dist
:=
int64
(
block
.
NumberU64
())
-
int64
(
f
.
chainHeight
());
dist
<=
0
||
dist
>
maxQueueDist
{
if
dist
:=
int64
(
block
.
NumberU64
())
-
int64
(
f
.
chainHeight
());
dist
<=
0
||
dist
>
maxQueueDist
{
glog
.
Infof
(
"Peer %s: discarded block #%d [%x], distance %d"
,
peer
,
block
.
NumberU64
(),
hash
.
Bytes
()[
:
4
],
dist
)
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Peer %s: discarded block #%d [%x], distance %d"
,
peer
,
block
.
NumberU64
(),
hash
.
Bytes
()[
:
4
],
dist
)
return
return
}
}
// Schedule the block for future importing
// Schedule the block for future importing
...
@@ -321,7 +326,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
...
@@ -321,7 +326,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
f
.
queued
[
hash
]
=
struct
{}{}
f
.
queued
[
hash
]
=
struct
{}{}
f
.
queue
.
Push
(
&
inject
{
origin
:
peer
,
block
:
block
},
-
float32
(
block
.
NumberU64
()))
f
.
queue
.
Push
(
&
inject
{
origin
:
peer
,
block
:
block
},
-
float32
(
block
.
NumberU64
()))
if
glog
.
V
(
logger
.
De
tail
)
{
if
glog
.
V
(
logger
.
De
bug
)
{
glog
.
Infof
(
"Peer %s: queued block #%d [%x], total %v"
,
peer
,
block
.
NumberU64
(),
hash
.
Bytes
()[
:
4
],
f
.
queue
.
Size
())
glog
.
Infof
(
"Peer %s: queued block #%d [%x], total %v"
,
peer
,
block
.
NumberU64
(),
hash
.
Bytes
()[
:
4
],
f
.
queue
.
Size
())
}
}
}
}
...
@@ -339,16 +344,24 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
...
@@ -339,16 +344,24 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
defer
func
()
{
f
.
done
<-
hash
}()
defer
func
()
{
f
.
done
<-
hash
}()
// If the parent's unknown, abort insertion
// If the parent's unknown, abort insertion
if
!
f
.
hasBlock
(
block
.
ParentHash
())
{
parent
:=
f
.
getBlock
(
block
.
ParentHash
())
if
parent
==
nil
{
return
}
// Quickly validate the header and propagate the block if it passes
if
err
:=
f
.
validateBlock
(
block
,
parent
);
err
!=
nil
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer %s: block #%d [%x] verification failed: %v"
,
peer
,
block
.
NumberU64
(),
hash
[
:
4
],
err
)
f
.
dropPeer
(
peer
)
return
return
}
}
go
f
.
broadcastBlock
(
block
,
true
)
// Run the actual import and log any issues
// Run the actual import and log any issues
if
_
,
err
:=
f
.
insertChain
(
types
.
Blocks
{
block
});
err
!=
nil
{
if
_
,
err
:=
f
.
insertChain
(
types
.
Blocks
{
block
});
err
!=
nil
{
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Peer %s: block #%d [%x] import failed: %v"
,
peer
,
block
.
NumberU64
(),
hash
[
:
4
],
err
)
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"Peer %s: block #%d [%x] import failed: %v"
,
peer
,
block
.
NumberU64
(),
hash
[
:
4
],
err
)
f
.
dropPeer
(
peer
)
return
return
}
}
// If import succeeded, broadcast the block
// If import succeeded, broadcast the block
go
f
.
broadcastBlock
(
block
)
go
f
.
broadcastBlock
(
block
,
false
)
}()
}()
}
}
eth/fetcher/fetcher_test.go
View file @
b91b581b
...
@@ -80,23 +80,27 @@ func newTester() *fetcherTester {
...
@@ -80,23 +80,27 @@ func newTester() *fetcherTester {
hashes
:
[]
common
.
Hash
{
knownHash
},
hashes
:
[]
common
.
Hash
{
knownHash
},
blocks
:
map
[
common
.
Hash
]
*
types
.
Block
{
knownHash
:
genesis
},
blocks
:
map
[
common
.
Hash
]
*
types
.
Block
{
knownHash
:
genesis
},
}
}
tester
.
fetcher
=
New
(
tester
.
has
Block
,
tester
.
broadcastBlock
,
tester
.
chainHeight
,
tester
.
insertChain
,
tester
.
dropPeer
)
tester
.
fetcher
=
New
(
tester
.
getBlock
,
tester
.
verify
Block
,
tester
.
broadcastBlock
,
tester
.
chainHeight
,
tester
.
insertChain
,
tester
.
dropPeer
)
tester
.
fetcher
.
Start
()
tester
.
fetcher
.
Start
()
return
tester
return
tester
}
}
//
hasBlock checks if a block is pres ent in the testers canonical
chain.
//
getBlock retrieves a block from the tester's block
chain.
func
(
f
*
fetcherTester
)
hasBlock
(
hash
common
.
Hash
)
bool
{
func
(
f
*
fetcherTester
)
getBlock
(
hash
common
.
Hash
)
*
types
.
Block
{
f
.
lock
.
RLock
()
f
.
lock
.
RLock
()
defer
f
.
lock
.
RUnlock
()
defer
f
.
lock
.
RUnlock
()
_
,
ok
:=
f
.
blocks
[
hash
]
return
f
.
blocks
[
hash
]
return
ok
}
// verifyBlock is a nop placeholder for the block header verification.
func
(
f
*
fetcherTester
)
verifyBlock
(
block
*
types
.
Block
,
parent
*
types
.
Block
)
error
{
return
nil
}
}
// broadcastBlock is a nop placeholder for the block broadcasting.
// broadcastBlock is a nop placeholder for the block broadcasting.
func
(
f
*
fetcherTester
)
broadcastBlock
(
block
*
types
.
Block
)
{
func
(
f
*
fetcherTester
)
broadcastBlock
(
block
*
types
.
Block
,
propagate
bool
)
{
}
}
// chainHeight retrieves the current height (block number) of the chain.
// chainHeight retrieves the current height (block number) of the chain.
...
@@ -257,7 +261,7 @@ func TestPendingDeduplication(t *testing.T) {
...
@@ -257,7 +261,7 @@ func TestPendingDeduplication(t *testing.T) {
return
nil
return
nil
}
}
// Announce the same block many times until it's fetched (wait for any pending ops)
// Announce the same block many times until it's fetched (wait for any pending ops)
for
!
tester
.
hasBlock
(
hashes
[
0
])
{
for
tester
.
getBlock
(
hashes
[
0
])
==
nil
{
tester
.
fetcher
.
Notify
(
"repeater"
,
hashes
[
0
],
time
.
Now
()
.
Add
(
-
arriveTimeout
),
wrapper
)
tester
.
fetcher
.
Notify
(
"repeater"
,
hashes
[
0
],
time
.
Now
()
.
Add
(
-
arriveTimeout
),
wrapper
)
time
.
Sleep
(
time
.
Millisecond
)
time
.
Sleep
(
time
.
Millisecond
)
}
}
...
...
eth/handler.go
View file @
b91b581b
...
@@ -6,6 +6,8 @@ import (
...
@@ -6,6 +6,8 @@ import (
"sync"
"sync"
"time"
"time"
"github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types"
...
@@ -67,7 +69,7 @@ type ProtocolManager struct {
...
@@ -67,7 +69,7 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
// with the ethereum network.
func
NewProtocolManager
(
protocolVersion
,
networkId
int
,
mux
*
event
.
TypeMux
,
txpool
txPool
,
chainman
*
core
.
ChainManager
)
*
ProtocolManager
{
func
NewProtocolManager
(
protocolVersion
,
networkId
int
,
mux
*
event
.
TypeMux
,
txpool
txPool
,
pow
pow
.
PoW
,
chainman
*
core
.
ChainManager
)
*
ProtocolManager
{
// Create the protocol manager and initialize peer handlers
// Create the protocol manager and initialize peer handlers
manager
:=
&
ProtocolManager
{
manager
:=
&
ProtocolManager
{
eventMux
:
mux
,
eventMux
:
mux
,
...
@@ -91,10 +93,13 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
...
@@ -91,10 +93,13 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
// Construct the different synchronisation mechanisms
// Construct the different synchronisation mechanisms
manager
.
downloader
=
downloader
.
New
(
manager
.
eventMux
,
manager
.
chainman
.
HasBlock
,
manager
.
chainman
.
GetBlock
,
manager
.
chainman
.
InsertChain
,
manager
.
removePeer
)
manager
.
downloader
=
downloader
.
New
(
manager
.
eventMux
,
manager
.
chainman
.
HasBlock
,
manager
.
chainman
.
GetBlock
,
manager
.
chainman
.
InsertChain
,
manager
.
removePeer
)
validator
:=
func
(
block
*
types
.
Block
,
parent
*
types
.
Block
)
error
{
return
core
.
ValidateHeader
(
pow
,
block
.
Header
(),
parent
.
Header
(),
true
)
}
heighter
:=
func
()
uint64
{
heighter
:=
func
()
uint64
{
return
manager
.
chainman
.
CurrentBlock
()
.
NumberU64
()
return
manager
.
chainman
.
CurrentBlock
()
.
NumberU64
()
}
}
manager
.
fetcher
=
fetcher
.
New
(
manager
.
chainman
.
HasBlock
,
manager
.
BroadcastBlock
,
heighter
,
manager
.
chainman
.
InsertChain
,
manager
.
removePeer
)
manager
.
fetcher
=
fetcher
.
New
(
manager
.
chainman
.
GetBlock
,
validator
,
manager
.
BroadcastBlock
,
heighter
,
manager
.
chainman
.
InsertChain
,
manager
.
removePeer
)
return
manager
return
manager
}
}
...
@@ -261,6 +266,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
...
@@ -261,6 +266,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
var
(
var
(
hash
common
.
Hash
hash
common
.
Hash
bytes
common
.
StorageSize
bytes
common
.
StorageSize
hashes
[]
common
.
Hash
blocks
[]
*
types
.
Block
blocks
[]
*
types
.
Block
)
)
for
{
for
{
...
@@ -270,6 +276,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
...
@@ -270,6 +276,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
else
if
err
!=
nil
{
}
else
if
err
!=
nil
{
return
errResp
(
ErrDecode
,
"msg %v: %v"
,
msg
,
err
)
return
errResp
(
ErrDecode
,
"msg %v: %v"
,
msg
,
err
)
}
}
hashes
=
append
(
hashes
,
hash
)
// Retrieve the requested block, stopping if enough was found
// Retrieve the requested block, stopping if enough was found
if
block
:=
pm
.
chainman
.
GetBlock
(
hash
);
block
!=
nil
{
if
block
:=
pm
.
chainman
.
GetBlock
(
hash
);
block
!=
nil
{
blocks
=
append
(
blocks
,
block
)
blocks
=
append
(
blocks
,
block
)
...
@@ -279,6 +287,15 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
...
@@ -279,6 +287,15 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
}
}
}
}
if
glog
.
V
(
logger
.
Detail
)
&&
len
(
blocks
)
==
0
&&
len
(
hashes
)
>
0
{
list
:=
"["
for
_
,
hash
:=
range
hashes
{
list
+=
fmt
.
Sprintf
(
"%x, "
,
hash
[
:
4
])
}
list
=
list
[
:
len
(
list
)
-
2
]
+
"]"
glog
.
Infof
(
"Peer %s: no blocks found for requested hashes %s"
,
p
.
id
,
list
)
}
return
p
.
sendBlocks
(
blocks
)
return
p
.
sendBlocks
(
blocks
)
case
BlocksMsg
:
case
BlocksMsg
:
...
@@ -290,6 +307,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
...
@@ -290,6 +307,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"Decode error"
,
err
)
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"Decode error"
,
err
)
blocks
=
nil
blocks
=
nil
}
}
// Update the receive timestamp of each block
for
i
:=
0
;
i
<
len
(
blocks
);
i
++
{
blocks
[
i
]
.
ReceivedAt
=
msg
.
ReceivedAt
}
// Filter out any explicitly requested blocks, deliver the rest to the downloader
// Filter out any explicitly requested blocks, deliver the rest to the downloader
if
blocks
:=
pm
.
fetcher
.
Filter
(
blocks
);
len
(
blocks
)
>
0
{
if
blocks
:=
pm
.
fetcher
.
Filter
(
blocks
);
len
(
blocks
)
>
0
{
pm
.
downloader
.
DeliverBlocks
(
p
.
id
,
blocks
)
pm
.
downloader
.
DeliverBlocks
(
p
.
id
,
blocks
)
...
@@ -355,28 +376,27 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
...
@@ -355,28 +376,27 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return
nil
return
nil
}
}
// BroadcastBlock will
propagate the block to a subset of its connected peers,
// BroadcastBlock will
either propagate a block to a subset of it's peers, or
//
only notifying the rest of the block's appearance
.
//
will only announce it's availability (depending what's requested)
.
func
(
pm
*
ProtocolManager
)
BroadcastBlock
(
block
*
types
.
Block
)
{
func
(
pm
*
ProtocolManager
)
BroadcastBlock
(
block
*
types
.
Block
,
propagate
bool
)
{
hash
:=
block
.
Hash
()
hash
:=
block
.
Hash
()
// Retrieve all the target peers and split between full broadcast or only notification
peers
:=
pm
.
peers
.
PeersWithoutBlock
(
hash
)
peers
:=
pm
.
peers
.
PeersWithoutBlock
(
hash
)
split
:=
int
(
math
.
Sqrt
(
float64
(
len
(
peers
))))
transfer
:=
peers
[
:
split
]
notify
:=
peers
[
split
:
]
// Send out the data transfers and the notifications
// If propagation is requested, send to a subset of the peer
for
_
,
peer
:=
range
notify
{
if
propagate
{
peer
.
sendNewBlockHashes
([]
common
.
Hash
{
hash
})
transfer
:=
peers
[
:
int
(
math
.
Sqrt
(
float64
(
len
(
peers
))))]
for
_
,
peer
:=
range
transfer
{
peer
.
sendNewBlock
(
block
)
}
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"propagated block %x to %d peers in %v"
,
hash
[
:
4
],
len
(
transfer
),
time
.
Since
(
block
.
ReceivedAt
))
}
}
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"broadcast hash %x to %d peers."
,
hash
[
:
4
],
len
(
notify
))
// Otherwise if the block is indeed in out own chain, announce it
if
pm
.
chainman
.
HasBlock
(
hash
)
{
for
_
,
peer
:=
range
transfer
{
for
_
,
peer
:=
range
peers
{
peer
.
sendNewBlock
(
block
)
peer
.
sendNewBlockHashes
([]
common
.
Hash
{
hash
})
}
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"announced block %x to %d peers in %v"
,
hash
[
:
4
],
len
(
peers
),
time
.
Since
(
block
.
ReceivedAt
))
}
}
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"broadcast block %x to %d peers. Total processing time: %v"
,
hash
[
:
4
],
len
(
transfer
),
time
.
Since
(
block
.
ReceivedAt
))
}
}
// BroadcastTx will propagate the block to its connected peers. It will sort
// BroadcastTx will propagate the block to its connected peers. It will sort
...
@@ -398,7 +418,8 @@ func (self *ProtocolManager) minedBroadcastLoop() {
...
@@ -398,7 +418,8 @@ func (self *ProtocolManager) minedBroadcastLoop() {
for
obj
:=
range
self
.
minedBlockSub
.
Chan
()
{
for
obj
:=
range
self
.
minedBlockSub
.
Chan
()
{
switch
ev
:=
obj
.
(
type
)
{
switch
ev
:=
obj
.
(
type
)
{
case
core
.
NewMinedBlockEvent
:
case
core
.
NewMinedBlockEvent
:
self
.
BroadcastBlock
(
ev
.
Block
)
self
.
BroadcastBlock
(
ev
.
Block
,
false
)
self
.
BroadcastBlock
(
ev
.
Block
,
true
)
}
}
}
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment