ethereum.go 14.6 KB
Newer Older
1 2 3 4
package eth

import (
	"container/list"
obscuren's avatar
obscuren committed
5
	"encoding/json"
6
	"fmt"
obscuren's avatar
obscuren committed
7
	"math/big"
8
	"math/rand"
9
	"net"
obscuren's avatar
obscuren committed
10
	"path"
obscuren's avatar
obscuren committed
11
	"strconv"
12
	"strings"
obscuren's avatar
obscuren committed
13
	"sync"
14 15
	"sync/atomic"
	"time"
obscuren's avatar
obscuren committed
16

obscuren's avatar
obscuren committed
17
	"github.com/ethereum/go-ethereum/chain"
obscuren's avatar
obscuren committed
18
	"github.com/ethereum/go-ethereum/crypto"
19 20
	"github.com/ethereum/go-ethereum/ethutil"
	"github.com/ethereum/go-ethereum/event"
obscuren's avatar
obscuren committed
21
	"github.com/ethereum/go-ethereum/logger"
22
	"github.com/ethereum/go-ethereum/rpc"
obscuren's avatar
obscuren committed
23
	"github.com/ethereum/go-ethereum/state"
obscuren's avatar
obscuren committed
24
	"github.com/ethereum/go-ethereum/wire"
25 26
)

obscuren's avatar
obscuren committed
27 28
const (
	// seedTextFileUri is a URL of a seed server list. It is not referenced
	// by any code visible in this file.
	seedTextFileUri string = "http://www.ethereum.org/servers.poc3.txt"

	// seedNodeAddress is the host:port dialed by Seed as the last
	// bootstrap step when no previously known peers exist.
	seedNodeAddress = "poc-7.ethdev.com:30303"
)
31

obscuren's avatar
obscuren committed
32
// loggerger is the logger used throughout this file; it is created with the
// "SERV" tag.
var loggerger = logger.NewLogger("SERV")
zelig's avatar
zelig committed
33

34 35 36
func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
	// Loop thru the peers and close them (if we had them)
	for e := peers.Front(); e != nil; e = e.Next() {
obscuren's avatar
obscuren committed
37
		callback(e.Value.(*Peer), e)
38 39 40 41 42 43 44 45 46 47
	}
}

const (
	// processReapingTimeout is the interval, in seconds, between two runs of
	// the dead-peer reaper (it is multiplied by time.Second when the reaper's
	// ticker is created).
	processReapingTimeout = 60 // TODO increase
)

type Ethereum struct {
	// Channel for shutting down the ethereum
	shutdownChan chan bool
obscuren's avatar
obscuren committed
48
	quit         chan bool
obscuren's avatar
obscuren committed
49

50
	// DB interface
51
	db ethutil.Database
52
	// State manager for processing new blocks and managing the over all states
obscuren's avatar
obscuren committed
53
	blockManager *chain.BlockManager
54 55
	// The transaction pool. Transaction can be pushed on this pool
	// for later including in the blocks
obscuren's avatar
obscuren committed
56
	txPool *chain.TxPool
57
	// The canonical chain
obscuren's avatar
obscuren committed
58
	blockChain *chain.ChainManager
obscuren's avatar
obscuren committed
59 60
	// The block pool
	blockPool *BlockPool
61
	// Eventer
Felix Lange's avatar
Felix Lange committed
62
	eventMux event.TypeMux
63
	// Peers
64 65 66
	peers *list.List
	// Nonce
	Nonce uint64
obscuren's avatar
obscuren committed
67 68

	Addr net.Addr
69
	Port string
obscuren's avatar
obscuren committed
70

obscuren's avatar
obscuren committed
71 72
	blacklist [][]byte

obscuren's avatar
obscuren committed
73 74 75 76
	peerMut sync.Mutex

	// Capabilities for outgoing peers
	serverCaps Caps
obscuren's avatar
obscuren committed
77 78

	nat NAT
79 80 81

	// Specifies the desired amount of maximum peers
	MaxPeers int
Maran's avatar
Maran committed
82

83 84 85 86
	Mining bool

	listening bool

obscuren's avatar
obscuren committed
87
	RpcServer *rpc.JsonRpcServer
88

obscuren's avatar
obscuren committed
89
	keyManager *crypto.KeyManager
90

obscuren's avatar
obscuren committed
91
	clientIdentity wire.ClientIdentity
obscuren's avatar
obscuren committed
92 93

	isUpToDate bool
obscuren's avatar
obscuren committed
94

95 96
	filterMu sync.RWMutex
	filterId int
obscuren's avatar
obscuren committed
97
	filters  map[int]*chain.Filter
98 99
}

obscuren's avatar
obscuren committed
100
func New(db ethutil.Database, clientIdentity wire.ClientIdentity, keyManager *crypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) {
101
	var err error
obscuren's avatar
obscuren committed
102
	var nat NAT
103

obscuren's avatar
obscuren committed
104 105 106
	if usePnp {
		nat, err = Discover()
		if err != nil {
obscuren's avatar
obscuren committed
107
			loggerger.Debugln("UPnP failed", err)
obscuren's avatar
obscuren committed
108
		}
obscuren's avatar
obscuren committed
109 110
	}

111 112
	bootstrapDb(db)

113 114 115 116
	ethutil.Config.Db = db

	nonce, _ := ethutil.RandomUint64()
	ethereum := &Ethereum{
117 118 119 120 121 122 123 124 125
		shutdownChan:   make(chan bool),
		quit:           make(chan bool),
		db:             db,
		peers:          list.New(),
		Nonce:          nonce,
		serverCaps:     caps,
		nat:            nat,
		keyManager:     keyManager,
		clientIdentity: clientIdentity,
obscuren's avatar
obscuren committed
126
		isUpToDate:     true,
obscuren's avatar
obscuren committed
127
		filters:        make(map[int]*chain.Filter),
128
	}
Maran's avatar
Maran committed
129

obscuren's avatar
obscuren committed
130
	ethereum.blockPool = NewBlockPool(ethereum)
obscuren's avatar
obscuren committed
131 132
	ethereum.txPool = chain.NewTxPool(ethereum)
	ethereum.blockChain = chain.NewChainManager(ethereum)
obscuren's avatar
obscuren committed
133
	ethereum.blockManager = chain.NewBlockManager(ethereum)
134

135
	// Start the tx pool
136
	ethereum.txPool.Start()
137

138 139 140
	return ethereum, nil
}

obscuren's avatar
obscuren committed
141
func (s *Ethereum) KeyManager() *crypto.KeyManager {
142 143 144
	return s.keyManager
}

obscuren's avatar
obscuren committed
145
func (s *Ethereum) ClientIdentity() wire.ClientIdentity {
146 147 148
	return s.clientIdentity
}

obscuren's avatar
obscuren committed
149
func (s *Ethereum) ChainManager() *chain.ChainManager {
150 151 152
	return s.blockChain
}

obscuren's avatar
obscuren committed
153 154
func (s *Ethereum) BlockManager() *chain.BlockManager {
	return s.blockManager
155 156
}

obscuren's avatar
obscuren committed
157
func (s *Ethereum) TxPool() *chain.TxPool {
158 159
	return s.txPool
}
obscuren's avatar
obscuren committed
160 161 162
// BlockPool returns the node's block pool.
func (s *Ethereum) BlockPool() *BlockPool {
	return s.blockPool
}
163
func (s *Ethereum) EventMux() *event.TypeMux {
Felix Lange's avatar
Felix Lange committed
164
	return &s.eventMux
165
}
obscuren's avatar
obscuren committed
166 167 168
// Db returns the database the node was created with.
func (self *Ethereum) Db() ethutil.Database {
	return self.db
}
169

170 171 172
// ServerCaps returns the capabilities passed to New; they are handed to
// outbound peers when connecting.
func (s *Ethereum) ServerCaps() Caps {
	return s.serverCaps
}
173 174 175 176 177 178
// IsMining reports the value of the Mining field.
func (s *Ethereum) IsMining() bool {
	return s.Mining
}
// PeerCount returns the number of entries currently in the peer list.
func (s *Ethereum) PeerCount() int {
	return s.peers.Len()
}
179 180 181 182
func (s *Ethereum) IsUpToDate() bool {
	upToDate := true
	eachPeer(s.peers, func(peer *Peer, e *list.Element) {
		if atomic.LoadInt32(&peer.connected) == 1 {
183
			if peer.catchingUp == true && peer.versionKnown {
184 185 186 187 188 189
				upToDate = false
			}
		}
	})
	return upToDate
}
190 191 192
// PushPeer appends peer to the back of the peer list. It does not take
// peerMut.
func (s *Ethereum) PushPeer(peer *Peer) {
	s.peers.PushBack(peer)
}
193 194 195
// IsListening reports whether Start succeeded in binding the listen port.
func (s *Ethereum) IsListening() bool {
	return s.listening
}
196

obscuren's avatar
obscuren committed
197 198 199 200 201 202 203 204 205 206 207 208
// HighestTDPeer returns the largest td value found among the peers, or a
// fresh zero when no peer has a td above zero. A non-zero result is the
// peer's own *big.Int, not a copy.
func (s *Ethereum) HighestTDPeer() (td *big.Int) {
	best := big.NewInt(0)

	eachPeer(s.peers, func(p *Peer, _ *list.Element) {
		if p.td.Cmp(best) <= 0 {
			return
		}
		best = p.td
	})

	return best
}

obscuren's avatar
obscuren committed
209 210 211 212
// BlacklistPeer appends peer's public key to the node's blacklist.
func (self *Ethereum) BlacklistPeer(peer *Peer) {
	self.blacklist = append(self.blacklist, peer.pubkey)
}

213 214 215
func (s *Ethereum) AddPeer(conn net.Conn) {
	peer := NewPeer(conn, s, true)

216 217 218 219
	if peer != nil {
		if s.peers.Len() < s.MaxPeers {
			peer.Start()
		} else {
obscuren's avatar
obscuren committed
220
			loggerger.Debugf("Max connected peers reached. Not adding incoming peer.")
221
		}
222 223 224 225 226 227 228 229 230 231 232
	}
}

// ProcessPeerList tries to connect to every address in addrs. A failure for
// one address is logged and does not stop the remaining attempts.
func (s *Ethereum) ProcessPeerList(addrs []string) {
	for _, addr := range addrs {
		// TODO Probably requires some sanity checks
		if err := s.ConnectToPeer(addr); err != nil {
			loggerger.Debugln("Couldn't connect to peer", addr, err)
		}
	}
}

func (s *Ethereum) ConnectToPeer(addr string) error {
obscuren's avatar
obscuren committed
233 234 235
	if s.peers.Len() < s.MaxPeers {
		var alreadyConnected bool

obscuren's avatar
obscuren committed
236
		ahost, aport, _ := net.SplitHostPort(addr)
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
		var chost string

		ips, err := net.LookupIP(ahost)

		if err != nil {
			return err
		} else {
			// If more then one ip is available try stripping away the ipv6 ones
			if len(ips) > 1 {
				var ipsv4 []net.IP
				// For now remove the ipv6 addresses
				for _, ip := range ips {
					if strings.Contains(ip.String(), "::") {
						continue
					} else {
						ipsv4 = append(ipsv4, ip)
					}
				}
				if len(ipsv4) == 0 {
					return fmt.Errorf("[SERV] No IPV4 addresses available for hostname")
				}

				// Pick a random ipv4 address, simulating round-robin DNS.
				rand.Seed(time.Now().UTC().UnixNano())
				i := rand.Intn(len(ipsv4))
				chost = ipsv4[i].String()
			} else {
				if len(ips) == 0 {
					return fmt.Errorf("[SERV] No IPs resolved for the given hostname")
					return nil
				}
				chost = ips[0].String()
			}
		}

obscuren's avatar
obscuren committed
272 273 274 275
		eachPeer(s.peers, func(p *Peer, v *list.Element) {
			if p.conn == nil {
				return
			}
obscuren's avatar
obscuren committed
276
			phost, pport, _ := net.SplitHostPort(p.conn.RemoteAddr().String())
obscuren's avatar
obscuren committed
277

obscuren's avatar
obscuren committed
278
			if phost == chost && pport == aport {
obscuren's avatar
obscuren committed
279
				alreadyConnected = true
obscuren's avatar
obscuren committed
280
				//loggerger.Debugf("Peer %s already added.\n", chost)
obscuren's avatar
obscuren committed
281 282 283 284 285 286
				return
			}
		})

		if alreadyConnected {
			return nil
287 288
		}

289
		NewOutboundPeer(addr, s, s.serverCaps)
obscuren's avatar
obscuren committed
290
	}
291 292 293 294 295 296 297 298 299

	return nil
}

func (s *Ethereum) OutboundPeers() []*Peer {
	// Create a new peer slice with at least the length of the total peers
	outboundPeers := make([]*Peer, s.peers.Len())
	length := 0
	eachPeer(s.peers, func(p *Peer, e *list.Element) {
obscuren's avatar
obscuren committed
300
		if !p.inbound && p.conn != nil {
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
			outboundPeers[length] = p
			length++
		}
	})

	return outboundPeers[:length]
}

// InboundPeers returns the peers flagged as inbound. The result is never nil.
func (s *Ethereum) InboundPeers() []*Peer {
	// Capacity for every peer, so appending never reallocates.
	inbound := make([]*Peer, 0, s.peers.Len())
	eachPeer(s.peers, func(p *Peer, _ *list.Element) {
		if !p.inbound {
			return
		}
		inbound = append(inbound, p)
	})

	return inbound
}

323
func (s *Ethereum) InOutPeers() []*Peer {
obscuren's avatar
obscuren committed
324 325 326
	// Reap the dead peers first
	s.reapPeers()

327 328 329 330
	// Create a new peer slice with at least the length of the total peers
	inboundPeers := make([]*Peer, s.peers.Len())
	length := 0
	eachPeer(s.peers, func(p *Peer, e *list.Element) {
obscuren's avatar
obscuren committed
331 332 333 334 335
		// Only return peers with an actual ip
		if len(p.host) > 0 {
			inboundPeers[length] = p
			length++
		}
336 337 338 339 340
	})

	return inboundPeers[:length]
}

obscuren's avatar
obscuren committed
341 342
func (s *Ethereum) Broadcast(msgType wire.MsgType, data []interface{}) {
	msg := wire.NewMessage(msgType, data)
obscuren's avatar
obscuren committed
343 344 345
	s.BroadcastMsg(msg)
}

obscuren's avatar
obscuren committed
346
func (s *Ethereum) BroadcastMsg(msg *wire.Msg) {
347
	eachPeer(s.peers, func(p *Peer, e *list.Element) {
348
		p.QueueMessage(msg)
349 350 351
	})
}

352 353 354 355
// Peers returns the node's peer list itself, not a copy.
func (s *Ethereum) Peers() *list.List {
	return s.peers
}

obscuren's avatar
obscuren committed
356
func (s *Ethereum) reapPeers() {
357 358 359 360 361 362 363 364
	eachPeer(s.peers, func(p *Peer, e *list.Element) {
		if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
			s.removePeerElement(e)
		}
	})
}

func (s *Ethereum) removePeerElement(e *list.Element) {
obscuren's avatar
obscuren committed
365 366 367
	s.peerMut.Lock()
	defer s.peerMut.Unlock()

368 369
	s.peers.Remove(e)

370
	s.eventMux.Post(PeerListEvent{s.peers})
371 372 373 374 375 376
}

func (s *Ethereum) RemovePeer(p *Peer) {
	eachPeer(s.peers, func(peer *Peer, e *list.Element) {
		if peer == p {
			s.removePeerElement(e)
obscuren's avatar
obscuren committed
377 378 379
		}
	})
}
380

obscuren's avatar
obscuren committed
381
func (s *Ethereum) reapDeadPeerHandler() {
obscuren's avatar
obscuren committed
382 383 384 385 386 387 388
	reapTimer := time.NewTicker(processReapingTimeout * time.Second)

	for {
		select {
		case <-reapTimer.C:
			s.reapPeers()
		}
389 390 391 392
	}
}

// Start the ethereum
393
func (s *Ethereum) Start(seed bool) {
obscuren's avatar
obscuren committed
394
	s.blockPool.Start()
obscuren's avatar
obscuren committed
395
	s.blockManager.Start()
396

397
	// Bind to addr and port
398
	ln, err := net.Listen("tcp", ":"+s.Port)
399
	if err != nil {
obscuren's avatar
obscuren committed
400
		loggerger.Warnf("Port %s in use. Connection listening disabled. Acting as client", s.Port)
401
		s.listening = false
402
	} else {
403
		s.listening = true
404
		// Starting accepting connections
obscuren's avatar
obscuren committed
405
		loggerger.Infoln("Ready and accepting connections")
obscuren's avatar
obscuren committed
406 407
		// Start the peer handler
		go s.peerHandler(ln)
408 409
	}

obscuren's avatar
obscuren committed
410 411 412
	if s.nat != nil {
		go s.upnpUpdateThread()
	}
obscuren's avatar
obscuren committed
413

414
	// Start the reaping processes
obscuren's avatar
obscuren committed
415
	go s.reapDeadPeerHandler()
obscuren's avatar
obscuren committed
416
	go s.update()
obscuren's avatar
obscuren committed
417
	go s.filterLoop()
418

419 420 421
	if seed {
		s.Seed()
	}
422
	s.ConnectToPeer("localhost:40404")
obscuren's avatar
obscuren committed
423
	loggerger.Infoln("Server started")
424 425 426
}

func (s *Ethereum) Seed() {
obscuren's avatar
obscuren committed
427 428
	// Sorry Py person. I must blacklist. you perform badly
	s.blacklist = append(s.blacklist, ethutil.Hex2Bytes("64656330303561383532336435376331616537643864663236623336313863373537353163636634333530626263396330346237336262623931383064393031"))
obscuren's avatar
obscuren committed
429
	ips := PastPeers()
obscuren's avatar
obscuren committed
430
	if len(ips) > 0 {
431
		for _, ip := range ips {
obscuren's avatar
obscuren committed
432
			loggerger.Infoln("Connecting to previous peer ", ip)
obscuren's avatar
obscuren committed
433 434 435
			s.ConnectToPeer(ip)
		}
	} else {
obscuren's avatar
obscuren committed
436
		loggerger.Debugln("Retrieving seed nodes")
obscuren's avatar
obscuren committed
437 438 439 440 441 442 443

		// Eth-Go Bootstrapping
		ips, er := net.LookupIP("seed.bysh.me")
		if er == nil {
			peers := []string{}
			for _, ip := range ips {
				node := fmt.Sprintf("%s:%d", ip.String(), 30303)
obscuren's avatar
obscuren committed
444
				loggerger.Debugln("Found DNS Go Peer:", node)
obscuren's avatar
obscuren committed
445 446 447
				peers = append(peers, node)
			}
			s.ProcessPeerList(peers)
448 449
		}

obscuren's avatar
obscuren committed
450 451 452 453 454 455 456 457 458 459 460 461 462 463
		// Official DNS Bootstrapping
		_, nodes, err := net.LookupSRV("eth", "tcp", "ethereum.org")
		if err == nil {
			peers := []string{}
			// Iterate SRV nodes
			for _, n := range nodes {
				target := n.Target
				port := strconv.Itoa(int(n.Port))
				// Resolve target to ip (Go returns list, so may resolve to multiple ips?)
				addr, err := net.LookupHost(target)
				if err == nil {
					for _, a := range addr {
						// Build string out of SRV port and Resolved IP
						peer := net.JoinHostPort(a, port)
obscuren's avatar
obscuren committed
464
						loggerger.Debugln("Found DNS Bootstrap Peer:", peer)
obscuren's avatar
obscuren committed
465 466 467
						peers = append(peers, peer)
					}
				} else {
obscuren's avatar
obscuren committed
468
					loggerger.Debugln("Couldn't resolve :", target)
Maran's avatar
Maran committed
469 470
				}
			}
obscuren's avatar
obscuren committed
471 472
			// Connect to Peer list
			s.ProcessPeerList(peers)
Maran's avatar
Maran committed
473
		}
obscuren's avatar
obscuren committed
474

obscuren's avatar
obscuren committed
475 476
		s.ConnectToPeer(seedNodeAddress)
	}
477 478
}

obscuren's avatar
obscuren committed
479 480 481 482
func (s *Ethereum) peerHandler(listener net.Listener) {
	for {
		conn, err := listener.Accept()
		if err != nil {
obscuren's avatar
obscuren committed
483
			loggerger.Debugln(err)
obscuren's avatar
obscuren committed
484 485 486 487 488 489 490 491

			continue
		}

		go s.AddPeer(conn)
	}
}

492
func (s *Ethereum) Stop() {
Felix Lange's avatar
Felix Lange committed
493 494 495
	// Stop eventMux first, it will close all subscriptions.
	s.eventMux.Stop()

496 497 498
	// Close the database
	defer s.db.Close()

obscuren's avatar
obscuren committed
499 500 501 502
	var ips []string
	eachPeer(s.peers, func(p *Peer, e *list.Element) {
		ips = append(ips, p.conn.RemoteAddr().String())
	})
obscuren's avatar
obscuren committed
503 504 505 506 507

	if len(ips) > 0 {
		d, _ := json.MarshalIndent(ips, "", "    ")
		ethutil.WriteFile(path.Join(ethutil.Config.ExecPath, "known_peers.json"), d)
	}
obscuren's avatar
obscuren committed
508

509 510 511 512
	eachPeer(s.peers, func(p *Peer, e *list.Element) {
		p.Stop()
	})

obscuren's avatar
obscuren committed
513 514
	close(s.quit)

515 516 517
	if s.RpcServer != nil {
		s.RpcServer.Stop()
	}
518
	s.txPool.Stop()
obscuren's avatar
obscuren committed
519
	s.blockManager.Stop()
obscuren's avatar
obscuren committed
520
	s.blockPool.Stop()
521

obscuren's avatar
obscuren committed
522
	loggerger.Infoln("Server stopped")
obscuren's avatar
obscuren committed
523
	close(s.shutdownChan)
524 525 526 527 528 529
}

// WaitForShutdown blocks until shutdownChan is closed, which happens at the
// end of Stop.
func (s *Ethereum) WaitForShutdown() {
	<-s.shutdownChan
}
obscuren's avatar
obscuren committed
530 531 532 533

func (s *Ethereum) upnpUpdateThread() {
	// Go off immediately to prevent code duplication, thereafter we renew
	// lease every 15 minutes.
obscuren's avatar
obscuren committed
534
	timer := time.NewTimer(5 * time.Minute)
535
	lport, _ := strconv.ParseInt(s.Port, 10, 16)
obscuren's avatar
obscuren committed
536 537 538 539 540
	first := true
out:
	for {
		select {
		case <-timer.C:
obscuren's avatar
obscuren committed
541 542
			var err error
			_, err = s.nat.AddPortMapping("TCP", int(lport), int(lport), "eth listen port", 20*60)
obscuren's avatar
obscuren committed
543
			if err != nil {
obscuren's avatar
obscuren committed
544
				loggerger.Debugln("can't add UPnP port mapping:", err)
obscuren's avatar
obscuren committed
545 546 547
				break out
			}
			if first && err == nil {
obscuren's avatar
obscuren committed
548
				_, err = s.nat.GetExternalAddress()
obscuren's avatar
obscuren committed
549
				if err != nil {
obscuren's avatar
obscuren committed
550
					loggerger.Debugln("UPnP can't get external address:", err)
obscuren's avatar
obscuren committed
551 552 553 554 555 556 557 558 559 560 561 562 563
					continue out
				}
				first = false
			}
			timer.Reset(time.Minute * 15)
		case <-s.quit:
			break out
		}
	}

	timer.Stop()

	if err := s.nat.DeletePortMapping("TCP", int(lport), int(lport)); err != nil {
obscuren's avatar
obscuren committed
564
		loggerger.Debugln("unable to remove UPnP port mapping:", err)
obscuren's avatar
obscuren committed
565
	} else {
obscuren's avatar
obscuren committed
566
		loggerger.Debugln("succesfully disestablished UPnP port mapping")
obscuren's avatar
obscuren committed
567 568
	}
}
obscuren's avatar
obscuren committed
569 570 571 572 573 574 575 576 577

func (self *Ethereum) update() {
	upToDateTimer := time.NewTicker(1 * time.Second)

out:
	for {
		select {
		case <-upToDateTimer.C:
			if self.IsUpToDate() && !self.isUpToDate {
578
				self.eventMux.Post(ChainSyncEvent{false})
obscuren's avatar
obscuren committed
579 580
				self.isUpToDate = true
			} else if !self.IsUpToDate() && self.isUpToDate {
581
				self.eventMux.Post(ChainSyncEvent{true})
obscuren's avatar
obscuren committed
582 583 584 585 586 587 588
				self.isUpToDate = false
			}
		case <-self.quit:
			break out
		}
	}
}
589

590 591 592
// InstallFilter adds filter for blockchain events.
// The filter's callbacks will run for matching blocks and messages.
// The filter should not be modified after it has been installed.
obscuren's avatar
obscuren committed
593
func (self *Ethereum) InstallFilter(filter *chain.Filter) (id int) {
594 595 596 597 598 599
	self.filterMu.Lock()
	id = self.filterId
	self.filters[id] = filter
	self.filterId++
	self.filterMu.Unlock()
	return id
obscuren's avatar
obscuren committed
600 601 602
}

func (self *Ethereum) UninstallFilter(id int) {
603
	self.filterMu.Lock()
obscuren's avatar
obscuren committed
604
	delete(self.filters, id)
605
	self.filterMu.Unlock()
obscuren's avatar
obscuren committed
606 607
}

608 609
// GetFilter retrieves a filter installed using InstallFilter.
// The filter may not be modified.
obscuren's avatar
obscuren committed
610
func (self *Ethereum) GetFilter(id int) *chain.Filter {
611 612
	self.filterMu.RLock()
	defer self.filterMu.RUnlock()
obscuren's avatar
obscuren committed
613 614 615 616 617
	return self.filters[id]
}

func (self *Ethereum) filterLoop() {
	// Subscribe to events
obscuren's avatar
obscuren committed
618
	events := self.eventMux.Subscribe(chain.NewBlockEvent{}, state.Messages(nil))
619 620
	for event := range events.Chan() {
		switch event := event.(type) {
obscuren's avatar
obscuren committed
621
		case chain.NewBlockEvent:
622 623 624 625
			self.filterMu.RLock()
			for _, filter := range self.filters {
				if filter.BlockCallback != nil {
					filter.BlockCallback(event.Block)
obscuren's avatar
obscuren committed
626 627
				}
			}
628 629
			self.filterMu.RUnlock()

obscuren's avatar
obscuren committed
630
		case state.Messages:
631 632 633 634 635 636
			self.filterMu.RLock()
			for _, filter := range self.filters {
				if filter.MessageCallback != nil {
					msgs := filter.FilterMessages(event)
					if len(msgs) > 0 {
						filter.MessageCallback(msgs)
obscuren's avatar
obscuren committed
637 638 639
					}
				}
			}
640
			self.filterMu.RUnlock()
obscuren's avatar
obscuren committed
641 642 643 644
		}
	}
}

645 646 647 648 649 650 651 652
// bootstrapDb stores the current ProtocolVersion under the
// "ProtocolVersion" key when the value read from db decodes to zero. A
// failed read is treated like a missing value.
func bootstrapDb(db ethutil.Database) {
	key := []byte("ProtocolVersion")

	stored, _ := db.Get(key)
	if ethutil.NewValue(stored).Uint() != 0 {
		return
	}

	db.Put(key, ethutil.NewValue(ProtocolVersion).Bytes())
}
obscuren's avatar
obscuren committed
653 654 655 656 657 658 659 660

// PastPeers returns the peer addresses stored in known_peers.json in the
// exec path (the file written by Stop). It returns nil when the file yields
// no data or does not contain a JSON array of strings.
func PastPeers() []string {
	// The read error is deliberately not inspected: a file that cannot be
	// read yields no data and is handled like an empty one.
	data, _ := ethutil.ReadAllFile(path.Join(ethutil.Config.ExecPath, "known_peers.json"))
	if len(data) == 0 {
		return nil
	}

	var ips []string
	if err := json.Unmarshal([]byte(data), &ips); err != nil {
		loggerger.Debugln("Ignoring unreadable known_peers.json:", err)
		return nil
	}

	return ips
}