Commit 59bc5412 authored by Jeffrey Wilcke's avatar Jeffrey Wilcke

Merge pull request #915 from karalabe/downloader-threading-fixes

eth/downloader: fix #910, thread safe peers & polishes
parents fe7e2847 fa53c5e0
This diff is collapsed.
...@@ -229,7 +229,7 @@ func TestThrottling(t *testing.T) { ...@@ -229,7 +229,7 @@ func TestThrottling(t *testing.T) {
minDesiredPeerCount = 4 minDesiredPeerCount = 4
blockTtl = 1 * time.Second blockTtl = 1 * time.Second
targetBlocks := 4 * blockCacheLimit targetBlocks := 16 * blockCacheLimit
hashes := createHashes(0, targetBlocks) hashes := createHashes(0, targetBlocks)
blocks := createBlocksFromHashes(hashes) blocks := createBlocksFromHashes(hashes)
tester := newTester(t, hashes, blocks) tester := newTester(t, hashes, blocks)
...@@ -256,6 +256,7 @@ func TestThrottling(t *testing.T) { ...@@ -256,6 +256,7 @@ func TestThrottling(t *testing.T) {
return return
default: default:
took = append(took, tester.downloader.TakeBlocks()...) took = append(took, tester.downloader.TakeBlocks()...)
time.Sleep(time.Millisecond)
} }
} }
}() }()
......
// Contains the active peer-set of the downloader, maintaining both failures
// as well as reputation metrics to prioritize the block retrievals.
package downloader package downloader
import ( import (
"errors" "errors"
"sync" "sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
) )
const (
workingState = 2
idleState = 4
)
type hashFetcherFn func(common.Hash) error type hashFetcherFn func(common.Hash) error
type blockFetcherFn func([]common.Hash) error type blockFetcherFn func([]common.Hash) error
// XXX make threadsafe!!!! var (
type peers map[string]*peer errAlreadyFetching = errors.New("already fetching blocks from peer")
errAlreadyRegistered = errors.New("peer is already registered")
errNotRegistered = errors.New("peer is not registered")
)
func (p peers) reset() { // peer represents an active peer from which hashes and blocks are retrieved.
for _, peer := range p { type peer struct {
peer.reset() id string // Unique identifier of the peer
} head common.Hash // Hash of the peers latest known block
idle int32 // Current activity state of the peer (idle = 0, active = 1)
rep int32 // Simple peer reputation (not used currently)
mu sync.RWMutex
ignored *set.Set
getHashes hashFetcherFn
getBlocks blockFetcherFn
} }
func (p peers) get(state int) []*peer { // newPeer create a new downloader peer, with specific hash and block retrieval
var peers []*peer // mechanisms.
for _, peer := range p { func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
peer.mu.RLock() return &peer{
if peer.state == state { id: id,
peers = append(peers, peer) head: head,
} getHashes: getHashes,
peer.mu.RUnlock() getBlocks: getBlocks,
ignored: set.New(),
} }
}
return peers // Reset clears the internal state of a peer entity.
func (p *peer) Reset() {
atomic.StoreInt32(&p.idle, 0)
p.ignored.Clear()
} }
func (p peers) setState(id string, state int) { // Fetch sends a block retrieval request to the remote peer.
if peer, exist := p[id]; exist { func (p *peer) Fetch(request *fetchRequest) error {
peer.mu.Lock() // Short circuit if the peer is already fetching
defer peer.mu.Unlock() if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
peer.state = state return errAlreadyFetching
} }
// Convert the hash set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Hashes))
for hash, _ := range request.Hashes {
hashes = append(hashes, hash)
}
p.getBlocks(hashes)
return nil
} }
func (p peers) getPeer(id string) *peer { // SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
return p[id] func (p *peer) SetIdle() {
atomic.StoreInt32(&p.idle, 0)
} }
// peer represents an active peer // Promote increases the peer's reputation.
type peer struct { func (p *peer) Promote() {
state int // Peer state (working, idle) atomic.AddInt32(&p.rep, 1)
rep int // TODO peer reputation }
mu sync.RWMutex // Demote decreases the peer's reputation or leaves it at 0.
id string func (p *peer) Demote() {
recentHash common.Hash for {
// Calculate the new reputation value
prev := atomic.LoadInt32(&p.rep)
next := prev / 2
ignored *set.Set // Try to update the old value
if atomic.CompareAndSwapInt32(&p.rep, prev, next) {
return
}
}
}
getHashes hashFetcherFn // peerSet represents the collection of active peer participating in the block
getBlocks blockFetcherFn // download procedure.
type peerSet struct {
peers map[string]*peer
lock sync.RWMutex
} }
// create a new peer // newPeerSet creates a new peer set top track the active download sources.
func newPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { func newPeerSet() *peerSet {
return &peer{ return &peerSet{
id: id, peers: make(map[string]*peer),
recentHash: hash,
getHashes: getHashes,
getBlocks: getBlocks,
state: idleState,
ignored: set.New(),
} }
} }
// fetch a chunk using the peer // Reset iterates over the current peer set, and resets each of the known peers
func (p *peer) fetch(request *fetchRequest) error { // to prepare for a next batch of block retrieval.
p.mu.Lock() func (ps *peerSet) Reset() {
defer p.mu.Unlock() ps.lock.RLock()
defer ps.lock.RUnlock()
if p.state == workingState { for _, peer := range ps.peers {
return errors.New("peer already fetching chunk") peer.Reset()
} }
}
// set working state // Register injects a new peer into the working set, or returns an error if the
p.state = workingState // peer is already known.
func (ps *peerSet) Register(p *peer) error {
ps.lock.Lock()
defer ps.lock.Unlock()
// Convert the hash set to a fetchable slice if _, ok := ps.peers[p.id]; ok {
hashes := make([]common.Hash, 0, len(request.Hashes)) return errAlreadyRegistered
for hash, _ := range request.Hashes {
hashes = append(hashes, hash)
} }
p.getBlocks(hashes) ps.peers[p.id] = p
return nil
}
// Unregister removes a remote peer from the active set, disabling any further
// actions to/from that particular entity.
func (ps *peerSet) Unregister(id string) error {
ps.lock.Lock()
defer ps.lock.Unlock()
if _, ok := ps.peers[id]; !ok {
return errNotRegistered
}
delete(ps.peers, id)
return nil return nil
} }
// promote increases the peer's reputation // Peer retrieves the registered peer with the given id.
func (p *peer) promote() { func (ps *peerSet) Peer(id string) *peer {
p.mu.Lock() ps.lock.RLock()
defer p.mu.Unlock() defer ps.lock.RUnlock()
return ps.peers[id]
}
// Len returns if the current number of peers in the set.
func (ps *peerSet) Len() int {
ps.lock.RLock()
defer ps.lock.RUnlock()
p.rep++ return len(ps.peers)
} }
// demote decreases the peer's reputation or leaves it at 0 // AllPeers retrieves a flat list of all the peers within the set.
func (p *peer) demote() { func (ps *peerSet) AllPeers() []*peer {
p.mu.Lock() ps.lock.RLock()
defer p.mu.Unlock() defer ps.lock.RUnlock()
if p.rep > 1 { list := make([]*peer, 0, len(ps.peers))
p.rep -= 2 for _, p := range ps.peers {
} else { list = append(list, p)
p.rep = 0
} }
return list
} }
func (p *peer) reset() { // IdlePeers retrieves a flat list of all the currently idle peers within the
p.state = idleState // active peer set, ordered by their reputation.
p.ignored.Clear() func (ps *peerSet) IdlePeers() []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
if atomic.LoadInt32(&p.idle) == 0 {
list = append(list, p)
}
}
for i := 0; i < len(list); i++ {
for j := i + 1; j < len(list); j++ {
if atomic.LoadInt32(&list[i].rep) < atomic.LoadInt32(&list[j].rep) {
list[i], list[j] = list[j], list[i]
}
}
}
return list
} }
// Contains the block download scheduler to collect download tasks and schedule
// them in an ordered, and throttled way.
package downloader package downloader
import ( import (
...@@ -8,6 +11,8 @@ import ( ...@@ -8,6 +11,8 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"gopkg.in/karalabe/cookiejar.v2/collections/prque" "gopkg.in/karalabe/cookiejar.v2/collections/prque"
) )
...@@ -126,6 +131,10 @@ func (q *queue) Insert(hashes []common.Hash) { ...@@ -126,6 +131,10 @@ func (q *queue) Insert(hashes []common.Hash) {
for i, hash := range hashes { for i, hash := range hashes {
index := q.hashCounter + i index := q.hashCounter + i
if old, ok := q.hashPool[hash]; ok {
glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old)
continue
}
q.hashPool[hash] = index q.hashPool[hash] = index
q.hashQueue.Push(hash, float32(index)) // Highest gets schedules first q.hashQueue.Push(hash, float32(index)) // Highest gets schedules first
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment