Commit 6ec13e7e authored by Felix Lange's avatar Felix Lange

Merge pull request #1701 from karalabe/eth62-sync-rebase

eth: implement eth/62 synchronization logic
parents 79b644c7 17f65cd1
...@@ -283,6 +283,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso ...@@ -283,6 +283,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso
utils.DataDirFlag, utils.DataDirFlag,
utils.BlockchainVersionFlag, utils.BlockchainVersionFlag,
utils.OlympicFlag, utils.OlympicFlag,
utils.EthVersionFlag,
utils.CacheFlag, utils.CacheFlag,
utils.JSpathFlag, utils.JSpathFlag,
utils.ListenPortFlag, utils.ListenPortFlag,
...@@ -333,6 +334,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso ...@@ -333,6 +334,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso
app.Before = func(ctx *cli.Context) error { app.Before = func(ctx *cli.Context) error {
utils.SetupLogger(ctx) utils.SetupLogger(ctx)
utils.SetupVM(ctx) utils.SetupVM(ctx)
utils.SetupEth(ctx)
if ctx.GlobalBool(utils.PProfEanbledFlag.Name) { if ctx.GlobalBool(utils.PProfEanbledFlag.Name) {
utils.StartPProf(ctx) utils.StartPProf(ctx)
} }
......
...@@ -289,7 +289,7 @@ func updateChart(metric string, data []float64, base *int, chart *termui.LineCha ...@@ -289,7 +289,7 @@ func updateChart(metric string, data []float64, base *int, chart *termui.LineCha
} }
} }
unit, scale := 0, 1.0 unit, scale := 0, 1.0
for high >= 1000 { for high >= 1000 && unit+1 < len(dataUnits) {
high, unit, scale = high/1000, unit+1, scale*1000 high, unit, scale = high/1000, unit+1, scale*1000
} }
// If the unit changes, re-create the chart (hack to set max height...) // If the unit changes, re-create the chart (hack to set max height...)
......
...@@ -138,6 +138,11 @@ var ( ...@@ -138,6 +138,11 @@ var (
Name: "olympic", Name: "olympic",
Usage: "Use olympic style protocol", Usage: "Use olympic style protocol",
} }
EthVersionFlag = cli.IntFlag{
Name: "eth",
Value: 62,
Usage: "Highest eth protocol to advertise (temporary, dev option)",
}
// miner settings // miner settings
MinerThreadsFlag = cli.IntFlag{ MinerThreadsFlag = cli.IntFlag{
...@@ -459,6 +464,18 @@ func SetupVM(ctx *cli.Context) { ...@@ -459,6 +464,18 @@ func SetupVM(ctx *cli.Context) {
vm.SetJITCacheSize(ctx.GlobalInt(VMJitCacheFlag.Name)) vm.SetJITCacheSize(ctx.GlobalInt(VMJitCacheFlag.Name))
} }
// SetupEth configures the eth packages global settings
func SetupEth(ctx *cli.Context) {
version := ctx.GlobalInt(EthVersionFlag.Name)
for len(eth.ProtocolVersions) > 0 && eth.ProtocolVersions[0] > uint(version) {
eth.ProtocolVersions = eth.ProtocolVersions[1:]
eth.ProtocolLengths = eth.ProtocolLengths[1:]
}
if len(eth.ProtocolVersions) == 0 {
Fatalf("No valid eth protocols remaining")
}
}
// MakeChain creates a chain manager from set command line flags. // MakeChain creates a chain manager from set command line flags.
func MakeChain(ctx *cli.Context) (chain *core.ChainManager, chainDb common.Database) { func MakeChain(ctx *cli.Context) (chain *core.ChainManager, chainDb common.Database) {
datadir := ctx.GlobalString(DataDirFlag.Name) datadir := ctx.GlobalString(DataDirFlag.Name)
......
...@@ -360,6 +360,20 @@ func (b *Block) WithMiningResult(nonce uint64, mixDigest common.Hash) *Block { ...@@ -360,6 +360,20 @@ func (b *Block) WithMiningResult(nonce uint64, mixDigest common.Hash) *Block {
} }
} }
// WithBody returns a new block with the given transaction and uncle contents.
func (b *Block) WithBody(transactions []*Transaction, uncles []*Header) *Block {
block := &Block{
header: copyHeader(b.header),
transactions: make([]*Transaction, len(transactions)),
uncles: make([]*Header, len(uncles)),
}
copy(block.transactions, transactions)
for i := range uncles {
block.uncles[i] = copyHeader(uncles[i])
}
return block
}
// Implement pow.Block // Implement pow.Block
func (b *Block) Hash() common.Hash { func (b *Block) Hash() common.Hash {
......
...@@ -373,7 +373,7 @@ func New(config *Config) (*Ethereum, error) { ...@@ -373,7 +373,7 @@ func New(config *Config) (*Ethereum, error) {
eth.blockProcessor = core.NewBlockProcessor(chainDb, eth.pow, eth.chainManager, eth.EventMux()) eth.blockProcessor = core.NewBlockProcessor(chainDb, eth.pow, eth.chainManager, eth.EventMux())
eth.chainManager.SetProcessor(eth.blockProcessor) eth.chainManager.SetProcessor(eth.blockProcessor)
eth.protocolManager = NewProtocolManager(config.NetworkId, eth.eventMux, eth.txPool, eth.pow, eth.chainManager) eth.protocolManager = NewProtocolManager(config.NetworkId, eth.eventMux, eth.txPool, eth.pow, eth.chainManager, chainDb)
eth.miner = miner.New(eth, eth.EventMux(), eth.pow) eth.miner = miner.New(eth, eth.EventMux(), eth.pow)
eth.miner.SetGasPrice(config.GasPrice) eth.miner.SetGasPrice(config.GasPrice)
......
This diff is collapsed.
This diff is collapsed.
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Contains the metrics collected by the downloader.
package downloader
import (
"github.com/ethereum/go-ethereum/metrics"
)
var (
hashInMeter = metrics.NewMeter("eth/downloader/hashes/in")
hashReqTimer = metrics.NewTimer("eth/downloader/hashes/req")
hashDropMeter = metrics.NewMeter("eth/downloader/hashes/drop")
hashTimeoutMeter = metrics.NewMeter("eth/downloader/hashes/timeout")
blockInMeter = metrics.NewMeter("eth/downloader/blocks/in")
blockReqTimer = metrics.NewTimer("eth/downloader/blocks/req")
blockDropMeter = metrics.NewMeter("eth/downloader/blocks/drop")
blockTimeoutMeter = metrics.NewMeter("eth/downloader/blocks/timeout")
headerInMeter = metrics.NewMeter("eth/downloader/headers/in")
headerReqTimer = metrics.NewTimer("eth/downloader/headers/req")
headerDropMeter = metrics.NewMeter("eth/downloader/headers/drop")
headerTimeoutMeter = metrics.NewMeter("eth/downloader/headers/timeout")
bodyInMeter = metrics.NewMeter("eth/downloader/bodies/in")
bodyReqTimer = metrics.NewTimer("eth/downloader/bodies/req")
bodyDropMeter = metrics.NewMeter("eth/downloader/bodies/drop")
bodyTimeoutMeter = metrics.NewMeter("eth/downloader/bodies/timeout")
)
...@@ -31,10 +31,16 @@ import ( ...@@ -31,10 +31,16 @@ import (
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
) )
// Hash and block fetchers belonging to eth/61 and below
type relativeHashFetcherFn func(common.Hash) error type relativeHashFetcherFn func(common.Hash) error
type absoluteHashFetcherFn func(uint64, int) error type absoluteHashFetcherFn func(uint64, int) error
type blockFetcherFn func([]common.Hash) error type blockFetcherFn func([]common.Hash) error
// Block header and body fethers belonging to eth/62 and above
type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
type blockBodyFetcherFn func([]common.Hash) error
var ( var (
errAlreadyFetching = errors.New("already fetching blocks from peer") errAlreadyFetching = errors.New("already fetching blocks from peer")
errAlreadyRegistered = errors.New("peer is already registered") errAlreadyRegistered = errors.New("peer is already registered")
...@@ -54,25 +60,37 @@ type peer struct { ...@@ -54,25 +60,37 @@ type peer struct {
ignored *set.Set // Set of hashes not to request (didn't have previously) ignored *set.Set // Set of hashes not to request (didn't have previously)
getRelHashes relativeHashFetcherFn // Method to retrieve a batch of hashes from an origin hash getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
getAbsHashes absoluteHashFetcherFn // Method to retrieve a batch of hashes from an absolute position getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
getBlocks blockFetcherFn // Method to retrieve a batch of blocks getBlocks blockFetcherFn // [eth/61] Method to retrieve a batch of blocks
getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash
getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
version int // Eth protocol version number to switch strategies version int // Eth protocol version number to switch strategies
} }
// newPeer create a new downloader peer, with specific hash and block retrieval // newPeer create a new downloader peer, with specific hash and block retrieval
// mechanisms. // mechanisms.
func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) *peer { func newPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) *peer {
return &peer{ return &peer{
id: id, id: id,
head: head, head: head,
capacity: 1, capacity: 1,
ignored: set.New(),
getRelHashes: getRelHashes, getRelHashes: getRelHashes,
getAbsHashes: getAbsHashes, getAbsHashes: getAbsHashes,
getBlocks: getBlocks, getBlocks: getBlocks,
ignored: set.New(),
version: version, getRelHeaders: getRelHeaders,
getAbsHeaders: getAbsHeaders,
getBlockBodies: getBlockBodies,
version: version,
} }
} }
...@@ -83,8 +101,8 @@ func (p *peer) Reset() { ...@@ -83,8 +101,8 @@ func (p *peer) Reset() {
p.ignored.Clear() p.ignored.Clear()
} }
// Fetch sends a block retrieval request to the remote peer. // Fetch61 sends a block retrieval request to the remote peer.
func (p *peer) Fetch(request *fetchRequest) error { func (p *peer) Fetch61(request *fetchRequest) error {
// Short circuit if the peer is already fetching // Short circuit if the peer is already fetching
if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
return errAlreadyFetching return errAlreadyFetching
...@@ -101,10 +119,28 @@ func (p *peer) Fetch(request *fetchRequest) error { ...@@ -101,10 +119,28 @@ func (p *peer) Fetch(request *fetchRequest) error {
return nil return nil
} }
// SetIdle sets the peer to idle, allowing it to execute new retrieval requests. // Fetch sends a block body retrieval request to the remote peer.
func (p *peer) Fetch(request *fetchRequest) error {
// Short circuit if the peer is already fetching
if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
return errAlreadyFetching
}
p.started = time.Now()
// Convert the header set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Headers))
for _, header := range request.Headers {
hashes = append(hashes, header.Hash())
}
go p.getBlockBodies(hashes)
return nil
}
// SetIdle61 sets the peer to idle, allowing it to execute new retrieval requests.
// Its block retrieval allowance will also be updated either up- or downwards, // Its block retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not. // depending on whether the previous fetch completed in time or not.
func (p *peer) SetIdle() { func (p *peer) SetIdle61() {
// Update the peer's download allowance based on previous performance // Update the peer's download allowance based on previous performance
scale := 2.0 scale := 2.0
if time.Since(p.started) > blockSoftTTL { if time.Since(p.started) > blockSoftTTL {
...@@ -131,6 +167,36 @@ func (p *peer) SetIdle() { ...@@ -131,6 +167,36 @@ func (p *peer) SetIdle() {
atomic.StoreInt32(&p.idle, 0) atomic.StoreInt32(&p.idle, 0)
} }
// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its block body retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
func (p *peer) SetIdle() {
// Update the peer's download allowance based on previous performance
scale := 2.0
if time.Since(p.started) > bodySoftTTL {
scale = 0.5
if time.Since(p.started) > bodyHardTTL {
scale = 1 / float64(MaxBodyFetch) // reduces capacity to 1
}
}
for {
// Calculate the new download bandwidth allowance
prev := atomic.LoadInt32(&p.capacity)
next := int32(math.Max(1, math.Min(float64(MaxBodyFetch), float64(prev)*scale)))
// Try to update the old value
if atomic.CompareAndSwapInt32(&p.capacity, prev, next) {
// If we're having problems at 1 capacity, try to find better peers
if next == 1 {
p.Demote()
}
break
}
}
// Set the peer to idle to allow further block requests
atomic.StoreInt32(&p.idle, 0)
}
// Capacity retrieves the peers block download allowance based on its previously // Capacity retrieves the peers block download allowance based on its previously
// discovered bandwidth capacity. // discovered bandwidth capacity.
func (p *peer) Capacity() int { func (p *peer) Capacity() int {
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -23,10 +23,24 @@ import ( ...@@ -23,10 +23,24 @@ import (
) )
var ( var (
announceMeter = metrics.NewMeter("eth/sync/RemoteAnnounces") propAnnounceInMeter = metrics.NewMeter("eth/fetcher/prop/announces/in")
announceTimer = metrics.NewTimer("eth/sync/LocalAnnounces") propAnnounceOutTimer = metrics.NewTimer("eth/fetcher/prop/announces/out")
broadcastMeter = metrics.NewMeter("eth/sync/RemoteBroadcasts") propAnnounceDropMeter = metrics.NewMeter("eth/fetcher/prop/announces/drop")
broadcastTimer = metrics.NewTimer("eth/sync/LocalBroadcasts") propAnnounceDOSMeter = metrics.NewMeter("eth/fetcher/prop/announces/dos")
discardMeter = metrics.NewMeter("eth/sync/DiscardedBlocks")
futureMeter = metrics.NewMeter("eth/sync/FutureBlocks") propBroadcastInMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/in")
propBroadcastOutTimer = metrics.NewTimer("eth/fetcher/prop/broadcasts/out")
propBroadcastDropMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/drop")
propBroadcastDOSMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/dos")
blockFetchMeter = metrics.NewMeter("eth/fetcher/fetch/blocks")
headerFetchMeter = metrics.NewMeter("eth/fetcher/fetch/headers")
bodyFetchMeter = metrics.NewMeter("eth/fetcher/fetch/bodies")
blockFilterInMeter = metrics.NewMeter("eth/fetcher/filter/blocks/in")
blockFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/blocks/out")
headerFilterInMeter = metrics.NewMeter("eth/fetcher/filter/headers/in")
headerFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/headers/out")
bodyFilterInMeter = metrics.NewMeter("eth/fetcher/filter/bodies/in")
bodyFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/bodies/out")
) )
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -49,6 +49,14 @@ func (db *MemDatabase) Get(key []byte) ([]byte, error) { ...@@ -49,6 +49,14 @@ func (db *MemDatabase) Get(key []byte) ([]byte, error) {
return db.db[string(key)], nil return db.db[string(key)], nil
} }
func (db *MemDatabase) Keys() [][]byte {
keys := [][]byte{}
for key, _ := range db.db {
keys = append(keys, []byte(key))
}
return keys
}
/* /*
func (db *MemDatabase) GetKeys() []*common.Key { func (db *MemDatabase) GetKeys() []*common.Key {
data, _ := db.Get([]byte("KeyRing")) data, _ := db.Get([]byte("KeyRing"))
......
...@@ -31,8 +31,8 @@ import ( ...@@ -31,8 +31,8 @@ import (
// MetricsEnabledFlag is the CLI flag name to use to enable metrics collections. // MetricsEnabledFlag is the CLI flag name to use to enable metrics collections.
var MetricsEnabledFlag = "metrics" var MetricsEnabledFlag = "metrics"
// enabled is the flag specifying if metrics are enable or not. // Enabled is the flag specifying if metrics are enable or not.
var enabled = false var Enabled = false
// Init enables or disables the metrics system. Since we need this to run before // Init enables or disables the metrics system. Since we need this to run before
// any other code gets to create meters and timers, we'll actually do an ugly hack // any other code gets to create meters and timers, we'll actually do an ugly hack
...@@ -41,7 +41,7 @@ func init() { ...@@ -41,7 +41,7 @@ func init() {
for _, arg := range os.Args { for _, arg := range os.Args {
if strings.TrimLeft(arg, "-") == MetricsEnabledFlag { if strings.TrimLeft(arg, "-") == MetricsEnabledFlag {
glog.V(logger.Info).Infof("Enabling metrics collection") glog.V(logger.Info).Infof("Enabling metrics collection")
enabled = true Enabled = true
} }
} }
} }
...@@ -49,7 +49,7 @@ func init() { ...@@ -49,7 +49,7 @@ func init() {
// NewMeter create a new metrics Meter, either a real one of a NOP stub depending // NewMeter create a new metrics Meter, either a real one of a NOP stub depending
// on the metrics flag. // on the metrics flag.
func NewMeter(name string) metrics.Meter { func NewMeter(name string) metrics.Meter {
if !enabled { if !Enabled {
return new(metrics.NilMeter) return new(metrics.NilMeter)
} }
return metrics.GetOrRegisterMeter(name, metrics.DefaultRegistry) return metrics.GetOrRegisterMeter(name, metrics.DefaultRegistry)
...@@ -58,7 +58,7 @@ func NewMeter(name string) metrics.Meter { ...@@ -58,7 +58,7 @@ func NewMeter(name string) metrics.Meter {
// NewTimer create a new metrics Timer, either a real one of a NOP stub depending // NewTimer create a new metrics Timer, either a real one of a NOP stub depending
// on the metrics flag. // on the metrics flag.
func NewTimer(name string) metrics.Timer { func NewTimer(name string) metrics.Timer {
if !enabled { if !Enabled {
return new(metrics.NilTimer) return new(metrics.NilTimer)
} }
return metrics.GetOrRegisterTimer(name, metrics.DefaultRegistry) return metrics.GetOrRegisterTimer(name, metrics.DefaultRegistry)
...@@ -68,7 +68,7 @@ func NewTimer(name string) metrics.Timer { ...@@ -68,7 +68,7 @@ func NewTimer(name string) metrics.Timer {
// process. // process.
func CollectProcessMetrics(refresh time.Duration) { func CollectProcessMetrics(refresh time.Duration) {
// Short circuit if the metrics system is disabled // Short circuit if the metrics system is disabled
if !enabled { if !Enabled {
return return
} }
// Create the various data collectors // Create the various data collectors
......
...@@ -38,8 +38,14 @@ type meteredConn struct { ...@@ -38,8 +38,14 @@ type meteredConn struct {
} }
// newMeteredConn creates a new metered connection, also bumping the ingress or // newMeteredConn creates a new metered connection, also bumping the ingress or
// egress connection meter. // egress connection meter. If the metrics system is disabled, this function
// returns the original object.
func newMeteredConn(conn net.Conn, ingress bool) net.Conn { func newMeteredConn(conn net.Conn, ingress bool) net.Conn {
// Short circuit if metrics are disabled
if !metrics.Enabled {
return conn
}
// Otherwise bump the connection counters and wrap the connection
if ingress { if ingress {
ingressConnectMeter.Mark(1) ingressConnectMeter.Mark(1)
} else { } else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment