Unverified Commit d4d88f9b authored by Felföldi Zsolt's avatar Felföldi Zsolt Committed by GitHub

les: remove obsolete code related to PoW header syncing (#27737)

This change removes PoW header syncing related code from LES and also deletes 
duplicated packages les/catalyst, les/downloader and les/fetcher. These package copies
were created because people wanted to make changes in their eth/ counterparts, but weren't
able to adapt LES code to the API changes.
parent 988d84aa
...@@ -93,9 +93,6 @@ var ( ...@@ -93,9 +93,6 @@ var (
utils.LightMaxPeersFlag, utils.LightMaxPeersFlag,
utils.LightNoPruneFlag, utils.LightNoPruneFlag,
utils.LightKDFFlag, utils.LightKDFFlag,
utils.UltraLightServersFlag,
utils.UltraLightFractionFlag,
utils.UltraLightOnlyAnnounceFlag,
utils.LightNoSyncServeFlag, utils.LightNoSyncServeFlag,
utils.EthRequiredBlocksFlag, utils.EthRequiredBlocksFlag,
utils.LegacyWhitelistFlag, utils.LegacyWhitelistFlag,
......
...@@ -61,7 +61,6 @@ import ( ...@@ -61,7 +61,6 @@ import (
"github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/internal/flags" "github.com/ethereum/go-ethereum/internal/flags"
"github.com/ethereum/go-ethereum/les" "github.com/ethereum/go-ethereum/les"
lescatalyst "github.com/ethereum/go-ethereum/les/catalyst"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/metrics/exp" "github.com/ethereum/go-ethereum/metrics/exp"
...@@ -294,23 +293,6 @@ var ( ...@@ -294,23 +293,6 @@ var (
Value: ethconfig.Defaults.LightPeers, Value: ethconfig.Defaults.LightPeers,
Category: flags.LightCategory, Category: flags.LightCategory,
} }
UltraLightServersFlag = &cli.StringFlag{
Name: "ulc.servers",
Usage: "List of trusted ultra-light servers",
Value: strings.Join(ethconfig.Defaults.UltraLightServers, ","),
Category: flags.LightCategory,
}
UltraLightFractionFlag = &cli.IntFlag{
Name: "ulc.fraction",
Usage: "Minimum % of trusted ultra-light servers required to announce a new head",
Value: ethconfig.Defaults.UltraLightFraction,
Category: flags.LightCategory,
}
UltraLightOnlyAnnounceFlag = &cli.BoolFlag{
Name: "ulc.onlyannounce",
Usage: "Ultra light server sends announcements only",
Category: flags.LightCategory,
}
LightNoPruneFlag = &cli.BoolFlag{ LightNoPruneFlag = &cli.BoolFlag{
Name: "light.nopruning", Name: "light.nopruning",
Usage: "Disable ancient light chain data pruning", Usage: "Disable ancient light chain data pruning",
...@@ -1211,19 +1193,6 @@ func setLes(ctx *cli.Context, cfg *ethconfig.Config) { ...@@ -1211,19 +1193,6 @@ func setLes(ctx *cli.Context, cfg *ethconfig.Config) {
if ctx.IsSet(LightMaxPeersFlag.Name) { if ctx.IsSet(LightMaxPeersFlag.Name) {
cfg.LightPeers = ctx.Int(LightMaxPeersFlag.Name) cfg.LightPeers = ctx.Int(LightMaxPeersFlag.Name)
} }
if ctx.IsSet(UltraLightServersFlag.Name) {
cfg.UltraLightServers = strings.Split(ctx.String(UltraLightServersFlag.Name), ",")
}
if ctx.IsSet(UltraLightFractionFlag.Name) {
cfg.UltraLightFraction = ctx.Int(UltraLightFractionFlag.Name)
}
if cfg.UltraLightFraction <= 0 && cfg.UltraLightFraction > 100 {
log.Error("Ultra light fraction is invalid", "had", cfg.UltraLightFraction, "updated", ethconfig.Defaults.UltraLightFraction)
cfg.UltraLightFraction = ethconfig.Defaults.UltraLightFraction
}
if ctx.IsSet(UltraLightOnlyAnnounceFlag.Name) {
cfg.UltraLightOnlyAnnounce = ctx.Bool(UltraLightOnlyAnnounceFlag.Name)
}
if ctx.IsSet(LightNoPruneFlag.Name) { if ctx.IsSet(LightNoPruneFlag.Name) {
cfg.LightNoPrune = ctx.Bool(LightNoPruneFlag.Name) cfg.LightNoPrune = ctx.Bool(LightNoPruneFlag.Name)
} }
...@@ -1884,9 +1853,6 @@ func RegisterEthService(stack *node.Node, cfg *ethconfig.Config) (ethapi.Backend ...@@ -1884,9 +1853,6 @@ func RegisterEthService(stack *node.Node, cfg *ethconfig.Config) (ethapi.Backend
Fatalf("Failed to register the Ethereum service: %v", err) Fatalf("Failed to register the Ethereum service: %v", err)
} }
stack.RegisterAPIs(tracers.APIs(backend.ApiBackend)) stack.RegisterAPIs(tracers.APIs(backend.ApiBackend))
if err := lescatalyst.Register(stack, backend); err != nil {
Fatalf("Failed to register the Engine API service: %v", err)
}
return backend.ApiBackend, nil return backend.ApiBackend, nil
} }
backend, err := eth.New(stack, cfg) backend, err := eth.New(stack, cfg)
......
...@@ -61,7 +61,6 @@ var Defaults = Config{ ...@@ -61,7 +61,6 @@ var Defaults = Config{
NetworkId: 1, NetworkId: 1,
TxLookupLimit: 2350000, TxLookupLimit: 2350000,
LightPeers: 100, LightPeers: 100,
UltraLightFraction: 75,
DatabaseCache: 512, DatabaseCache: 512,
TrieCleanCache: 154, TrieCleanCache: 154,
TrieDirtyCache: 256, TrieDirtyCache: 256,
...@@ -111,11 +110,6 @@ type Config struct { ...@@ -111,11 +110,6 @@ type Config struct {
LightNoPrune bool `toml:",omitempty"` // Whether to disable light chain pruning LightNoPrune bool `toml:",omitempty"` // Whether to disable light chain pruning
LightNoSyncServe bool `toml:",omitempty"` // Whether to serve light clients before syncing LightNoSyncServe bool `toml:",omitempty"` // Whether to serve light clients before syncing
// Ultra Light client options
UltraLightServers []string `toml:",omitempty"` // List of trusted ultra light servers
UltraLightFraction int `toml:",omitempty"` // Percentage of trusted servers to accept an announcement
UltraLightOnlyAnnounce bool `toml:",omitempty"` // Whether to only announce headers, or also serve them
// Database options // Database options
SkipBcVersionCheck bool `toml:"-"` SkipBcVersionCheck bool `toml:"-"`
DatabaseHandles int `toml:"-"` DatabaseHandles int `toml:"-"`
......
...@@ -31,9 +31,6 @@ func (c Config) MarshalTOML() (interface{}, error) { ...@@ -31,9 +31,6 @@ func (c Config) MarshalTOML() (interface{}, error) {
LightPeers int `toml:",omitempty"` LightPeers int `toml:",omitempty"`
LightNoPrune bool `toml:",omitempty"` LightNoPrune bool `toml:",omitempty"`
LightNoSyncServe bool `toml:",omitempty"` LightNoSyncServe bool `toml:",omitempty"`
UltraLightServers []string `toml:",omitempty"`
UltraLightFraction int `toml:",omitempty"`
UltraLightOnlyAnnounce bool `toml:",omitempty"`
SkipBcVersionCheck bool `toml:"-"` SkipBcVersionCheck bool `toml:"-"`
DatabaseHandles int `toml:"-"` DatabaseHandles int `toml:"-"`
DatabaseCache int DatabaseCache int
...@@ -71,9 +68,6 @@ func (c Config) MarshalTOML() (interface{}, error) { ...@@ -71,9 +68,6 @@ func (c Config) MarshalTOML() (interface{}, error) {
enc.LightPeers = c.LightPeers enc.LightPeers = c.LightPeers
enc.LightNoPrune = c.LightNoPrune enc.LightNoPrune = c.LightNoPrune
enc.LightNoSyncServe = c.LightNoSyncServe enc.LightNoSyncServe = c.LightNoSyncServe
enc.UltraLightServers = c.UltraLightServers
enc.UltraLightFraction = c.UltraLightFraction
enc.UltraLightOnlyAnnounce = c.UltraLightOnlyAnnounce
enc.SkipBcVersionCheck = c.SkipBcVersionCheck enc.SkipBcVersionCheck = c.SkipBcVersionCheck
enc.DatabaseHandles = c.DatabaseHandles enc.DatabaseHandles = c.DatabaseHandles
enc.DatabaseCache = c.DatabaseCache enc.DatabaseCache = c.DatabaseCache
...@@ -115,9 +109,6 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { ...@@ -115,9 +109,6 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
LightPeers *int `toml:",omitempty"` LightPeers *int `toml:",omitempty"`
LightNoPrune *bool `toml:",omitempty"` LightNoPrune *bool `toml:",omitempty"`
LightNoSyncServe *bool `toml:",omitempty"` LightNoSyncServe *bool `toml:",omitempty"`
UltraLightServers []string `toml:",omitempty"`
UltraLightFraction *int `toml:",omitempty"`
UltraLightOnlyAnnounce *bool `toml:",omitempty"`
SkipBcVersionCheck *bool `toml:"-"` SkipBcVersionCheck *bool `toml:"-"`
DatabaseHandles *int `toml:"-"` DatabaseHandles *int `toml:"-"`
DatabaseCache *int DatabaseCache *int
...@@ -188,15 +179,6 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { ...@@ -188,15 +179,6 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
if dec.LightNoSyncServe != nil { if dec.LightNoSyncServe != nil {
c.LightNoSyncServe = *dec.LightNoSyncServe c.LightNoSyncServe = *dec.LightNoSyncServe
} }
if dec.UltraLightServers != nil {
c.UltraLightServers = dec.UltraLightServers
}
if dec.UltraLightFraction != nil {
c.UltraLightFraction = *dec.UltraLightFraction
}
if dec.UltraLightOnlyAnnounce != nil {
c.UltraLightOnlyAnnounce = *dec.UltraLightOnlyAnnounce
}
if dec.SkipBcVersionCheck != nil { if dec.SkipBcVersionCheck != nil {
c.SkipBcVersionCheck = *dec.SkipBcVersionCheck c.SkipBcVersionCheck = *dec.SkipBcVersionCheck
} }
......
...@@ -57,7 +57,6 @@ func (b *LesApiBackend) CurrentBlock() *types.Header { ...@@ -57,7 +57,6 @@ func (b *LesApiBackend) CurrentBlock() *types.Header {
} }
func (b *LesApiBackend) SetHead(number uint64) { func (b *LesApiBackend) SetHead(number uint64) {
b.eth.handler.downloader.Cancel()
b.eth.blockchain.SetHead(number) b.eth.blockchain.SetHead(number)
} }
...@@ -264,7 +263,7 @@ func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven ...@@ -264,7 +263,7 @@ func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven
} }
func (b *LesApiBackend) SyncProgress() ethereum.SyncProgress { func (b *LesApiBackend) SyncProgress() ethereum.SyncProgress {
return b.eth.Downloader().Progress() return ethereum.SyncProgress{}
} }
func (b *LesApiBackend) ProtocolVersion() int { func (b *LesApiBackend) ProtocolVersion() int {
......
...@@ -31,9 +31,8 @@ import ( ...@@ -31,9 +31,8 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth"
ethdownloader "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/les/downloader"
"github.com/ethereum/go-ethereum/les/flowcontrol" "github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
...@@ -493,13 +492,13 @@ func testSim(t *testing.T, serverCount, clientCount int, serverDir, clientDir [] ...@@ -493,13 +492,13 @@ func testSim(t *testing.T, serverCount, clientCount int, serverDir, clientDir []
func newLesClientService(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) { func newLesClientService(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) {
config := ethconfig.Defaults config := ethconfig.Defaults
config.SyncMode = (ethdownloader.SyncMode)(downloader.LightSync) config.SyncMode = downloader.LightSync
return New(stack, &config) return New(stack, &config)
} }
func newLesServerService(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) { func newLesServerService(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) {
config := ethconfig.Defaults config := ethconfig.Defaults
config.SyncMode = (ethdownloader.SyncMode)(downloader.FullSync) config.SyncMode = downloader.FullSync
config.LightServ = testServerCapacity config.LightServ = testServerCapacity
config.LightPeers = testMaxClients config.LightPeers = testMaxClients
ethereum, err := eth.New(stack, &config) ethereum, err := eth.New(stack, &config)
......
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package catalyst implements the temporary eth1/eth2 RPC integration.
package catalyst
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
)
// Register adds catalyst APIs to the light client.
func Register(stack *node.Node, backend *les.LightEthereum) error {
log.Warn("Catalyst mode enabled", "protocol", "les")
stack.RegisterAPIs([]rpc.API{
{
Namespace: "engine",
Service: NewConsensusAPI(backend),
Authenticated: true,
},
})
return nil
}
type ConsensusAPI struct {
les *les.LightEthereum
}
// NewConsensusAPI creates a new consensus api for the given backend.
// The underlying blockchain needs to have a valid terminal total difficulty set.
func NewConsensusAPI(les *les.LightEthereum) *ConsensusAPI {
if les.BlockChain().Config().TerminalTotalDifficulty == nil {
log.Warn("Catalyst started without valid total difficulty")
}
return &ConsensusAPI{les: les}
}
// ForkchoiceUpdatedV1 has several responsibilities:
//
// We try to set our blockchain to the headBlock.
//
// If the method is called with an empty head block: we return success, which can be used
// to check if the catalyst mode is enabled.
//
// If the total difficulty was not reached: we return INVALID.
//
// If the finalizedBlockHash is set: we check if we have the finalizedBlockHash in our db,
// if not we start a sync.
//
// If there are payloadAttributes: we return an error since block creation is not
// supported in les mode.
func (api *ConsensusAPI) ForkchoiceUpdatedV1(heads engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
if heads.HeadBlockHash == (common.Hash{}) {
log.Warn("Forkchoice requested update to zero hash")
return engine.STATUS_INVALID, nil // TODO(karalabe): Why does someone send us this?
}
if err := api.checkTerminalTotalDifficulty(heads.HeadBlockHash); err != nil {
if header := api.les.BlockChain().GetHeaderByHash(heads.HeadBlockHash); header == nil {
// TODO (MariusVanDerWijden) trigger sync
return engine.STATUS_SYNCING, nil
}
return engine.STATUS_INVALID, err
}
// If the finalized block is set, check if it is in our blockchain
if heads.FinalizedBlockHash != (common.Hash{}) {
if header := api.les.BlockChain().GetHeaderByHash(heads.FinalizedBlockHash); header == nil {
// TODO (MariusVanDerWijden) trigger sync
return engine.STATUS_SYNCING, nil
}
}
// SetHead
if err := api.setCanonical(heads.HeadBlockHash); err != nil {
return engine.STATUS_INVALID, err
}
if payloadAttributes != nil {
return engine.STATUS_INVALID, errors.New("not supported")
}
return api.validForkChoiceResponse(), nil
}
// GetPayloadV1 returns a cached payload by id. It's not supported in les mode.
func (api *ConsensusAPI) GetPayloadV1(payloadID engine.PayloadID) (*engine.ExecutableData, error) {
return nil, engine.GenericServerError.With(errors.New("not supported in light client mode"))
}
// ExecutePayloadV1 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
func (api *ConsensusAPI) ExecutePayloadV1(params engine.ExecutableData) (engine.PayloadStatusV1, error) {
block, err := engine.ExecutableDataToBlock(params)
if err != nil {
return api.invalid(), err
}
if !api.les.BlockChain().HasHeader(block.ParentHash(), block.NumberU64()-1) {
/*
TODO (MariusVanDerWijden) reenable once sync is merged
if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), block.Header()); err != nil {
return SYNCING, err
}
*/
// TODO (MariusVanDerWijden) we should return nil here not empty hash
return engine.PayloadStatusV1{Status: engine.SYNCING, LatestValidHash: nil}, nil
}
parent := api.les.BlockChain().GetHeaderByHash(params.ParentHash)
if parent == nil {
return api.invalid(), fmt.Errorf("could not find parent %x", params.ParentHash)
}
td := api.les.BlockChain().GetTd(parent.Hash(), block.NumberU64()-1)
ttd := api.les.BlockChain().Config().TerminalTotalDifficulty
if td.Cmp(ttd) < 0 {
return api.invalid(), fmt.Errorf("can not execute payload on top of block with low td got: %v threshold %v", td, ttd)
}
if err = api.les.BlockChain().InsertHeader(block.Header()); err != nil {
return api.invalid(), err
}
if merger := api.les.Merger(); !merger.TDDReached() {
merger.ReachTTD()
}
hash := block.Hash()
return engine.PayloadStatusV1{Status: engine.VALID, LatestValidHash: &hash}, nil
}
func (api *ConsensusAPI) validForkChoiceResponse() engine.ForkChoiceResponse {
currentHash := api.les.BlockChain().CurrentHeader().Hash()
return engine.ForkChoiceResponse{
PayloadStatus: engine.PayloadStatusV1{Status: engine.VALID, LatestValidHash: &currentHash},
}
}
// invalid returns a response "INVALID" with the latest valid hash set to the current head.
func (api *ConsensusAPI) invalid() engine.PayloadStatusV1 {
currentHash := api.les.BlockChain().CurrentHeader().Hash()
return engine.PayloadStatusV1{Status: engine.INVALID, LatestValidHash: &currentHash}
}
func (api *ConsensusAPI) checkTerminalTotalDifficulty(head common.Hash) error {
// shortcut if we entered PoS already
if api.les.Merger().PoSFinalized() {
return nil
}
// make sure the parent has enough terminal total difficulty
header := api.les.BlockChain().GetHeaderByHash(head)
if header == nil {
return errors.New("unknown header")
}
td := api.les.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
if td != nil && td.Cmp(api.les.BlockChain().Config().TerminalTotalDifficulty) < 0 {
return errors.New("invalid ttd")
}
return nil
}
// setCanonical is called to perform a force choice.
func (api *ConsensusAPI) setCanonical(newHead common.Hash) error {
log.Info("Setting head", "head", newHead)
headHeader := api.les.BlockChain().CurrentHeader()
if headHeader.Hash() == newHead {
return nil
}
newHeadHeader := api.les.BlockChain().GetHeaderByHash(newHead)
if newHeadHeader == nil {
return errors.New("unknown header")
}
if err := api.les.BlockChain().SetCanonical(newHeadHeader); err != nil {
return err
}
// Trigger the transition if it's the first `NewHead` event.
if merger := api.les.Merger(); !merger.PoSFinalized() {
merger.FinalizePoS()
}
return nil
}
// ExchangeTransitionConfigurationV1 checks the given configuration against
// the configuration of the node.
func (api *ConsensusAPI) ExchangeTransitionConfigurationV1(config engine.TransitionConfigurationV1) (*engine.TransitionConfigurationV1, error) {
log.Trace("Engine API request received", "method", "ExchangeTransitionConfiguration", "ttd", config.TerminalTotalDifficulty)
if config.TerminalTotalDifficulty == nil {
return nil, errors.New("invalid terminal total difficulty")
}
ttd := api.les.BlockChain().Config().TerminalTotalDifficulty
if ttd == nil || ttd.Cmp(config.TerminalTotalDifficulty.ToInt()) != 0 {
log.Warn("Invalid TTD configured", "geth", ttd, "beacon", config.TerminalTotalDifficulty)
return nil, fmt.Errorf("invalid ttd: execution %v consensus %v", ttd, config.TerminalTotalDifficulty)
}
if config.TerminalBlockHash != (common.Hash{}) {
if hash := api.les.BlockChain().GetCanonicalHash(uint64(config.TerminalBlockNumber)); hash == config.TerminalBlockHash {
return &engine.TransitionConfigurationV1{
TerminalTotalDifficulty: (*hexutil.Big)(ttd),
TerminalBlockHash: config.TerminalBlockHash,
TerminalBlockNumber: config.TerminalBlockNumber,
}, nil
}
return nil, errors.New("invalid terminal block hash")
}
return &engine.TransitionConfigurationV1{TerminalTotalDifficulty: (*hexutil.Big)(ttd)}, nil
}
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package catalyst
import (
"math/big"
"testing"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)
var (
// testKey is a private key to use for funding a tester account.
testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
// testAddr is the Ethereum address of the tester account.
testAddr = crypto.PubkeyToAddress(testKey.PublicKey)
testBalance = big.NewInt(2e18)
)
func generatePreMergeChain(pre, post int) (*core.Genesis, []*types.Header, []*types.Block, []*types.Header, []*types.Block) {
config := *params.AllEthashProtocolChanges
genesis := &core.Genesis{
Config: &config,
Alloc: core.GenesisAlloc{testAddr: {Balance: testBalance}},
ExtraData: []byte("test genesis"),
Timestamp: 9000,
BaseFee: big.NewInt(params.InitialBaseFee),
}
// Pre-merge blocks
db, preBLocks, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), pre, nil)
totalDifficulty := new(big.Int).Set(params.GenesisDifficulty)
var preHeaders []*types.Header
for _, b := range preBLocks {
totalDifficulty.Add(totalDifficulty, b.Difficulty())
preHeaders = append(preHeaders, b.Header())
}
config.TerminalTotalDifficulty = totalDifficulty
// Post-merge blocks
postBlocks, _ := core.GenerateChain(genesis.Config,
preBLocks[len(preBLocks)-1], ethash.NewFaker(), db, post,
func(i int, b *core.BlockGen) {
b.SetPoS()
})
var postHeaders []*types.Header
for _, b := range postBlocks {
postHeaders = append(postHeaders, b.Header())
}
return genesis, preHeaders, preBLocks, postHeaders, postBlocks
}
func TestSetHeadBeforeTotalDifficulty(t *testing.T) {
genesis, headers, blocks, _, _ := generatePreMergeChain(10, 0)
n, lesService := startLesService(t, genesis, headers)
defer n.Close()
api := NewConsensusAPI(lesService)
fcState := engine.ForkchoiceStateV1{
HeadBlockHash: blocks[5].Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err == nil {
t.Errorf("fork choice updated before total terminal difficulty should fail")
}
}
func TestExecutePayloadV1(t *testing.T) {
genesis, headers, _, _, postBlocks := generatePreMergeChain(10, 2)
n, lesService := startLesService(t, genesis, headers)
lesService.Merger().ReachTTD()
defer n.Close()
api := NewConsensusAPI(lesService)
fcState := engine.ForkchoiceStateV1{
HeadBlockHash: postBlocks[0].Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
t.Errorf("Failed to update head %v", err)
}
block := postBlocks[0]
fakeBlock := types.NewBlock(&types.Header{
ParentHash: block.ParentHash(),
UncleHash: crypto.Keccak256Hash(nil),
Coinbase: block.Coinbase(),
Root: block.Root(),
TxHash: crypto.Keccak256Hash(nil),
ReceiptHash: crypto.Keccak256Hash(nil),
Bloom: block.Bloom(),
Difficulty: big.NewInt(0),
Number: block.Number(),
GasLimit: block.GasLimit(),
GasUsed: block.GasUsed(),
Time: block.Time(),
Extra: block.Extra(),
MixDigest: block.MixDigest(),
Nonce: types.BlockNonce{},
BaseFee: block.BaseFee(),
}, nil, nil, nil, trie.NewStackTrie(nil))
_, err := api.ExecutePayloadV1(engine.ExecutableData{
ParentHash: fakeBlock.ParentHash(),
FeeRecipient: fakeBlock.Coinbase(),
StateRoot: fakeBlock.Root(),
ReceiptsRoot: fakeBlock.ReceiptHash(),
LogsBloom: fakeBlock.Bloom().Bytes(),
Random: fakeBlock.MixDigest(),
Number: fakeBlock.NumberU64(),
GasLimit: fakeBlock.GasLimit(),
GasUsed: fakeBlock.GasUsed(),
Timestamp: fakeBlock.Time(),
ExtraData: fakeBlock.Extra(),
BaseFeePerGas: fakeBlock.BaseFee(),
BlockHash: fakeBlock.Hash(),
Transactions: encodeTransactions(fakeBlock.Transactions()),
})
if err != nil {
t.Errorf("Failed to execute payload %v", err)
}
headHeader := api.les.BlockChain().CurrentHeader()
if headHeader.Number.Uint64() != fakeBlock.NumberU64()-1 {
t.Fatal("Unexpected chain head update")
}
fcState = engine.ForkchoiceStateV1{
HeadBlockHash: fakeBlock.Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
t.Fatal("Failed to update head")
}
headHeader = api.les.BlockChain().CurrentHeader()
if headHeader.Number.Uint64() != fakeBlock.NumberU64() {
t.Fatal("Failed to update chain head")
}
}
func TestEth2DeepReorg(t *testing.T) {
// TODO (MariusVanDerWijden) TestEth2DeepReorg is currently broken, because it tries to reorg
// before the totalTerminalDifficulty threshold
/*
genesis, preMergeBlocks := generatePreMergeChain(core.TriesInMemory * 2)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
defer n.Close()
var (
api = NewConsensusAPI(ethservice, nil)
parent = preMergeBlocks[len(preMergeBlocks)-core.TriesInMemory-1]
head = ethservice.BlockChain().CurrentBlock().NumberU64()
)
if ethservice.BlockChain().HasBlockAndState(parent.Hash(), parent.NumberU64()) {
t.Errorf("Block %d not pruned", parent.NumberU64())
}
for i := 0; i < 10; i++ {
execData, err := api.assembleBlock(AssembleBlockParams{
ParentHash: parent.Hash(),
Timestamp: parent.Time() + 5,
})
if err != nil {
t.Fatalf("Failed to create the executable data %v", err)
}
block, err := ExecutableDataToBlock(ethservice.BlockChain().Config(), parent.Header(), *execData)
if err != nil {
t.Fatalf("Failed to convert executable data to block %v", err)
}
newResp, err := api.ExecutePayload(*execData)
if err != nil || newResp.Status != "VALID" {
t.Fatalf("Failed to insert block: %v", err)
}
if ethservice.BlockChain().CurrentBlock().NumberU64() != head {
t.Fatalf("Chain head shouldn't be updated")
}
if err := api.setCanonical(block.Hash()); err != nil {
t.Fatalf("Failed to set head: %v", err)
}
if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64() {
t.Fatalf("Chain head should be updated")
}
parent, head = block, block.NumberU64()
}
*/
}
// startEthService creates a full node instance for testing.
func startLesService(t *testing.T, genesis *core.Genesis, headers []*types.Header) (*node.Node, *les.LightEthereum) {
t.Helper()
n, err := node.New(&node.Config{})
if err != nil {
t.Fatal("can't create node:", err)
}
ethcfg := &ethconfig.Config{
Genesis: genesis,
SyncMode: downloader.LightSync,
TrieDirtyCache: 256,
TrieCleanCache: 256,
LightPeers: 10,
}
lesService, err := les.New(n, ethcfg)
if err != nil {
t.Fatal("can't create eth service:", err)
}
if err := n.Start(); err != nil {
t.Fatal("can't start node:", err)
}
if _, err := lesService.BlockChain().InsertHeaderChain(headers); err != nil {
n.Close()
t.Fatal("can't import test headers:", err)
}
return n, lesService
}
func encodeTransactions(txs []*types.Transaction) [][]byte {
var enc = make([][]byte, len(txs))
for i, tx := range txs {
enc[i], _ = tx.MarshalBinary()
}
return enc
}
...@@ -36,7 +36,6 @@ import ( ...@@ -36,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/internal/shutdowncheck" "github.com/ethereum/go-ethereum/internal/shutdowncheck"
"github.com/ethereum/go-ethereum/les/downloader"
"github.com/ethereum/go-ethereum/les/vflux" "github.com/ethereum/go-ethereum/les/vflux"
vfc "github.com/ethereum/go-ethereum/les/vflux/client" vfc "github.com/ethereum/go-ethereum/les/vflux/client"
"github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/light"
...@@ -64,7 +63,6 @@ type LightEthereum struct { ...@@ -64,7 +63,6 @@ type LightEthereum struct {
blockchain *light.LightChain blockchain *light.LightChain
serverPool *vfc.ServerPool serverPool *vfc.ServerPool
serverPoolIterator enode.Iterator serverPoolIterator enode.Iterator
pruner *pruner
merger *consensus.Merger merger *consensus.Merger
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
...@@ -146,7 +144,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) { ...@@ -146,7 +144,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
if leth.udpEnabled { if leth.udpEnabled {
prenegQuery = leth.prenegQuery prenegQuery = leth.prenegQuery
} }
leth.serverPool, leth.serverPoolIterator = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, prenegQuery, &mclock.System{}, config.UltraLightServers, requestList) leth.serverPool, leth.serverPoolIterator = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, prenegQuery, &mclock.System{}, nil, requestList)
leth.serverPool.AddMetrics(suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge, sessionValueMeter, serverDialedMeter) leth.serverPool.AddMetrics(suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge, sessionValueMeter, serverDialedMeter)
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.GetTimeout) leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.GetTimeout)
...@@ -170,9 +168,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) { ...@@ -170,9 +168,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
leth.chtIndexer.Start(leth.blockchain) leth.chtIndexer.Start(leth.blockchain)
leth.bloomIndexer.Start(leth.blockchain) leth.bloomIndexer.Start(leth.blockchain)
// Start a light chain pruner to delete useless historical data.
leth.pruner = newPruner(chainDb, leth.chtIndexer, leth.bloomTrieIndexer)
// Rewind the chain in case of an incompatible config upgrade. // Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok { if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat) log.Warn("Rewinding chain to upgrade configuration", "err", compat)
...@@ -191,7 +186,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) { ...@@ -191,7 +186,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
} }
leth.ApiBackend.gpo = gasprice.NewOracle(leth.ApiBackend, gpoParams) leth.ApiBackend.gpo = gasprice.NewOracle(leth.ApiBackend, gpoParams)
leth.handler = newClientHandler(config.UltraLightServers, config.UltraLightFraction, leth) leth.handler = newClientHandler(leth)
leth.netRPCService = ethapi.NewNetAPI(leth.p2pServer, leth.config.NetworkId) leth.netRPCService = ethapi.NewNetAPI(leth.p2pServer, leth.config.NetworkId)
// Register the backend on the node // Register the backend on the node
...@@ -298,9 +293,6 @@ func (s *LightEthereum) APIs() []rpc.API { ...@@ -298,9 +293,6 @@ func (s *LightEthereum) APIs() []rpc.API {
{ {
Namespace: "eth", Namespace: "eth",
Service: &LightDummyAPI{}, Service: &LightDummyAPI{},
}, {
Namespace: "eth",
Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux),
}, { }, {
Namespace: "net", Namespace: "net",
Service: s.netRPCService, Service: s.netRPCService,
...@@ -315,13 +307,12 @@ func (s *LightEthereum) ResetWithGenesisBlock(gb *types.Block) { ...@@ -315,13 +307,12 @@ func (s *LightEthereum) ResetWithGenesisBlock(gb *types.Block) {
s.blockchain.ResetWithGenesisBlock(gb) s.blockchain.ResetWithGenesisBlock(gb)
} }
func (s *LightEthereum) BlockChain() *light.LightChain { return s.blockchain } func (s *LightEthereum) BlockChain() *light.LightChain { return s.blockchain }
func (s *LightEthereum) TxPool() *light.TxPool { return s.txPool } func (s *LightEthereum) TxPool() *light.TxPool { return s.txPool }
func (s *LightEthereum) Engine() consensus.Engine { return s.engine } func (s *LightEthereum) Engine() consensus.Engine { return s.engine }
func (s *LightEthereum) LesVersion() int { return int(ClientProtocolVersions[0]) } func (s *LightEthereum) LesVersion() int { return int(ClientProtocolVersions[0]) }
func (s *LightEthereum) Downloader() *downloader.Downloader { return s.handler.downloader } func (s *LightEthereum) EventMux() *event.TypeMux { return s.eventMux }
func (s *LightEthereum) EventMux() *event.TypeMux { return s.eventMux } func (s *LightEthereum) Merger() *consensus.Merger { return s.merger }
func (s *LightEthereum) Merger() *consensus.Merger { return s.merger }
// Protocols returns all the currently configured network protocols to start. // Protocols returns all the currently configured network protocols to start.
func (s *LightEthereum) Protocols() []p2p.Protocol { func (s *LightEthereum) Protocols() []p2p.Protocol {
...@@ -354,7 +345,6 @@ func (s *LightEthereum) Start() error { ...@@ -354,7 +345,6 @@ func (s *LightEthereum) Start() error {
// Start bloom request workers. // Start bloom request workers.
s.wg.Add(bloomServiceThreads) s.wg.Add(bloomServiceThreads)
s.startBloomHandlers(params.BloomBitsBlocksClient) s.startBloomHandlers(params.BloomBitsBlocksClient)
s.handler.start()
return nil return nil
} }
...@@ -374,7 +364,6 @@ func (s *LightEthereum) Stop() error { ...@@ -374,7 +364,6 @@ func (s *LightEthereum) Stop() error {
s.handler.stop() s.handler.stop()
s.txPool.Stop() s.txPool.Stop()
s.engine.Close() s.engine.Close()
s.pruner.close()
s.eventMux.Stop() s.eventMux.Stop()
// Clean shutdown marker as the last thing before closing db // Clean shutdown marker as the last thing before closing db
s.shutdownTracker.Stop() s.shutdownTracker.Stop()
......
...@@ -17,9 +17,6 @@ ...@@ -17,9 +17,6 @@
package les package les
import ( import (
"context"
"math/big"
"math/rand"
"sync" "sync"
"time" "time"
...@@ -27,68 +24,37 @@ import ( ...@@ -27,68 +24,37 @@ import (
"github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/les/downloader"
"github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
) )
// clientHandler is responsible for receiving and processing all incoming server // clientHandler is responsible for receiving and processing all incoming server
// responses. // responses.
type clientHandler struct { type clientHandler struct {
ulc *ulc
forkFilter forkid.Filter forkFilter forkid.Filter
fetcher *lightFetcher
downloader *downloader.Downloader
backend *LightEthereum backend *LightEthereum
closeCh chan struct{} closeCh chan struct{}
wg sync.WaitGroup // WaitGroup used to track all connected peers. wg sync.WaitGroup // WaitGroup used to track all connected peers.
// Hooks used in the testing
syncStart func(header *types.Header) // Hook called when the syncing is started
syncEnd func(header *types.Header) // Hook called when the syncing is done
} }
func newClientHandler(ulcServers []string, ulcFraction int, backend *LightEthereum) *clientHandler { func newClientHandler(backend *LightEthereum) *clientHandler {
handler := &clientHandler{ handler := &clientHandler{
forkFilter: forkid.NewFilter(backend.blockchain), forkFilter: forkid.NewFilter(backend.blockchain),
backend: backend, backend: backend,
closeCh: make(chan struct{}), closeCh: make(chan struct{}),
} }
if ulcServers != nil {
ulc, err := newULC(ulcServers, ulcFraction)
if err != nil {
log.Error("Failed to initialize ultra light client")
}
handler.ulc = ulc
log.Info("Enable ultra light client mode")
}
handler.fetcher = newLightFetcher(backend.blockchain, backend.engine, backend.peers, handler.ulc, backend.chainDb, backend.reqDist, handler.synchronise)
handler.downloader = downloader.New(0, backend.chainDb, backend.eventMux, nil, backend.blockchain, handler.removePeer)
handler.backend.peers.subscribe((*downloaderPeerNotify)(handler))
return handler return handler
} }
func (h *clientHandler) start() {
h.fetcher.start()
}
func (h *clientHandler) stop() { func (h *clientHandler) stop() {
close(h.closeCh) close(h.closeCh)
h.downloader.Terminate()
h.fetcher.stop()
h.wg.Wait() h.wg.Wait()
} }
// runPeer is the p2p protocol run function for the given version. // runPeer is the p2p protocol run function for the given version.
func (h *clientHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error { func (h *clientHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error {
trusted := false peer := newServerPeer(int(version), h.backend.config.NetworkId, false, p, newMeteredMsgWriter(rw, int(version)))
if h.ulc != nil {
trusted = h.ulc.trusted(p.ID())
}
peer := newServerPeer(int(version), h.backend.config.NetworkId, trusted, p, newMeteredMsgWriter(rw, int(version)))
defer peer.close() defer peer.close()
h.wg.Add(1) h.wg.Add(1)
defer h.wg.Done() defer h.wg.Done()
...@@ -136,12 +102,6 @@ func (h *clientHandler) handle(p *serverPeer, noInitAnnounce bool) error { ...@@ -136,12 +102,6 @@ func (h *clientHandler) handle(p *serverPeer, noInitAnnounce bool) error {
serverConnectionGauge.Update(int64(h.backend.peers.len())) serverConnectionGauge.Update(int64(h.backend.peers.len()))
}() }()
// Discard all the announces after the transition
// Also discarding initial signal to prevent syncing during testing.
if !(noInitAnnounce || h.backend.merger.TDDReached()) {
h.fetcher.announce(p, &announceData{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td})
}
// Mark the peer starts to be served. // Mark the peer starts to be served.
p.serving.Store(true) p.serving.Store(true)
defer p.serving.Store(false) defer p.serving.Store(false)
...@@ -206,11 +166,6 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { ...@@ -206,11 +166,6 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
// Update peer head information first and then notify the announcement // Update peer head information first and then notify the announcement
p.updateHead(req.Hash, req.Number, req.Td) p.updateHead(req.Hash, req.Number, req.Td)
// Discard all the announces after the transition
if !h.backend.merger.TDDReached() {
h.fetcher.announce(p, &req)
}
} }
case msg.Code == BlockHeadersMsg: case msg.Code == BlockHeadersMsg:
p.Log().Trace("Received block header response message") p.Log().Trace("Received block header response message")
...@@ -221,28 +176,13 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { ...@@ -221,28 +176,13 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
if err := msg.Decode(&resp); err != nil { if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err) return errResp(ErrDecode, "msg %v: %v", msg, err)
} }
headers := resp.Headers
p.fcServer.ReceivedReply(resp.ReqID, resp.BV) p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID) p.answeredRequest(resp.ReqID)
// Filter out the explicitly requested header by the retriever deliverMsg = &Msg{
if h.backend.retriever.requested(resp.ReqID) { MsgType: MsgBlockHeaders,
deliverMsg = &Msg{ ReqID: resp.ReqID,
MsgType: MsgBlockHeaders, Obj: resp.Headers,
ReqID: resp.ReqID,
Obj: resp.Headers,
}
} else {
// Filter out any explicitly requested headers, deliver the rest to the downloader
filter := len(headers) == 1
if filter {
headers = h.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
}
if len(headers) != 0 || !filter {
if err := h.downloader.DeliverHeaders(p.id, headers); err != nil {
log.Debug("Failed to deliver headers", "err", err)
}
}
} }
case msg.Code == BlockBodiesMsg: case msg.Code == BlockBodiesMsg:
p.Log().Trace("Received block bodies response") p.Log().Trace("Received block bodies response")
...@@ -366,117 +306,3 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { ...@@ -366,117 +306,3 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
} }
return nil return nil
} }
func (h *clientHandler) removePeer(id string) {
h.backend.peers.unregister(id)
}
type peerConnection struct {
handler *clientHandler
peer *serverPeer
}
func (pc *peerConnection) Head() (common.Hash, *big.Int) {
return pc.peer.HeadAndTd()
}
func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*serverPeer)
return peer.getRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*serverPeer) == pc.peer
},
request: func(dp distPeer) func() {
reqID := rand.Uint64()
peer := dp.(*serverPeer)
cost := peer.getRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.requestHeadersByHash(reqID, origin, amount, skip, reverse) }
},
}
_, ok := <-pc.handler.backend.reqDist.queue(rq)
if !ok {
return light.ErrNoPeers
}
return nil
}
func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*serverPeer)
return peer.getRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*serverPeer) == pc.peer
},
request: func(dp distPeer) func() {
reqID := rand.Uint64()
peer := dp.(*serverPeer)
cost := peer.getRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.requestHeadersByNumber(reqID, origin, amount, skip, reverse) }
},
}
_, ok := <-pc.handler.backend.reqDist.queue(rq)
if !ok {
return light.ErrNoPeers
}
return nil
}
// RetrieveSingleHeaderByNumber requests a single header by the specified block
// number. This function will wait the response until it's timeout or delivered.
func (pc *peerConnection) RetrieveSingleHeaderByNumber(context context.Context, number uint64) (*types.Header, error) {
reqID := rand.Uint64()
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*serverPeer)
return peer.getRequestCost(GetBlockHeadersMsg, 1)
},
canSend: func(dp distPeer) bool {
return dp.(*serverPeer) == pc.peer
},
request: func(dp distPeer) func() {
peer := dp.(*serverPeer)
cost := peer.getRequestCost(GetBlockHeadersMsg, 1)
peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.requestHeadersByNumber(reqID, number, 1, 0, false) }
},
}
var header *types.Header
if err := pc.handler.backend.retriever.retrieve(context, reqID, rq, func(peer distPeer, msg *Msg) error {
if msg.MsgType != MsgBlockHeaders {
return errInvalidMessageType
}
headers := msg.Obj.([]*types.Header)
if len(headers) != 1 {
return errInvalidEntryCount
}
header = headers[0]
return nil
}, nil); err != nil {
return nil, err
}
return header, nil
}
// downloaderPeerNotify implements peerSetNotify
type downloaderPeerNotify clientHandler
func (d *downloaderPeerNotify) registerPeer(p *serverPeer) {
h := (*clientHandler)(d)
pc := &peerConnection{
handler: h,
peer: p,
}
h.downloader.RegisterLightPeer(p.id, eth.ETH66, pc)
}
func (d *downloaderPeerNotify) unregisterPeer(p *serverPeer) {
h := (*clientHandler)(d)
h.downloader.UnregisterPeer(p.id)
}
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"context"
"sync"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)
// DownloaderAPI provides an API which gives information about the current synchronisation status.
// It offers only methods that operates on data that can be available to anyone without security risks.
type DownloaderAPI struct {
d *Downloader
mux *event.TypeMux
installSyncSubscription chan chan interface{}
uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest
}
// NewDownloaderAPI create a new PublicDownloaderAPI. The API has an internal event loop that
// listens for events from the downloader through the global event mux. In case it receives one of
// these events it broadcasts it to all syncing subscriptions that are installed through the
// installSyncSubscription channel.
func NewDownloaderAPI(d *Downloader, m *event.TypeMux) *DownloaderAPI {
api := &DownloaderAPI{
d: d,
mux: m,
installSyncSubscription: make(chan chan interface{}),
uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest),
}
go api.eventLoop()
return api
}
// eventLoop runs a loop until the event mux closes. It will install and uninstall new
// sync subscriptions and broadcasts sync status updates to the installed sync subscriptions.
func (api *DownloaderAPI) eventLoop() {
var (
sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
syncSubscriptions = make(map[chan interface{}]struct{})
)
for {
select {
case i := <-api.installSyncSubscription:
syncSubscriptions[i] = struct{}{}
case u := <-api.uninstallSyncSubscription:
delete(syncSubscriptions, u.c)
close(u.uninstalled)
case event := <-sub.Chan():
if event == nil {
return
}
var notification interface{}
switch event.Data.(type) {
case StartEvent:
notification = &SyncingResult{
Syncing: true,
Status: api.d.Progress(),
}
case DoneEvent, FailedEvent:
notification = false
}
// broadcast
for c := range syncSubscriptions {
c <- notification
}
}
}
}
// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
func (api *DownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
statuses := make(chan interface{})
sub := api.SubscribeSyncStatus(statuses)
for {
select {
case status := <-statuses:
notifier.Notify(rpcSub.ID, status)
case <-rpcSub.Err():
sub.Unsubscribe()
return
case <-notifier.Closed():
sub.Unsubscribe()
return
}
}
}()
return rpcSub, nil
}
// SyncingResult provides information about the current synchronisation status for this node.
type SyncingResult struct {
Syncing bool `json:"syncing"`
Status ethereum.SyncProgress `json:"status"`
}
// uninstallSyncSubscriptionRequest uninstalls a syncing subscription in the API event loop.
type uninstallSyncSubscriptionRequest struct {
c chan interface{}
uninstalled chan interface{}
}
// SyncStatusSubscription represents a syncing subscription.
type SyncStatusSubscription struct {
api *DownloaderAPI // register subscription in event loop of this api instance
c chan interface{} // channel where events are broadcasted to
unsubOnce sync.Once // make sure unsubscribe logic is executed once
}
// Unsubscribe uninstalls the subscription from the DownloadAPI event loop.
// The status channel that was passed to subscribeSyncStatus isn't used anymore
// after this method returns.
func (s *SyncStatusSubscription) Unsubscribe() {
s.unsubOnce.Do(func() {
req := uninstallSyncSubscriptionRequest{s.c, make(chan interface{})}
s.api.uninstallSyncSubscription <- &req
for {
select {
case <-s.c:
// drop new status events until uninstall confirmation
continue
case <-req.uninstalled:
return
}
}
})
}
// SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates.
// The given channel must receive interface values, the result can either
func (api *DownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription {
api.installSyncSubscription <- status
return &SyncStatusSubscription{api: api, c: status}
}
This diff is collapsed.
This diff is collapsed.
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import "github.com/ethereum/go-ethereum/core/types"
type DoneEvent struct {
Latest *types.Header
}
type StartEvent struct{}
type FailedEvent struct{ Err error }
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Contains the metrics collected by the downloader.
package downloader
import (
"github.com/ethereum/go-ethereum/metrics"
)
var (
headerInMeter = metrics.NewRegisteredMeter("eth/downloader/headers/in", nil)
headerReqTimer = metrics.NewRegisteredTimer("eth/downloader/headers/req", nil)
headerDropMeter = metrics.NewRegisteredMeter("eth/downloader/headers/drop", nil)
headerTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/headers/timeout", nil)
bodyInMeter = metrics.NewRegisteredMeter("eth/downloader/bodies/in", nil)
bodyReqTimer = metrics.NewRegisteredTimer("eth/downloader/bodies/req", nil)
bodyDropMeter = metrics.NewRegisteredMeter("eth/downloader/bodies/drop", nil)
bodyTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/bodies/timeout", nil)
receiptInMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/in", nil)
receiptReqTimer = metrics.NewRegisteredTimer("eth/downloader/receipts/req", nil)
receiptDropMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/drop", nil)
receiptTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/timeout", nil)
stateInMeter = metrics.NewRegisteredMeter("eth/downloader/states/in", nil)
stateDropMeter = metrics.NewRegisteredMeter("eth/downloader/states/drop", nil)
throttleCounter = metrics.NewRegisteredCounter("eth/downloader/throttle", nil)
)
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import "fmt"
// SyncMode represents the synchronisation mode of the downloader.
// It is a uint32 as it is used with atomic operations.
type SyncMode uint32
const (
FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks
FastSync // Quickly download the headers, full sync only at the chain
SnapSync // Download the chain and the state via compact snapshots
LightSync // Download only the headers and terminate afterwards
)
func (mode SyncMode) IsValid() bool {
return mode >= FullSync && mode <= LightSync
}
// String implements the stringer interface.
func (mode SyncMode) String() string {
switch mode {
case FullSync:
return "full"
case FastSync:
return "fast"
case SnapSync:
return "snap"
case LightSync:
return "light"
default:
return "unknown"
}
}
func (mode SyncMode) MarshalText() ([]byte, error) {
switch mode {
case FullSync:
return []byte("full"), nil
case FastSync:
return []byte("fast"), nil
case SnapSync:
return []byte("snap"), nil
case LightSync:
return []byte("light"), nil
default:
return nil, fmt.Errorf("unknown sync mode %d", mode)
}
}
func (mode *SyncMode) UnmarshalText(text []byte) error {
switch string(text) {
case "full":
*mode = FullSync
case "fast":
*mode = FastSync
case "snap":
*mode = SnapSync
case "light":
*mode = LightSync
default:
return fmt.Errorf(`unknown sync mode %q, want "full", "fast" or "light"`, text)
}
return nil
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"fmt"
"github.com/ethereum/go-ethereum/core/types"
)
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
// dataPack is a data message returned by a peer for some query.
type dataPack interface {
PeerId() string
Items() int
Stats() string
}
// headerPack is a batch of block headers returned by a peer.
type headerPack struct {
peerID string
headers []*types.Header
}
func (p *headerPack) PeerId() string { return p.peerID }
func (p *headerPack) Items() int { return len(p.headers) }
func (p *headerPack) Stats() string { return fmt.Sprintf("%d", len(p.headers)) }
// bodyPack is a batch of block bodies returned by a peer.
type bodyPack struct {
peerID string
transactions [][]*types.Transaction
uncles [][]*types.Header
}
func (p *bodyPack) PeerId() string { return p.peerID }
func (p *bodyPack) Items() int {
if len(p.transactions) <= len(p.uncles) {
return len(p.transactions)
}
return len(p.uncles)
}
func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) }
// receiptPack is a batch of receipts returned by a peer.
type receiptPack struct {
peerID string
receipts [][]*types.Receipt
}
func (p *receiptPack) PeerId() string { return p.peerID }
func (p *receiptPack) Items() int { return len(p.receipts) }
func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) }
// statePack is a batch of states returned by a peer.
type statePack struct {
peerID string
states [][]byte
}
func (p *statePack) PeerId() string { return p.peerID }
func (p *statePack) Items() int { return len(p.states) }
func (p *statePack) Stats() string { return fmt.Sprintf("%d", len(p.states)) }
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -31,7 +31,7 @@ import ( ...@@ -31,7 +31,7 @@ import (
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/les/downloader" "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
......
...@@ -16,6 +16,10 @@ ...@@ -16,6 +16,10 @@
package les package les
// Note: these tests are disabled now because they cannot work with the old sync
// mechanism removed but will be useful again once the PoS ultralight mode is implemented
/*
import ( import (
"bytes" "bytes"
"context" "context"
...@@ -451,3 +455,4 @@ func randomHash() common.Hash { ...@@ -451,3 +455,4 @@ func randomHash() common.Hash {
} }
return hash return hash
} }
*/
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment