Commit 08794922 authored by Jeffrey Wilcke's avatar Jeffrey Wilcke

Merge pull request #1153 from karalabe/downloader-banned-starvation-attack

eth/downloader: gather and ban hashes from invalid chains
parents 11f65cf8 4ed3509a
This diff is collapsed.
This diff is collapsed.
...@@ -94,7 +94,7 @@ func (p *peer) SetIdle() { ...@@ -94,7 +94,7 @@ func (p *peer) SetIdle() {
for { for {
// Calculate the new download bandwidth allowance // Calculate the new download bandwidth allowance
prev := atomic.LoadInt32(&p.capacity) prev := atomic.LoadInt32(&p.capacity)
next := int32(math.Max(1, math.Min(MaxBlockFetch, float64(prev)*scale))) next := int32(math.Max(1, math.Min(float64(MaxBlockFetch), float64(prev)*scale)))
// Try to update the old value // Try to update the old value
if atomic.CompareAndSwapInt32(&p.capacity, prev, next) { if atomic.CompareAndSwapInt32(&p.capacity, prev, next) {
......
...@@ -16,7 +16,7 @@ import ( ...@@ -16,7 +16,7 @@ import (
"gopkg.in/karalabe/cookiejar.v2/collections/prque" "gopkg.in/karalabe/cookiejar.v2/collections/prque"
) )
const ( var (
blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download
) )
...@@ -50,10 +50,11 @@ type queue struct { ...@@ -50,10 +50,11 @@ type queue struct {
// newQueue creates a new download queue for scheduling block retrieval. // newQueue creates a new download queue for scheduling block retrieval.
func newQueue() *queue { func newQueue() *queue {
return &queue{ return &queue{
hashPool: make(map[common.Hash]int), hashPool: make(map[common.Hash]int),
hashQueue: prque.New(), hashQueue: prque.New(),
pendPool: make(map[string]*fetchRequest), pendPool: make(map[string]*fetchRequest),
blockPool: make(map[common.Hash]int), blockPool: make(map[common.Hash]int),
blockCache: make([]*Block, blockCacheLimit),
} }
} }
...@@ -70,7 +71,7 @@ func (q *queue) Reset() { ...@@ -70,7 +71,7 @@ func (q *queue) Reset() {
q.blockPool = make(map[common.Hash]int) q.blockPool = make(map[common.Hash]int)
q.blockOffset = 0 q.blockOffset = 0
q.blockCache = nil q.blockCache = make([]*Block, blockCacheLimit)
} }
// Size retrieves the number of hashes in the queue, returning separately for // Size retrieves the number of hashes in the queue, returning separately for
...@@ -208,7 +209,7 @@ func (q *queue) TakeBlocks() []*Block { ...@@ -208,7 +209,7 @@ func (q *queue) TakeBlocks() []*Block {
// Reserve reserves a set of hashes for the given peer, skipping any previously // Reserve reserves a set of hashes for the given peer, skipping any previously
// failed download. // failed download.
func (q *queue) Reserve(p *peer) *fetchRequest { func (q *queue) Reserve(p *peer, count int) *fetchRequest {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
...@@ -229,8 +230,7 @@ func (q *queue) Reserve(p *peer) *fetchRequest { ...@@ -229,8 +230,7 @@ func (q *queue) Reserve(p *peer) *fetchRequest {
send := make(map[common.Hash]int) send := make(map[common.Hash]int)
skip := make(map[common.Hash]int) skip := make(map[common.Hash]int)
capacity := p.Capacity() for proc := 0; proc < space && len(send) < count && !q.hashQueue.Empty(); proc++ {
for proc := 0; proc < space && len(send) < capacity && !q.hashQueue.Empty(); proc++ {
hash, priority := q.hashQueue.Pop() hash, priority := q.hashQueue.Pop()
if p.ignored.Has(hash) { if p.ignored.Has(hash) {
skip[hash.(common.Hash)] = int(priority) skip[hash.(common.Hash)] = int(priority)
...@@ -345,20 +345,12 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { ...@@ -345,20 +345,12 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
return nil return nil
} }
// Alloc ensures that the block cache is the correct size, given a starting // Prepare configures the block cache offset to allow accepting inbound blocks.
// offset, and a memory cap. func (q *queue) Prepare(offset int) {
func (q *queue) Alloc(offset int) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
if q.blockOffset < offset { if q.blockOffset < offset {
q.blockOffset = offset q.blockOffset = offset
} }
size := len(q.hashPool)
if size > blockCacheLimit {
size = blockCacheLimit
}
if len(q.blockCache) < size {
q.blockCache = append(q.blockCache, make([]*Block, size-len(q.blockCache))...)
}
} }
...@@ -213,8 +213,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error { ...@@ -213,8 +213,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "->msg %v: %v", msg, err) return errResp(ErrDecode, "->msg %v: %v", msg, err)
} }
if request.Amount > downloader.MaxHashFetch { if request.Amount > uint64(downloader.MaxHashFetch) {
request.Amount = downloader.MaxHashFetch request.Amount = uint64(downloader.MaxHashFetch)
} }
hashes := self.chainman.GetBlockHashesFromHash(request.Hash, request.Amount) hashes := self.chainman.GetBlockHashesFromHash(request.Hash, request.Amount)
......
...@@ -102,7 +102,7 @@ func (p *peer) sendTransaction(tx *types.Transaction) error { ...@@ -102,7 +102,7 @@ func (p *peer) sendTransaction(tx *types.Transaction) error {
func (p *peer) requestHashes(from common.Hash) error { func (p *peer) requestHashes(from common.Hash) error {
glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, downloader.MaxHashFetch, from[:4]) glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, downloader.MaxHashFetch, from[:4])
return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, downloader.MaxHashFetch}) return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, uint64(downloader.MaxHashFetch)})
} }
func (p *peer) requestBlocks(hashes []common.Hash) error { func (p *peer) requestBlocks(hashes []common.Hash) error {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment