// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package swarm

import (
	"bytes"
21
	"context"
22
	"crypto/ecdsa"
23
	"errors"
24
	"fmt"
25
	"io"
26
	"math/big"
27
	"net"
28
	"path/filepath"
29
	"strings"
30
	"time"
31
	"unicode"
32

33 34 35 36 37
	"github.com/ethereum/go-ethereum/swarm/chunk"

	"github.com/ethereum/go-ethereum/swarm/storage/feed"
	"github.com/ethereum/go-ethereum/swarm/storage/localstore"

38 39 40 41
	"github.com/ethereum/go-ethereum/accounts/abi/bind"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/contracts/chequebook"
	"github.com/ethereum/go-ethereum/contracts/ens"
42
	"github.com/ethereum/go-ethereum/ethclient"
43
	"github.com/ethereum/go-ethereum/metrics"
44
	"github.com/ethereum/go-ethereum/p2p"
45
	"github.com/ethereum/go-ethereum/p2p/protocols"
46
	"github.com/ethereum/go-ethereum/params"
47 48 49
	"github.com/ethereum/go-ethereum/rpc"
	"github.com/ethereum/go-ethereum/swarm/api"
	httpapi "github.com/ethereum/go-ethereum/swarm/api/http"
50
	"github.com/ethereum/go-ethereum/swarm/fuse"
51
	"github.com/ethereum/go-ethereum/swarm/log"
52
	"github.com/ethereum/go-ethereum/swarm/network"
53 54 55
	"github.com/ethereum/go-ethereum/swarm/network/stream"
	"github.com/ethereum/go-ethereum/swarm/pss"
	"github.com/ethereum/go-ethereum/swarm/state"
56
	"github.com/ethereum/go-ethereum/swarm/storage"
57
	"github.com/ethereum/go-ethereum/swarm/storage/mock"
58
	"github.com/ethereum/go-ethereum/swarm/swap"
59
	"github.com/ethereum/go-ethereum/swarm/tracing"
60 61
)

62 63 64 65 66
var (
	updateGaugesPeriod = 5 * time.Second
	startCounter       = metrics.NewRegisteredCounter("stack,start", nil)
	stopCounter        = metrics.NewRegisteredCounter("stack,stop", nil)
	uptimeGauge        = metrics.NewRegisteredGauge("stack.uptime", nil)
67
	requestsCacheGauge = metrics.NewRegisteredGauge("storage.cache.requests.size", nil)
68 69
)

70 71
// the swarm stack
type Swarm struct {
72 73 74 75 76 77 78 79 80 81 82 83 84 85
	config            *api.Config        // swarm configuration
	api               *api.API           // high level api layer (fs/manifest)
	dns               api.Resolver       // DNS registrar
	fileStore         *storage.FileStore // distributed preimage archive, the local API to the storage with document level storage/retrieval support
	streamer          *stream.Registry
	bzz               *network.Bzz       // the logistic manager
	backend           chequebook.Backend // simple blockchain Backend
	privateKey        *ecdsa.PrivateKey
	netStore          *storage.NetStore
	sfs               *fuse.SwarmFS // need this to cleanup all the active mounts on node exit
	ps                *pss.Pss
	swap              *swap.Swap
	stateStore        *state.DBStore
	accountingMetrics *protocols.AccountingMetrics
86
	cleanupFuncs      []func() error
87 88

	tracerClose io.Closer
89 90
}

91
// NewSwarm creates a new swarm service instance
92
// implements node.Service
93 94 95 96
// If mockStore is not nil, it will be used as the storage for chunk data.
// MockStore should be used only for testing.
func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err error) {
	if bytes.Equal(common.FromHex(config.PublicKey), storage.ZeroAddr) {
97 98
		return nil, fmt.Errorf("empty public key")
	}
99
	if bytes.Equal(common.FromHex(config.BzzKey), storage.ZeroAddr) {
100 101 102
		return nil, fmt.Errorf("empty bzz key")
	}

103 104 105 106 107 108 109
	var backend chequebook.Backend
	if config.SwapAPI != "" && config.SwapEnabled {
		log.Info("connecting to SWAP API", "url", config.SwapAPI)
		backend, err = ethclient.Dial(config.SwapAPI)
		if err != nil {
			return nil, fmt.Errorf("error connecting to SWAP API %s: %s", config.SwapAPI, err)
		}
110 111
	}

112
	self = &Swarm{
113 114 115 116
		config:       config,
		backend:      backend,
		privateKey:   config.ShiftPrivateKey(),
		cleanupFuncs: []func() error{},
117
	}
118
	log.Debug("Setting up Swarm service components")
119

120
	config.HiveParams.Discovery = true
121

122
	bzzconfig := &network.BzzConfig{
123 124 125 126 127
		NetworkID:    config.NetworkID,
		OverlayAddr:  common.FromHex(config.BzzKey),
		HiveParams:   config.HiveParams,
		LightNode:    config.LightNodeEnabled,
		BootnodeMode: config.BootnodeMode,
128
	}
129

130
	self.stateStore, err = state.NewDBStore(filepath.Join(config.Path, "state-store.db"))
131 132 133
	if err != nil {
		return
	}
134

135 136
	// set up high level api
	var resolver *api.MultiResolver
137 138 139 140
	if len(config.EnsAPIs) > 0 {
		opts := []api.MultiResolverOption{}
		for _, c := range config.EnsAPIs {
			tld, endpoint, addr := parseEnsAPIAddress(c)
141
			r, err := newEnsClient(endpoint, addr, config, self.privateKey)
142 143 144
			if err != nil {
				return nil, err
			}
145
			opts = append(opts, api.MultiResolverOptionWithResolver(r, tld))
146

147
		}
148 149 150
		resolver = api.NewMultiResolver(opts...)
		self.dns = resolver
	}
151 152 153
	// check that we are not in the old database schema
	// if so - fail and exit
	isLegacy := localstore.IsLegacyDatabase(config.ChunkDbPath)
154

155 156 157 158 159 160 161 162 163 164 165 166 167
	if isLegacy {
		return nil, errors.New("Legacy database format detected! Please read the migration announcement at: https://github.com/ethersphere/go-ethereum/wiki/Swarm-v0.4-local-store-migration")
	}

	var feedsHandler *feed.Handler
	fhParams := &feed.HandlerParams{}

	feedsHandler = feed.NewHandler(fhParams)

	localStore, err := localstore.New(config.ChunkDbPath, config.BaseKey, &localstore.Options{
		MockStore: mockStore,
		Capacity:  config.DbCapacity,
	})
168
	if err != nil {
169 170
		return nil, err
	}
171 172 173 174 175
	lstore := chunk.NewValidatorStore(
		localStore,
		storage.NewContentAddressValidator(storage.MakeHashFunc(storage.DefaultHash)),
		feedsHandler,
	)
176 177 178 179

	self.netStore, err = storage.NewNetStore(lstore, nil)
	if err != nil {
		return nil, err
180 181 182 183 184 185
	}

	to := network.NewKademlia(
		common.FromHex(config.BzzKey),
		network.NewKadParams(),
	)
186 187
	delivery := stream.NewDelivery(to, self.netStore)
	self.netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, config.DeliverySkipCheck).New
188

189 190
	feedsHandler.SetStore(self.netStore)

191 192 193 194 195 196
	if config.SwapEnabled {
		balancesStore, err := state.NewDBStore(filepath.Join(config.Path, "balances.db"))
		if err != nil {
			return nil, err
		}
		self.swap = swap.New(balancesStore)
197
		self.accountingMetrics = protocols.SetupAccountingMetrics(10*time.Second, filepath.Join(config.Path, "metrics.db"))
198 199
	}

200
	nodeID := config.Enode.ID()
201 202 203 204 205 206

	syncing := stream.SyncingAutoSubscribe
	if !config.SyncEnabled || config.LightNodeEnabled {
		syncing = stream.SyncingDisabled
	}

207
	registryOptions := &stream.RegistryOptions{
208
		SkipCheck:       config.DeliverySkipCheck,
209
		Syncing:         syncing,
210
		SyncUpdateDelay: config.SyncUpdateDelay,
211
		MaxPeerServers:  config.MaxStreamPeerServers,
212
	}
213
	self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, self.stateStore, registryOptions, self.swap)
214
	tags := chunk.NewTags() //todo load from state store
215 216

	// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage
217
	self.fileStore = storage.NewFileStore(self.netStore, self.config.FileStoreParams, tags)
218

219
	log.Debug("Setup local storage")
220

221
	self.bzz = network.NewBzz(bzzconfig, to, self.stateStore, self.streamer.GetSpec(), self.streamer.Run)
222 223 224 225 226 227 228 229

	// Pss = postal service over swarm (devp2p over bzz)
	self.ps, err = pss.NewPss(to, config.Pss)
	if err != nil {
		return nil, err
	}
	if pss.IsActiveHandshake {
		pss.SetHandshakeController(self.ps, pss.NewHandshakeParams())
230 231
	}

232
	self.api = api.NewAPI(self.fileStore, self.dns, feedsHandler, self.privateKey, tags)
233

234
	self.sfs = fuse.NewSwarmFS(self.api)
235
	log.Debug("Initialized FUSE filesystem")
236

237 238 239
	return self, nil
}

240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
// parseEnsAPIAddress parses string according to format
// [tld:][contract-addr@]url and returns ENSClientConfig structure
// with endpoint, contract address and TLD.
func parseEnsAPIAddress(s string) (tld, endpoint string, addr common.Address) {
	isAllLetterString := func(s string) bool {
		for _, r := range s {
			if !unicode.IsLetter(r) {
				return false
			}
		}
		return true
	}
	endpoint = s
	if i := strings.Index(endpoint, ":"); i > 0 {
		if isAllLetterString(endpoint[:i]) && len(endpoint) > i+2 && endpoint[i+1:i+3] != "//" {
			tld = endpoint[:i]
			endpoint = endpoint[i+1:]
		}
	}
	if i := strings.Index(endpoint, "@"); i > 0 {
		addr = common.HexToAddress(endpoint[:i])
		endpoint = endpoint[i+1:]
	}
	return
264 265
}

266 267 268 269 270 271
// ensClient provides functionality for api.ResolveValidator.
// It embeds both the ENS contract binding and the ethclient.Client that
// newEnsClient created for the same endpoint, so the methods of both are
// available on a single value.
type ensClient struct {
	*ens.ENS
	*ethclient.Client
}

272 273 274
// newEnsClient creates a new ENS client for that is a consumer of
// a ENS API on a specific endpoint. It is used as a helper function
// for creating multiple resolvers in NewSwarm function.
275
func newEnsClient(endpoint string, addr common.Address, config *api.Config, privkey *ecdsa.PrivateKey) (*ensClient, error) {
276 277 278 279 280
	log.Info("connecting to ENS API", "url", endpoint)
	client, err := rpc.Dial(endpoint)
	if err != nil {
		return nil, fmt.Errorf("error connecting to ENS API %s: %s", endpoint, err)
	}
281
	ethClient := ethclient.NewClient(client)
282 283

	ensRoot := config.EnsRoot
284 285
	if addr != (common.Address{}) {
		ensRoot = addr
286 287 288 289 290 291 292 293
	} else {
		a, err := detectEnsAddr(client)
		if err == nil {
			ensRoot = a
		} else {
			log.Warn(fmt.Sprintf("could not determine ENS contract address, using default %s", ensRoot), "err", err)
		}
	}
294 295
	transactOpts := bind.NewKeyedTransactor(privkey)
	dns, err := ens.NewENS(transactOpts, ensRoot, ethClient)
296 297 298 299
	if err != nil {
		return nil, err
	}
	log.Debug(fmt.Sprintf("-> Swarm Domain Name Registrar %v @ address %v", endpoint, ensRoot.Hex()))
300 301 302 303
	return &ensClient{
		ENS:    dns,
		Client: ethClient,
	}, err
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
}

// detectEnsAddr works out which ENS contract address to use by asking the
// client for its "net_version" and for block number 0, and comparing the pair
// against the known mainnet ("1") and testnet ("3") values.
//
// Both requests share a single 5 second timeout. An error is returned when
// either request fails or when the version/genesis hash pair is not
// recognised.
func detectEnsAddr(client *rpc.Client) (common.Address, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var version string
	err := client.CallContext(ctx, &version, "net_version")
	if err != nil {
		return common.Address{}, err
	}

	genesis, err := ethclient.NewClient(client).BlockByNumber(ctx, big.NewInt(0))
	if err != nil {
		return common.Address{}, err
	}
	genesisHash := genesis.Hash()

	if version == "1" && genesisHash == params.MainnetGenesisHash {
		log.Info("using Mainnet ENS contract address", "addr", ens.MainNetAddress)
		return ens.MainNetAddress, nil
	}
	if version == "3" && genesisHash == params.TestnetGenesisHash {
		log.Info("using Testnet ENS contract address", "addr", ens.TestNetAddress)
		return ens.TestNetAddress, nil
	}
	return common.Address{}, fmt.Errorf("unknown version and genesis hash: %s %s", version, genesisHash)
}

338 339 340 341 342 343 344 345 346 347 348
/*
Start is called when the stack is started
* starts the network kademlia hive peer management
* (starts netStore level 0 api)
* starts DPA level 1 api (chunking -> store/retrieve requests)
* (starts level 2 api)
* starts http proxy server
* registers url scheme handlers for bzz, etc
* TODO: start subservices like sword, swear, swarmdns
*/
// implements the node.Service interface
349
func (s *Swarm) Start(srv *p2p.Server) error {
350
	startTime := time.Now()
351

352
	s.tracerClose = tracing.Closer
353

354
	// update uaddr to correct enode
355
	newaddr := s.bzz.UpdateLocalAddr([]byte(srv.Self().String()))
356
	log.Info("Updated bzz local addr", "oaddr", fmt.Sprintf("%x", newaddr.OAddr), "uaddr", fmt.Sprintf("%s", newaddr.UAddr))
357
	// set chequebook
358 359
	//TODO: Currently if swap is enabled and no chequebook (or inexistent) contract is provided, the node would crash.
	//Once we integrate back the contracts, this check MUST be revisited
360
	if s.config.SwapEnabled && s.config.SwapAPI != "" {
361
		ctx := context.Background() // The initial setup has no deadline.
362
		err := s.SetChequebook(ctx)
363 364 365
		if err != nil {
			return fmt.Errorf("Unable to set chequebook for SWAP: %v", err)
		}
366
		log.Debug(fmt.Sprintf("-> cheque book for SWAP: %v", s.config.Swap.Chequebook()))
367
	} else {
368
		log.Debug(fmt.Sprintf("SWAP disabled: no cheque book set"))
369 370
	}

371
	log.Info("Starting bzz service")
372

373
	err := s.bzz.Start(srv)
374 375 376 377
	if err != nil {
		log.Error("bzz failed", "err", err)
		return err
	}
378
	log.Info("Swarm network started", "bzzaddr", fmt.Sprintf("%x", s.bzz.Hive.BaseAddr()))
379

380 381
	if s.ps != nil {
		s.ps.Start(srv)
382
	}
383 384

	// start swarm http proxy server
385 386 387
	if s.config.Port != "" {
		addr := net.JoinHostPort(s.config.ListenAddr, s.config.Port)
		server := httpapi.NewServer(s.api, s.config.Cors)
388

389 390
		if s.config.Cors != "" {
			log.Debug("Swarm HTTP proxy CORS headers", "allowedOrigins", s.config.Cors)
391
		}
392

393
		log.Debug("Starting Swarm HTTP proxy", "port", s.config.Port)
394 395 396 397 398 399
		go func() {
			err := server.ListenAndServe(addr)
			if err != nil {
				log.Error("Could not start Swarm HTTP proxy", "err", err.Error())
			}
		}()
400 401
	}

402
	doneC := make(chan struct{})
403

404
	s.cleanupFuncs = append(s.cleanupFuncs, func() error {
405 406 407
		close(doneC)
		return nil
	})
408

409 410 411 412 413
	go func(time.Time) {
		for {
			select {
			case <-time.After(updateGaugesPeriod):
				uptimeGauge.Update(time.Since(startTime).Nanoseconds())
414
				requestsCacheGauge.Update(int64(s.netStore.RequestsCacheLen()))
415 416 417
			case <-doneC:
				return
			}
418
		}
419
	}(startTime)
420

421
	startCounter.Inc(1)
422
	s.streamer.Start(srv)
423
	return nil
424 425
}

426 427
// implements the node.Service interface
// stops all component services.
428 429 430
func (s *Swarm) Stop() error {
	if s.tracerClose != nil {
		err := s.tracerClose.Close()
431
		tracing.FinishSpans()
432 433 434 435 436
		if err != nil {
			return err
		}
	}

437 438
	if s.ps != nil {
		s.ps.Stop()
439
	}
440
	if ch := s.config.Swap.Chequebook(); ch != nil {
441 442 443
		ch.Stop()
		ch.Save()
	}
444 445
	if s.swap != nil {
		s.swap.Close()
446
	}
447 448
	if s.accountingMetrics != nil {
		s.accountingMetrics.Close()
449
	}
450 451
	if s.netStore != nil {
		s.netStore.Close()
452
	}
453
	s.sfs.Stop()
454
	stopCounter.Inc(1)
455
	s.streamer.Stop()
456

457 458 459
	err := s.bzz.Stop()
	if s.stateStore != nil {
		s.stateStore.Close()
460
	}
461

462
	for _, cleanF := range s.cleanupFuncs {
463 464 465 466 467 468
		err = cleanF()
		if err != nil {
			log.Error("encountered an error while running cleanup function", "err", err)
			break
		}
	}
469
	return err
470 471
}

472 473 474 475 476 477
// Protocols implements the node.Service interface
func (s *Swarm) Protocols() (protos []p2p.Protocol) {
	if s.config.BootnodeMode {
		protos = append(protos, s.bzz.Protocols()...)
	} else {
		protos = append(protos, s.bzz.Protocols()...)
478

479 480 481
		if s.ps != nil {
			protos = append(protos, s.ps.Protocols()...)
		}
482 483 484 485
	}
	return
}

486
// implements node.Service
487
// APIs returns the RPC API descriptors the Swarm implementation offers
488
func (s *Swarm) APIs() []rpc.API {
489 490

	apis := []rpc.API{
491 492 493
		// public APIs
		{
			Namespace: "bzz",
494
			Version:   "3.0",
495
			Service:   &Info{s.config, chequebook.ContractParams},
496 497 498 499 500
			Public:    true,
		},
		// admin APIs
		{
			Namespace: "bzz",
501
			Version:   "3.0",
502
			Service:   api.NewInspector(s.api, s.bzz.Hive, s.netStore),
503 504 505 506 507
			Public:    false,
		},
		{
			Namespace: "chequebook",
			Version:   chequebook.Version,
508
			Service:   chequebook.NewAPI(s.config.Swap.Chequebook),
509 510
			Public:    false,
		},
511 512
		{
			Namespace: "swarmfs",
513
			Version:   fuse.SwarmFSVersion,
514
			Service:   s.sfs,
515 516
			Public:    false,
		},
517 518 519
		{
			Namespace: "accounting",
			Version:   protocols.AccountingVersion,
520
			Service:   protocols.NewAccountingApi(s.accountingMetrics),
521 522
			Public:    false,
		},
523
	}
524

525
	apis = append(apis, s.bzz.APIs()...)
526

527 528
	apis = append(apis, s.streamer.APIs()...)

529 530
	if s.ps != nil {
		apis = append(apis, s.ps.APIs()...)
531 532 533
	}

	return apis
534 535 536
}

// SetChequebook ensures that the local checquebook is set up on chain.
537 538
func (s *Swarm) SetChequebook(ctx context.Context) error {
	err := s.config.Swap.SetChequebook(ctx, s.backend, s.config.Path)
539 540 541
	if err != nil {
		return err
	}
542
	log.Info(fmt.Sprintf("new chequebook set (%v): saving config file, resetting all connections in the hive", s.config.Swap.Contract.Hex()))
543 544 545
	return nil
}

546 547 548 549 550
// RegisterPssProtocol adds a devp2p protocol to the swarm node's Pss instance.
// It is a thin wrapper that forwards all arguments, together with s.ps, to
// pss.RegisterProtocol and returns its results unchanged.
func (s *Swarm) RegisterPssProtocol(topic *pss.Topic, spec *protocols.Spec, targetprotocol *p2p.Protocol, options *pss.ProtocolParams) (*pss.Protocol, error) {
	return pss.RegisterProtocol(s.ps, topic, spec, targetprotocol, options)
}

551 552 553 554 555 556
// Info is serialisable info about swarm. It embeds the swarm configuration
// and the chequebook parameters, and is registered by Swarm.APIs as the
// service of the public "bzz" RPC namespace.
type Info struct {
	*api.Config
	*chequebook.Params
}

557 558
// Info returns the receiver itself.
func (i *Info) Info() *Info {
	return i
}