ethereum.go 4.29 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
package eth

import (
	"container/list"
	"github.com/ethereum/ethchain-go"
	"github.com/ethereum/ethdb-go"
	"github.com/ethereum/ethutil-go"
	"github.com/ethereum/ethwire-go"
	"log"
	"net"
	"sync/atomic"
	"time"
)

func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
	// Loop thru the peers and close them (if we had them)
	for e := peers.Front(); e != nil; e = e.Next() {
		if peer, ok := e.Value.(*Peer); ok {
			callback(peer, e)
		}
	}
}

const (
	processReapingTimeout = 60 // TODO increase
)

type Ethereum struct {
	// Channel for shutting down the ethereum
	shutdownChan chan bool
	// DB interface
	//db *ethdb.LDBDatabase
	db *ethdb.MemDatabase
	// Block manager for processing new blocks and managing the block chain
	BlockManager *ethchain.BlockManager
	// The transaction pool. Transaction can be pushed on this pool
	// for later including in the blocks
	TxPool *ethchain.TxPool
	// Peers (NYI)
	peers *list.List
	// Nonce
	Nonce uint64
}

func New() (*Ethereum, error) {
	//db, err := ethdb.NewLDBDatabase()
	db, err := ethdb.NewMemDatabase()
	if err != nil {
		return nil, err
	}

	ethutil.Config.Db = db

	nonce, _ := ethutil.RandomUint64()
	ethereum := &Ethereum{
		shutdownChan: make(chan bool),
		db:           db,
		peers:        list.New(),
		Nonce:        nonce,
	}
	ethereum.TxPool = ethchain.NewTxPool()
	ethereum.TxPool.Speaker = ethereum
obscuren's avatar
obscuren committed
63
	ethereum.BlockManager = ethchain.NewBlockManager(ethereum)
64 65 66 67 68 69 70 71 72 73 74

	ethereum.TxPool.BlockManager = ethereum.BlockManager
	ethereum.BlockManager.TransactionPool = ethereum.TxPool

	return ethereum, nil
}

func (s *Ethereum) AddPeer(conn net.Conn) {
	peer := NewPeer(conn, s, true)

	if peer != nil {
obscuren's avatar
obscuren committed
75 76 77 78 79 80 81
		if s.peers.Len() > -1 {
			log.Println("SEED")
			peer.Start(true)
		} else {
			s.peers.PushBack(peer)
			peer.Start(false)
		}
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
	}
}

func (s *Ethereum) ProcessPeerList(addrs []string) {
	for _, addr := range addrs {
		// TODO Probably requires some sanity checks
		s.ConnectToPeer(addr)
	}
}

func (s *Ethereum) ConnectToPeer(addr string) error {
	peer := NewOutboundPeer(addr, s)

	s.peers.PushBack(peer)

	return nil
}

func (s *Ethereum) OutboundPeers() []*Peer {
	// Create a new peer slice with at least the length of the total peers
	outboundPeers := make([]*Peer, s.peers.Len())
	length := 0
	eachPeer(s.peers, func(p *Peer, e *list.Element) {
		if !p.inbound {
			outboundPeers[length] = p
			length++
		}
	})

	return outboundPeers[:length]
}

func (s *Ethereum) InboundPeers() []*Peer {
	// Create a new peer slice with at least the length of the total peers
	inboundPeers := make([]*Peer, s.peers.Len())
	length := 0
	eachPeer(s.peers, func(p *Peer, e *list.Element) {
		if p.inbound {
			inboundPeers[length] = p
			length++
		}
	})

	return inboundPeers[:length]
}

128 129
func (s *Ethereum) Broadcast(msgType ethwire.MsgType, data interface{}) {
	msg := ethwire.NewMessage(msgType, data)
130
	eachPeer(s.peers, func(p *Peer, e *list.Element) {
131
		p.QueueMessage(msg)
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
	})
}

func (s *Ethereum) ReapDeadPeers() {
	for {
		eachPeer(s.peers, func(p *Peer, e *list.Element) {
			if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
				s.peers.Remove(e)
			}
		})

		time.Sleep(processReapingTimeout * time.Second)
	}
}

// Start the ethereum
func (s *Ethereum) Start() {
149 150
	// Bind to addr and port
	ln, err := net.Listen("tcp", ":30303")
151 152 153 154 155
	if err != nil {
		// This is mainly for testing to create a "network"
		if ethutil.Config.Debug {
			log.Println("Connection listening disabled. Acting as client")

obscuren's avatar
obscuren committed
156 157 158 159
			/*
				err = s.ConnectToPeer("localhost:12345")
				if err != nil {
					log.Println("Error starting ethereum", err)
160

obscuren's avatar
obscuren committed
161 162 163
					s.Stop()
				}
			*/
164 165 166 167 168 169
		} else {
			log.Fatal(err)
		}
	} else {
		// Starting accepting connections
		go func() {
obscuren's avatar
obscuren committed
170 171
			log.Println("Ready and accepting connections")

172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
			for {
				conn, err := ln.Accept()
				if err != nil {
					log.Println(err)

					continue
				}

				go s.AddPeer(conn)
			}
		}()
	}

	// Start the reaping processes
	go s.ReapDeadPeers()

	// Start the tx pool
	s.TxPool.Start()
}

func (s *Ethereum) Stop() {
	// Close the database
	defer s.db.Close()

	eachPeer(s.peers, func(p *Peer, e *list.Element) {
		p.Stop()
	})

	s.shutdownChan <- true

	s.TxPool.Stop()
}

// This function will wait for a shutdown and resumes main thread execution
func (s *Ethereum) WaitForShutdown() {
	<-s.shutdownChan
}