node.go 19.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package node

import (
	"errors"
21
	"net"
22 23
	"os"
	"path/filepath"
24
	"reflect"
25
	"strings"
26 27 28
	"sync"
	"syscall"

29
	"github.com/ethereum/go-ethereum/accounts"
30
	"github.com/ethereum/go-ethereum/ethdb"
31
	"github.com/ethereum/go-ethereum/event"
32
	"github.com/ethereum/go-ethereum/internal/debug"
33 34
	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/logger/glog"
35
	"github.com/ethereum/go-ethereum/p2p"
36
	"github.com/ethereum/go-ethereum/rpc"
37
	"github.com/syndtr/goleveldb/leveldb/storage"
38 39 40
)

// Sentinel errors reported by the node's life cycle methods.
var (
	// ErrDatadirUsed is returned by Start when starting the P2P server fails
	// with one of the errnos listed in datadirInUseErrnos.
	ErrDatadirUsed = errors.New("datadir already used")

	// ErrNodeStopped is returned by operations that need a running node.
	ErrNodeStopped = errors.New("node not started")

	// ErrNodeRunning is returned by operations that need a stopped node.
	ErrNodeRunning = errors.New("node already running")

	// ErrServiceUnknown is returned by Service when no running service has
	// the requested type.
	ErrServiceUnknown = errors.New("unknown service")
)

// datadirInUseErrnos is the set of syscall errno values that Start translates
// into ErrDatadirUsed.
// NOTE(review): presumably the platform specific "resource busy / sharing
// violation" codes; confirm the meaning of 11, 32 and 35 per platform.
var datadirInUseErrnos = map[uint]bool{
	11: true,
	32: true,
	35: true,
}

49
// Node is a container on which services can be registered.
50
type Node struct {
51
	eventmux *event.TypeMux // Event multiplexer used between the services of a stack
52 53
	config   *Config
	accman   *accounts.Manager
54

55 56
	ephemeralKeystore string          // if non-empty, the key directory that will be removed by Stop
	instanceDirLock   storage.Storage // prevents concurrent use of instance directory
57

58
	serverConfig p2p.Config
59
	server       *p2p.Server // Currently running P2P networking layer
60

61 62
	serviceFuncs []ServiceConstructor     // Service constructors (in dependency order)
	services     map[reflect.Type]Service // Currently running services
63

64 65 66
	rpcAPIs       []rpc.API   // List of APIs currently provided by the node
	inprocHandler *rpc.Server // In-process RPC request handler to process the API requests

67 68 69 70
	ipcEndpoint string       // IPC endpoint to listen at (empty = IPC disabled)
	ipcListener net.Listener // IPC RPC listener socket to serve API requests
	ipcHandler  *rpc.Server  // IPC RPC request handler to process the API requests

71 72 73 74 75
	httpEndpoint  string       // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled)
	httpWhitelist []string     // HTTP RPC modules to allow through this endpoint
	httpListener  net.Listener // HTTP RPC listener socket to server API requests
	httpHandler   *rpc.Server  // HTTP RPC request handler to process the API requests

76 77 78
	wsEndpoint string       // Websocket endpoint (interface + port) to listen at (empty = websocket disabled)
	wsListener net.Listener // Websocket RPC listener socket to server API requests
	wsHandler  *rpc.Server  // Websocket RPC request handler to process the API requests
79

80
	stop chan struct{} // Channel to wait for termination notifications
81 82 83 84 85
	lock sync.RWMutex
}

// New creates a new P2P node, ready for protocol registration.
func New(conf *Config) (*Node, error) {
86 87 88 89
	// Copy config and resolve the datadir so future changes to the current
	// working directory don't affect the node.
	confCopy := *conf
	conf = &confCopy
90
	if conf.DataDir != "" {
91 92
		absdatadir, err := filepath.Abs(conf.DataDir)
		if err != nil {
93 94
			return nil, err
		}
95 96 97 98 99 100 101 102 103
		conf.DataDir = absdatadir
	}
	// Ensure that the instance name doesn't cause weird conflicts with
	// other files in the data directory.
	if strings.ContainsAny(conf.Name, `/\`) {
		return nil, errors.New(`Config.Name must not contain '/' or '\'`)
	}
	if conf.Name == datadirDefaultKeyStore {
		return nil, errors.New(`Config.Name cannot be "` + datadirDefaultKeyStore + `"`)
104
	}
105 106 107 108 109
	if strings.HasSuffix(conf.Name, ".ipc") {
		return nil, errors.New(`Config.Name cannot end in ".ipc"`)
	}
	// Ensure that the AccountManager method works before the node has started.
	// We rely on this in cmd/geth.
110 111 112 113
	am, ephemeralKeystore, err := makeAccountManager(conf)
	if err != nil {
		return nil, err
	}
114 115
	// Note: any interaction with Config that would create/touch files
	// in the data directory or instance directory is delayed until Start.
116
	return &Node{
117 118
		accman:            am,
		ephemeralKeystore: ephemeralKeystore,
119 120 121 122 123 124
		config:            conf,
		serviceFuncs:      []ServiceConstructor{},
		ipcEndpoint:       conf.IPCEndpoint(),
		httpEndpoint:      conf.HTTPEndpoint(),
		wsEndpoint:        conf.WSEndpoint(),
		eventmux:          new(event.TypeMux),
125 126 127
	}, nil
}

128 129 130
// Register injects a new service into the node's stack. The service created by
// the passed constructor must be unique in its type with regard to sibling ones.
func (n *Node) Register(constructor ServiceConstructor) error {
131 132 133
	n.lock.Lock()
	defer n.lock.Unlock()

134
	if n.server != nil {
135 136
		return ErrNodeRunning
	}
137
	n.serviceFuncs = append(n.serviceFuncs, constructor)
138 139 140 141 142 143 144 145 146
	return nil
}

// Start create a live P2P node and starts running it.
func (n *Node) Start() error {
	n.lock.Lock()
	defer n.lock.Unlock()

	// Short circuit if the node's already running
147
	if n.server != nil {
148 149
		return ErrNodeRunning
	}
150 151 152 153 154 155 156
	if err := n.openDataDir(); err != nil {
		return err
	}

	// Initialize the p2p server. This creates the node key and
	// discovery databases.
	n.serverConfig = p2p.Config{
157 158 159 160 161 162 163 164 165 166 167
		PrivateKey:       n.config.NodeKey(),
		Name:             n.config.NodeName(),
		Discovery:        !n.config.NoDiscovery,
		DiscoveryV5:      n.config.DiscoveryV5,
		DiscoveryV5Addr:  n.config.DiscoveryV5Addr,
		BootstrapNodes:   n.config.BootstrapNodes,
		BootstrapNodesV5: n.config.BootstrapNodesV5,
		StaticNodes:      n.config.StaticNodes(),
		TrustedNodes:     n.config.TrusterNodes(),
		NodeDatabase:     n.config.NodeDB(),
		ListenAddr:       n.config.ListenAddr,
168
		NetRestrict:      n.config.NetRestrict,
169 170 171 172 173
		NAT:              n.config.NAT,
		Dialer:           n.config.Dialer,
		NoDial:           n.config.NoDial,
		MaxPeers:         n.config.MaxPeers,
		MaxPendingPeers:  n.config.MaxPendingPeers,
174
	}
175
	running := &p2p.Server{Config: n.serverConfig}
176 177 178
	glog.V(logger.Info).Infoln("instance:", n.serverConfig.Name)

	// Otherwise copy and specialize the P2P configuration
179 180
	services := make(map[reflect.Type]Service)
	for _, constructor := range n.serviceFuncs {
181 182
		// Create a new context for the particular service
		ctx := &ServiceContext{
183
			config:         n.config,
184 185 186
			services:       make(map[reflect.Type]Service),
			EventMux:       n.eventmux,
			AccountManager: n.accman,
187
		}
188 189
		for kind, s := range services { // copy needed for threaded access
			ctx.services[kind] = s
190 191
		}
		// Construct and save the service
192 193 194 195
		service, err := constructor(ctx)
		if err != nil {
			return err
		}
196 197 198 199 200
		kind := reflect.TypeOf(service)
		if _, exists := services[kind]; exists {
			return &DuplicateServiceError{Kind: kind}
		}
		services[kind] = service
201 202 203 204 205 206 207 208 209 210 211 212
	}
	// Gather the protocols and start the freshly assembled P2P server
	for _, service := range services {
		running.Protocols = append(running.Protocols, service.Protocols()...)
	}
	if err := running.Start(); err != nil {
		if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
			return ErrDatadirUsed
		}
		return err
	}
	// Start each of the services
213 214
	started := []reflect.Type{}
	for kind, service := range services {
215
		// Start the next service, stopping all previous upon failure
216
		if err := service.Start(running); err != nil {
217 218
			for _, kind := range started {
				services[kind].Stop()
219
			}
220 221
			running.Stop()

222 223 224
			return err
		}
		// Mark the service started for potential cleanup
225
		started = append(started, kind)
226
	}
227 228 229 230 231 232 233 234
	// Lastly start the configured RPC interfaces
	if err := n.startRPC(services); err != nil {
		for _, service := range services {
			service.Stop()
		}
		running.Stop()
		return err
	}
235 236
	// Finish initializing the startup
	n.services = services
237 238
	n.server = running
	n.stop = make(chan struct{})
239 240 241 242

	return nil
}

243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
// openDataDir creates the node's instance directory inside the configured
// data directory and locks it. The lock is kept in n.instanceDirLock. A node
// without a data directory is ephemeral and needs no directory or lock.
func (n *Node) openDataDir() error {
	datadir := n.config.DataDir
	if datadir == "" {
		return nil // ephemeral
	}

	dir := filepath.Join(datadir, n.config.name())
	if err := os.MkdirAll(dir, 0700); err != nil {
		return err
	}
	// Opening the instance directory as LevelDB storage creates a lock file.
	// That prevents concurrent use by another instance as well as accidental
	// use of the instance directory as a database.
	lock, err := storage.OpenFile(dir, true)
	if err != nil {
		return err
	}
	n.instanceDirLock = lock
	return nil
}

263 264 265
// startRPC is a helper method to start all the various RPC endpoint during node
// startup. It's not meant to be called at any time afterwards as it makes certain
// assumptions about the state of the node.
266
func (n *Node) startRPC(services map[reflect.Type]Service) error {
267
	// Gather all the possible APIs to surface
268 269 270 271
	apis := n.apis()
	for _, service := range services {
		apis = append(apis, service.APIs()...)
	}
272
	// Start the various API endpoints, terminating all in case of errors
273 274 275
	if err := n.startInProc(apis); err != nil {
		return err
	}
276
	if err := n.startIPC(apis); err != nil {
277
		n.stopInProc()
278 279
		return err
	}
280
	if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors); err != nil {
281
		n.stopIPC()
282
		n.stopInProc()
283 284
		return err
	}
285
	if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins); err != nil {
286 287
		n.stopHTTP()
		n.stopIPC()
288
		n.stopInProc()
289 290
		return err
	}
291 292 293 294 295
	// All API endpoints started successfully
	n.rpcAPIs = apis
	return nil
}

296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
// startInProc creates the in-process RPC handler and registers every given
// API on it, regardless of whether the API is public. The handler is stored
// on the node only if all registrations succeed.
func (n *Node) startInProc(apis []rpc.API) error {
	server := rpc.NewServer()
	for _, api := range apis {
		err := server.RegisterName(api.Namespace, api.Service)
		if err != nil {
			return err
		}
		glog.V(logger.Debug).Infof("InProc registered %T under '%s'", api.Service, api.Namespace)
	}
	n.inprocHandler = server
	return nil
}

// stopInProc stops the in-process RPC handler and clears it from the node.
// It is a no-op if no handler is set.
func (n *Node) stopInProc() {
	handler := n.inprocHandler
	if handler == nil {
		return
	}
	handler.Stop()
	n.inprocHandler = nil
}

318 319 320 321 322 323 324 325
// startIPC initializes and starts the IPC RPC endpoint.
func (n *Node) startIPC(apis []rpc.API) error {
	// Short circuit if the IPC endpoint isn't being exposed
	if n.ipcEndpoint == "" {
		return nil
	}
	// Register all the APIs exposed by the services
	handler := rpc.NewServer()
326
	for _, api := range apis {
327
		if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
328 329
			return err
		}
330
		glog.V(logger.Debug).Infof("IPC registered %T under '%s'", api.Service, api.Namespace)
331
	}
332
	// All APIs registered, start the IPC listener
333
	var (
334 335
		listener net.Listener
		err      error
336
	)
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
	if listener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil {
		return err
	}
	go func() {
		glog.V(logger.Info).Infof("IPC endpoint opened: %s", n.ipcEndpoint)

		for {
			conn, err := listener.Accept()
			if err != nil {
				// Terminate if the listener was closed
				n.lock.RLock()
				closed := n.ipcListener == nil
				n.lock.RUnlock()
				if closed {
					return
352
				}
353 354 355
				// Not closed, just some error; report and continue
				glog.V(logger.Error).Infof("IPC accept failed: %v", err)
				continue
356
			}
357
			go handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
		}
	}()
	// All listeners booted successfully
	n.ipcListener = listener
	n.ipcHandler = handler

	return nil
}

// stopIPC terminates the IPC RPC endpoint.
func (n *Node) stopIPC() {
	if n.ipcListener != nil {
		n.ipcListener.Close()
		n.ipcListener = nil

		glog.V(logger.Info).Infof("IPC endpoint closed: %s", n.ipcEndpoint)
	}
	if n.ipcHandler != nil {
		n.ipcHandler.Stop()
		n.ipcHandler = nil
378
	}
379 380 381 382
}

// startHTTP initializes and starts the HTTP RPC endpoint.
func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors string) error {
383
	// Short circuit if the HTTP endpoint isn't being exposed
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
	if endpoint == "" {
		return nil
	}
	// Generate the whitelist based on the allowed modules
	whitelist := make(map[string]bool)
	for _, module := range modules {
		whitelist[module] = true
	}
	// Register all the APIs exposed by the services
	handler := rpc.NewServer()
	for _, api := range apis {
		if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
			if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
				return err
			}
			glog.V(logger.Debug).Infof("HTTP registered %T under '%s'", api.Service, api.Namespace)
		}
	}
	// All APIs registered, start the HTTP listener
	var (
		listener net.Listener
		err      error
	)
	if listener, err = net.Listen("tcp", endpoint); err != nil {
		return err
	}
	go rpc.NewHTTPServer(cors, handler).Serve(listener)
	glog.V(logger.Info).Infof("HTTP endpoint opened: http://%s", endpoint)

413
	// All listeners booted successfully
414 415 416
	n.httpEndpoint = endpoint
	n.httpListener = listener
	n.httpHandler = handler
417 418 419 420

	return nil
}

421 422 423 424 425 426 427 428 429 430 431 432 433 434
// stopHTTP closes the HTTP listener and stops the HTTP handler, clearing both
// from the node. Either step is skipped if the respective field is unset.
func (n *Node) stopHTTP() {
	if listener := n.httpListener; listener != nil {
		listener.Close()
		n.httpListener = nil

		glog.V(logger.Info).Infof("HTTP endpoint closed: http://%s", n.httpEndpoint)
	}
	if handler := n.httpHandler; handler != nil {
		handler.Stop()
		n.httpHandler = nil
	}
}

435
// startWS initializes and starts the websocket RPC endpoint.
436
func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins string) error {
437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463
	// Short circuit if the WS endpoint isn't being exposed
	if endpoint == "" {
		return nil
	}
	// Generate the whitelist based on the allowed modules
	whitelist := make(map[string]bool)
	for _, module := range modules {
		whitelist[module] = true
	}
	// Register all the APIs exposed by the services
	handler := rpc.NewServer()
	for _, api := range apis {
		if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
			if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
				return err
			}
			glog.V(logger.Debug).Infof("WebSocket registered %T under '%s'", api.Service, api.Namespace)
		}
	}
	// All APIs registered, start the HTTP listener
	var (
		listener net.Listener
		err      error
	)
	if listener, err = net.Listen("tcp", endpoint); err != nil {
		return err
	}
464
	go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
	glog.V(logger.Info).Infof("WebSocket endpoint opened: ws://%s", endpoint)

	// All listeners booted successfully
	n.wsEndpoint = endpoint
	n.wsListener = listener
	n.wsHandler = handler

	return nil
}

// stopWS closes the websocket listener and stops the websocket handler,
// clearing both from the node. Either step is skipped if the respective field
// is unset.
func (n *Node) stopWS() {
	if listener := n.wsListener; listener != nil {
		listener.Close()
		n.wsListener = nil

		glog.V(logger.Info).Infof("WebSocket endpoint closed: ws://%s", n.wsEndpoint)
	}
	if handler := n.wsHandler; handler != nil {
		handler.Stop()
		n.wsHandler = nil
	}
}

489 490 491 492 493 494 495
// Stop terminates a running node along with all it's services. In the node was
// not started, an error is returned.
func (n *Node) Stop() error {
	n.lock.Lock()
	defer n.lock.Unlock()

	// Short circuit if the node's not running
496
	if n.server == nil {
497 498
		return ErrNodeStopped
	}
499 500

	// Terminate the API, services and the p2p server.
501
	n.stopWS()
502
	n.stopHTTP()
503
	n.stopIPC()
504
	n.rpcAPIs = nil
505
	failure := &StopError{
506
		Services: make(map[reflect.Type]error),
507
	}
508
	for kind, service := range n.services {
509
		if err := service.Stop(); err != nil {
510
			failure.Services[kind] = err
511 512
		}
	}
513
	n.server.Stop()
514
	n.services = nil
515
	n.server = nil
516 517 518 519 520 521 522 523

	// Release instance directory lock.
	if n.instanceDirLock != nil {
		n.instanceDirLock.Close()
		n.instanceDirLock = nil
	}

	// unblock n.Wait
524
	close(n.stop)
525

526 527 528 529 530 531
	// Remove the keystore if it was created ephemerally.
	var keystoreErr error
	if n.ephemeralKeystore != "" {
		keystoreErr = os.RemoveAll(n.ephemeralKeystore)
	}

532 533 534
	if len(failure.Services) > 0 {
		return failure
	}
535 536 537
	if keystoreErr != nil {
		return keystoreErr
	}
538 539 540
	return nil
}

541 542 543 544 545 546 547 548 549 550 551 552 553
// Wait blocks the calling goroutine until the node is stopped. If the node is
// not running at the time of invocation, the method returns immediately.
func (n *Node) Wait() {
	n.lock.RLock()
	if n.server == nil {
		// Release the read lock on this path too. Returning with it held
		// would make every later Lock (Register, Start, Stop) block forever.
		n.lock.RUnlock()
		return
	}
	// Grab the channel under the lock, but wait on it without holding the
	// lock: Stop needs the write lock to close the channel.
	stop := n.stop
	n.lock.RUnlock()

	<-stop
}

554 555 556 557 558 559 560 561 562 563 564 565
// Restart stops a running node and starts it again. The error of Stop is
// returned if stopping fails (including ErrNodeStopped when the node isn't
// running), in which case Start is not attempted; otherwise the result of
// Start is returned.
func (n *Node) Restart() error {
	if err := n.Stop(); err != nil {
		return err
	}
	return n.Start()
}

566
// Attach creates an RPC client attached to an in-process API handler.
567
func (n *Node) Attach() (*rpc.Client, error) {
568 569 570 571 572 573
	n.lock.RLock()
	defer n.lock.RUnlock()

	if n.server == nil {
		return nil, ErrNodeStopped
	}
574
	return rpc.DialInProc(n.inprocHandler), nil
575 576
}

577 578 579 580 581 582 583
// Server retrieves the currently running P2P network layer. This method is meant
// only to inspect fields of the currently running server, life cycle management
// should be left to this Node entity.
func (n *Node) Server() *p2p.Server {
	n.lock.RLock()
	defer n.lock.RUnlock()

584
	return n.server
585 586
}

587 588
// Service retrieves a currently running service registered of a specific type.
func (n *Node) Service(service interface{}) error {
589 590 591 592 593
	n.lock.RLock()
	defer n.lock.RUnlock()

	// Short circuit if the node's not running
	if n.server == nil {
594
		return ErrNodeStopped
595 596
	}
	// Otherwise try to find the service to return
597 598 599 600
	element := reflect.ValueOf(service).Elem()
	if running, ok := n.services[element.Type()]; ok {
		element.Set(reflect.ValueOf(running))
		return nil
601
	}
602
	return ErrServiceUnknown
603 604
}

605
// DataDir retrieves the current datadir used by the protocol stack.
606
// Deprecated: No files should be stored in this directory, use InstanceDir instead.
607
func (n *Node) DataDir() string {
608
	return n.config.DataDir
609 610
}

611 612 613 614 615
// InstanceDir retrieves the instance directory used by the protocol stack,
// as reported by the instanceDir method of the node's configuration.
func (n *Node) InstanceDir() string {
	return n.config.instanceDir()
}

616 617 618 619 620
// AccountManager retrieves the account manager used by the protocol stack.
// The manager is created by New, so it is available before the node has been
// started.
func (n *Node) AccountManager() *accounts.Manager {
	return n.accman
}

621 622
// IPCEndpoint retrieves the current IPC endpoint used by the protocol stack.
func (n *Node) IPCEndpoint() string {
623 624 625
	return n.ipcEndpoint
}

626 627 628 629 630 631 632 633 634 635
// HTTPEndpoint retrieves the current HTTP endpoint used by the protocol stack.
// The value is initialised from the configuration by New and overwritten by
// startHTTP with the endpoint it listens on. It is read without taking the
// node lock.
func (n *Node) HTTPEndpoint() string {
	return n.httpEndpoint
}

// WSEndpoint retrieves the current WS endpoint used by the protocol stack.
// The value is initialised from the configuration by New and overwritten by
// startWS with the endpoint it listens on. It is read without taking the
// node lock.
func (n *Node) WSEndpoint() string {
	return n.wsEndpoint
}

636 637 638
// EventMux returns the event multiplexer shared by all the network services
// in the current protocol stack. The multiplexer is created once by New and
// is also handed to every service constructor by Start.
func (n *Node) EventMux() *event.TypeMux {
	mux := n.eventmux
	return mux
}
641

642 643 644 645 646 647 648 649 650 651 652 653 654 655 656
// OpenDatabase opens the database with the given name from within the node's
// instance directory, creating it if none exists yet. cache and handles are
// passed through to the LevelDB database constructor. If the node is
// ephemeral (no data directory configured), a memory database is returned
// instead and name, cache and handles are ignored.
func (n *Node) OpenDatabase(name string, cache, handles int) (ethdb.Database, error) {
	ephemeral := n.config.DataDir == ""
	if ephemeral {
		return ethdb.NewMemDatabase()
	}
	path := n.config.resolvePath(name)
	return ethdb.NewLDBDatabase(path, cache, handles)
}

// ResolvePath returns the absolute path of a resource in the instance directory.
// The resolution itself is delegated to the resolvePath method of the node's
// configuration.
func (n *Node) ResolvePath(x string) string {
	return n.config.resolvePath(x)
}

657 658 659
// apis returns the collection of RPC descriptors this node offers.
func (n *Node) apis() []rpc.API {
	return []rpc.API{
660 661 662 663 664 665 666 667 668 669 670 671
		{
			Namespace: "admin",
			Version:   "1.0",
			Service:   NewPrivateAdminAPI(n),
		}, {
			Namespace: "admin",
			Version:   "1.0",
			Service:   NewPublicAdminAPI(n),
			Public:    true,
		}, {
			Namespace: "debug",
			Version:   "1.0",
672
			Service:   debug.Handler,
673 674 675 676 677
		}, {
			Namespace: "debug",
			Version:   "1.0",
			Service:   NewPublicDebugAPI(n),
			Public:    true,
678 679 680 681 682
		}, {
			Namespace: "web3",
			Version:   "1.0",
			Service:   NewPublicWeb3API(n),
			Public:    true,
683 684
		},
	}
685
}