Commit 5f341e5d authored by Jeffrey Wilcke's avatar Jeffrey Wilcke

Merge pull request #1212 from fjl/p2p-eth-block-timeout

eth, p2p: improve write timeouts and behaviour under load
parents fda49f2b 73c35559
...@@ -247,6 +247,7 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { ...@@ -247,6 +247,7 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
} }
// GetTransactions returns all currently processable transactions. // GetTransactions returns all currently processable transactions.
// The returned slice may be modified by the caller.
func (self *TxPool) GetTransactions() (txs types.Transactions) { func (self *TxPool) GetTransactions() (txs types.Transactions) {
self.mu.Lock() self.mu.Lock()
defer self.mu.Unlock() defer self.mu.Unlock()
......
...@@ -67,6 +67,13 @@ func (tx *Transaction) Hash() common.Hash { ...@@ -67,6 +67,13 @@ func (tx *Transaction) Hash() common.Hash {
}) })
} }
// Size returns the storage size of the transaction, i.e. the number of
// bytes occupied by its RLP encoding. The encoding is streamed through
// a counting writer, so the size is measured without buffering it.
func (self *Transaction) Size() common.StorageSize {
	var counter writeCounter
	rlp.Encode(&counter, self)
	return common.StorageSize(counter)
}
func (self *Transaction) Data() []byte { func (self *Transaction) Data() []byte {
return self.Payload return self.Payload
} }
......
...@@ -263,23 +263,29 @@ func (d *Downloader) Cancel() bool { ...@@ -263,23 +263,29 @@ func (d *Downloader) Cancel() bool {
// XXX Make synchronous // XXX Make synchronous
func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
start := time.Now()
// Add the hash to the queue first, and start hash retrieval
d.queue.Insert([]common.Hash{h})
p.getHashes(h)
var ( var (
start = time.Now()
active = p // active peer will help determine the current active peer active = p // active peer will help determine the current active peer
head = common.Hash{} // common and last hash head = common.Hash{} // common and last hash
timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer timeout = time.NewTimer(0) // timer to dump a non-responsive active peer
attempted = make(map[string]bool) // attempted peers will help with retries attempted = make(map[string]bool) // attempted peers will help with retries
crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks
) )
defer crossTicker.Stop() defer crossTicker.Stop()
defer timeout.Stop()
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
<-timeout.C // timeout channel should be initially empty.
getHashes := func(from common.Hash) {
active.getHashes(from)
timeout.Reset(hashTTL)
}
// Add the hash to the queue, and start hash retrieval.
d.queue.Insert([]common.Hash{h})
getHashes(h)
attempted[p.id] = true attempted[p.id] = true
for finished := false; !finished; { for finished := false; !finished; {
...@@ -293,7 +299,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { ...@@ -293,7 +299,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
break break
} }
timeout.Reset(hashTTL) timeout.Stop()
// Make sure the peer actually gave something valid // Make sure the peer actually gave something valid
if len(hashPack.hashes) == 0 { if len(hashPack.hashes) == 0 {
...@@ -345,7 +351,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { ...@@ -345,7 +351,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
active.getBlocks([]common.Hash{origin}) active.getBlocks([]common.Hash{origin})
// Also fetch a fresh // Also fetch a fresh
active.getHashes(head) getHashes(head)
continue continue
} }
// We're done, prepare the download cache and proceed pulling the blocks // We're done, prepare the download cache and proceed pulling the blocks
...@@ -399,7 +405,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { ...@@ -399,7 +405,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
// set p to the active peer. this will invalidate any hashes that may be returned // set p to the active peer. this will invalidate any hashes that may be returned
// by our previous (delayed) peer. // by our previous (delayed) peer.
active = p active = p
p.getHashes(head) getHashes(head)
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id) glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id)
} }
} }
......
...@@ -18,6 +18,11 @@ import ( ...@@ -18,6 +18,11 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
) )
// maxBlockRespSize is the target maximum encoded size of replies to
// the getBlocks message. The reply may exceed it if a single block is
// larger than the limit.
const maxBlockRespSize = 2 * 1024 * 1024
func errResp(code errCode, format string, v ...interface{}) error { func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
} }
...@@ -48,9 +53,11 @@ type ProtocolManager struct { ...@@ -48,9 +53,11 @@ type ProtocolManager struct {
txSub event.Subscription txSub event.Subscription
minedBlockSub event.Subscription minedBlockSub event.Subscription
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer newPeerCh chan *peer
newHashCh chan []*blockAnnounce newHashCh chan []*blockAnnounce
newBlockCh chan chan []*types.Block newBlockCh chan chan []*types.Block
txsyncCh chan *txsync
quitSync chan struct{} quitSync chan struct{}
// wait group is used for graceful shutdowns during downloading // wait group is used for graceful shutdowns during downloading
...@@ -71,9 +78,9 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo ...@@ -71,9 +78,9 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
newPeerCh: make(chan *peer, 1), newPeerCh: make(chan *peer, 1),
newHashCh: make(chan []*blockAnnounce, 1), newHashCh: make(chan []*blockAnnounce, 1),
newBlockCh: make(chan chan []*types.Block), newBlockCh: make(chan chan []*types.Block),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}), quitSync: make(chan struct{}),
} }
manager.SubProtocol = p2p.Protocol{ manager.SubProtocol = p2p.Protocol{
Name: "eth", Name: "eth",
Version: uint(protocolVersion), Version: uint(protocolVersion),
...@@ -113,13 +120,14 @@ func (pm *ProtocolManager) Start() { ...@@ -113,13 +120,14 @@ func (pm *ProtocolManager) Start() {
// broadcast transactions // broadcast transactions
pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
go pm.txBroadcastLoop() go pm.txBroadcastLoop()
// broadcast mined blocks // broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop() go pm.minedBroadcastLoop()
// start sync handlers
go pm.syncer() go pm.syncer()
go pm.fetcher() go pm.fetcher()
go pm.txsyncLoop()
} }
func (pm *ProtocolManager) Stop() { func (pm *ProtocolManager) Stop() {
...@@ -130,7 +138,7 @@ func (pm *ProtocolManager) Stop() { ...@@ -130,7 +138,7 @@ func (pm *ProtocolManager) Stop() {
pm.quit = true pm.quit = true
pm.txSub.Unsubscribe() // quits txBroadcastLoop pm.txSub.Unsubscribe() // quits txBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
close(pm.quitSync) // quits the sync handler close(pm.quitSync) // quits syncer, fetcher, txsyncLoop
// Wait for any process action // Wait for any process action
pm.wg.Wait() pm.wg.Wait()
...@@ -145,11 +153,12 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter ...@@ -145,11 +153,12 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter
} }
func (pm *ProtocolManager) handle(p *peer) error { func (pm *ProtocolManager) handle(p *peer) error {
// Execute the Ethereum handshake, short circuit if fails // Execute the Ethereum handshake.
if err := p.handleStatus(); err != nil { if err := p.handleStatus(); err != nil {
return err return err
} }
// Register the peer locally and in the downloader too
// Register the peer locally.
glog.V(logger.Detail).Infoln("Adding peer", p.id) glog.V(logger.Detail).Infoln("Adding peer", p.id)
if err := pm.peers.Register(p); err != nil { if err := pm.peers.Register(p); err != nil {
glog.V(logger.Error).Infoln("Addition failed:", err) glog.V(logger.Error).Infoln("Addition failed:", err)
...@@ -157,14 +166,16 @@ func (pm *ProtocolManager) handle(p *peer) error { ...@@ -157,14 +166,16 @@ func (pm *ProtocolManager) handle(p *peer) error {
} }
defer pm.removePeer(p.id) defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader
// considers it banned, we disconnect.
if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil { if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil {
return err return err
} }
// propagate existing transactions. new transactions appearing
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts. // after this will be sent via broadcasts.
if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil { pm.syncTransactions(p)
return err
}
// main loop. handle incoming messages. // main loop. handle incoming messages.
for { for {
if err := pm.handleMsg(p); err != nil { if err := pm.handleMsg(p); err != nil {
...@@ -246,7 +257,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error { ...@@ -246,7 +257,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
if _, err := msgStream.List(); err != nil { if _, err := msgStream.List(); err != nil {
return err return err
} }
var i int var (
i int
totalsize common.StorageSize
)
for { for {
i++ i++
var hash common.Hash var hash common.Hash
...@@ -260,8 +274,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error { ...@@ -260,8 +274,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
block := self.chainman.GetBlock(hash) block := self.chainman.GetBlock(hash)
if block != nil { if block != nil {
blocks = append(blocks, block) blocks = append(blocks, block)
totalsize += block.Size()
} }
if i == downloader.MaxBlockFetch { if i == downloader.MaxBlockFetch || totalsize > maxBlockRespSize {
break break
} }
} }
......
...@@ -57,10 +57,12 @@ var errorToString = map[int]string{ ...@@ -57,10 +57,12 @@ var errorToString = map[int]string{
ErrSuspendedPeer: "Suspended peer", ErrSuspendedPeer: "Suspended peer",
} }
// backend is the interface the ethereum protocol backend should implement
// used as an argument to EthProtocol
type txPool interface { type txPool interface {
// AddTransactions should add the given transactions to the pool.
AddTransactions([]*types.Transaction) AddTransactions([]*types.Transaction)
// GetTransactions should return pending transactions.
// The slice should be modifiable by the caller.
GetTransactions() types.Transactions GetTransactions() types.Transactions
} }
......
This diff is collapsed.
...@@ -2,6 +2,7 @@ package eth ...@@ -2,6 +2,7 @@ package eth
import ( import (
"math" "math"
"math/rand"
"sync/atomic" "sync/atomic"
"time" "time"
...@@ -10,6 +11,7 @@ import ( ...@@ -10,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p/discover"
) )
const ( const (
...@@ -20,6 +22,10 @@ const ( ...@@ -20,6 +22,10 @@ const (
notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block
minDesiredPeerCount = 5 // Amount of peers desired to start syncing minDesiredPeerCount = 5 // Amount of peers desired to start syncing
blockProcAmount = 256 blockProcAmount = 256
// This is the target size for the packs of transactions sent by txsyncLoop.
// A pack can get larger than this if a single transaction exceeds this size.
txsyncPackSize = 100 * 1024
) )
// blockAnnounce is the hash notification of the availability of a new block in // blockAnnounce is the hash notification of the availability of a new block in
...@@ -30,6 +36,94 @@ type blockAnnounce struct { ...@@ -30,6 +36,94 @@ type blockAnnounce struct {
time time.Time time time.Time
} }
// txsync bundles the transactions still owed to a single peer during
// the initial transaction sync.
type txsync struct {
	p   *peer                // target peer of this sync
	txs []*types.Transaction // transactions remaining to be sent
}
// syncTransactions hands the current contents of the transaction pool
// to txsyncLoop for delivery to the newly connected peer p. It is a
// no-op when the pool is empty, and gives up if the protocol manager
// is shutting down before the loop accepts the work.
func (pm *ProtocolManager) syncTransactions(p *peer) {
	pool := pm.txpool.GetTransactions()
	if len(pool) == 0 {
		return
	}
	select {
	case pm.txsyncCh <- &txsync{p: p, txs: pool}:
	case <-pm.quitSync:
	}
}
// txsyncLoop takes care of the initial transaction sync for each new
// connection. When a new peer appears, we relay all currently pending
// transactions. In order to minimise egress bandwidth usage, we send
// the transactions in small packs to one peer at a time.
func (pm *ProtocolManager) txsyncLoop() {
	var (
		pending = make(map[discover.NodeID]*txsync) // syncs not yet fully delivered, keyed by peer ID
		sending = false                             // whether a send is active
		pack    = new(txsync)                       // the pack that is being sent (buffer reused across sends)
		done    = make(chan error, 1)               // result of the send
	)
	// send starts sending a pack of transactions from the sync in a
	// background goroutine. At most one send is in flight at a time,
	// tracked by the sending flag.
	send := func(s *txsync) {
		// Fill pack with transactions up to the target size.
		size := common.StorageSize(0)
		pack.p = s.p
		pack.txs = pack.txs[:0]
		for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
			pack.txs = append(pack.txs, s.txs[i])
			size += s.txs[i].Size()
		}
		// Remove the transactions that will be sent. Syncs with
		// nothing left to deliver leave the pending set.
		s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
		if len(s.txs) == 0 {
			delete(pending, s.p.ID())
		}
		// Send the pack in the background.
		glog.V(logger.Detail).Infof("%v: sending %d transactions (%v)", s.p.Peer, len(pack.txs), size)
		sending = true
		go func() { done <- pack.p.sendTransactions(pack.txs) }()
	}
	// pick chooses the next pending sync uniformly at random, or
	// returns nil when no syncs are pending.
	pick := func() *txsync {
		if len(pending) == 0 {
			return nil
		}
		n := rand.Intn(len(pending)) + 1
		for _, s := range pending {
			if n--; n == 0 {
				return s
			}
		}
		return nil
	}
	for {
		select {
		case s := <-pm.txsyncCh:
			pending[s.p.ID()] = s
			if !sending {
				send(s)
			}
		case err := <-done:
			sending = false
			// Stop tracking peers that cause send failures.
			if err != nil {
				glog.V(logger.Debug).Infof("%v: tx send failed: %v", pack.p.Peer, err)
				delete(pending, pack.p.ID())
			}
			// Schedule the next send.
			if s := pick(); s != nil {
				send(s)
			}
		case <-pm.quitSync:
			return
		}
	}
}
// fetcher is responsible for collecting hash notifications, and periodically // fetcher is responsible for collecting hash notifications, and periodically
// checking all unknown ones and individually fetching them. // checking all unknown ones and individually fetching them.
func (pm *ProtocolManager) fetcher() { func (pm *ProtocolManager) fetcher() {
......
...@@ -30,7 +30,7 @@ const ( ...@@ -30,7 +30,7 @@ const (
frameReadTimeout = 30 * time.Second frameReadTimeout = 30 * time.Second
// Maximum amount of time allowed for writing a complete message. // Maximum amount of time allowed for writing a complete message.
frameWriteTimeout = 5 * time.Second frameWriteTimeout = 20 * time.Second
) )
var errServerStopped = errors.New("server stopped") var errServerStopped = errors.New("server stopped")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment