dashboard.go 7.96 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package dashboard

19 20
//go:generate yarn --cwd ./assets install
//go:generate yarn --cwd ./assets build
21 22
//go:generate yarn --cwd ./assets js-beautify -f bundle.js.map -r -w 1
//go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/index.html assets/bundle.js assets/bundle.js.map
23
//go:generate sh -c "sed 's#var _bundleJs#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
24
//go:generate sh -c "sed 's#var _bundleJsMap#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
25
//go:generate sh -c "sed 's#var _indexHtml#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
26
//go:generate gofmt -w -s assets.go
27 28 29 30 31 32 33 34 35

import (
	"fmt"
	"net"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

36 37
	"io"

38 39
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/p2p"
40
	"github.com/ethereum/go-ethereum/params"
41
	"github.com/ethereum/go-ethereum/rpc"
42
	"github.com/mohae/deepcopy"
43 44 45 46
	"golang.org/x/net/websocket"
)

const (
47
	sampleLimit = 200 // Maximum number of data samples
48 49 50 51
)

// Dashboard contains the dashboard internals.
type Dashboard struct {
52 53 54 55 56 57 58
	config *Config // Configuration values for the dashboard

	listener   net.Listener       // Network listener listening for dashboard clients
	conns      map[uint32]*client // Currently live websocket connections
	nextConnID uint32             // Next connection id

	history *Message // Stored historical data
59

60 61 62 63
	lock     sync.Mutex   // Lock protecting the dashboard's internals
	sysLock  sync.RWMutex // Lock protecting the stored system data
	peerLock sync.RWMutex // Lock protecting the stored peer data
	logLock  sync.RWMutex // Lock protecting the stored log data
64

65 66
	geodb  *geoDB // geoip database instance for IP to geographical information conversions
	logdir string // Directory containing the log files
67

68
	quit chan chan error // Channel used for graceful exit
69
	wg   sync.WaitGroup  // Wait group used to close the data collector threads
70 71 72 73 74
}

// client represents active websocket connection with a remote browser.
type client struct {
	conn   *websocket.Conn // Particular live websocket connection
75
	msg    chan *Message   // Message queue for the update messages
76 77 78 79
	logger log.Logger      // Logger for the particular live websocket connection
}

// New creates a new dashboard instance with the given configuration.
80
func New(config *Config, commit string, logdir string) *Dashboard {
81
	now := time.Now()
82 83 84 85 86
	versionMeta := ""
	if len(params.VersionMeta) > 0 {
		versionMeta = fmt.Sprintf(" (%s)", params.VersionMeta)
	}
	return &Dashboard{
87 88 89
		conns:  make(map[uint32]*client),
		config: config,
		quit:   make(chan chan error),
90 91 92 93 94 95
		history: &Message{
			General: &GeneralMessage{
				Commit:  commit,
				Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta),
			},
			System: &SystemMessage{
96 97 98 99 100 101 102 103
				ActiveMemory:   emptyChartEntries(now, sampleLimit),
				VirtualMemory:  emptyChartEntries(now, sampleLimit),
				NetworkIngress: emptyChartEntries(now, sampleLimit),
				NetworkEgress:  emptyChartEntries(now, sampleLimit),
				ProcessCPU:     emptyChartEntries(now, sampleLimit),
				SystemCPU:      emptyChartEntries(now, sampleLimit),
				DiskRead:       emptyChartEntries(now, sampleLimit),
				DiskWrite:      emptyChartEntries(now, sampleLimit),
104
			},
105
		},
106
		logdir: logdir,
107 108 109 110
	}
}

// emptyChartEntries returns a ChartEntry array containing limit number of empty samples.
111
func emptyChartEntries(t time.Time, limit int) ChartEntries {
112 113
	ce := make(ChartEntries, limit)
	for i := 0; i < limit; i++ {
114
		ce[i] = new(ChartEntry)
115 116
	}
	return ce
117 118
}

119
// Protocols implements the node.Service interface.
120 121
func (db *Dashboard) Protocols() []p2p.Protocol { return nil }

122
// APIs implements the node.Service interface.
123 124
func (db *Dashboard) APIs() []rpc.API { return nil }

125 126
// Start starts the data collection thread and the listening server of the dashboard.
// Implements the node.Service interface.
127
func (db *Dashboard) Start(server *p2p.Server) error {
128 129
	log.Info("Starting dashboard")

130 131
	db.wg.Add(3)
	go db.collectSystemData()
132
	go db.streamLogs()
133
	go db.collectPeerData()
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148

	http.HandleFunc("/", db.webHandler)
	http.Handle("/api", websocket.Handler(db.apiHandler))

	listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", db.config.Host, db.config.Port))
	if err != nil {
		return err
	}
	db.listener = listener

	go http.Serve(listener, nil)

	return nil
}

149 150
// Stop stops the data collection thread and the connection listener of the dashboard.
// Implements the node.Service interface.
151 152 153 154 155 156 157 158
func (db *Dashboard) Stop() error {
	// Close the connection listener.
	var errs []error
	if err := db.listener.Close(); err != nil {
		errs = append(errs, err)
	}
	// Close the collectors.
	errc := make(chan error, 1)
159
	for i := 0; i < 3; i++ {
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
		db.quit <- errc
		if err := <-errc; err != nil {
			errs = append(errs, err)
		}
	}
	// Close the connections.
	db.lock.Lock()
	for _, c := range db.conns {
		if err := c.conn.Close(); err != nil {
			c.logger.Warn("Failed to close connection", "err", err)
		}
	}
	db.lock.Unlock()

	// Wait until every goroutine terminates.
	db.wg.Wait()
	log.Info("Dashboard stopped")

	var err error
	if len(errs) > 0 {
		err = fmt.Errorf("%v", errs)
	}

	return err
}

// webHandler handles all non-api requests, simply flattening and returning the dashboard website.
func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) {
	log.Debug("Request", "URL", r.URL)

	path := r.URL.String()
	if path == "/" {
192
		path = "/index.html"
193
	}
194
	blob, err := Asset(path[1:])
195 196 197 198 199 200 201 202 203 204
	if err != nil {
		log.Warn("Failed to load the asset", "path", path, "err", err)
		http.Error(w, "not found", http.StatusNotFound)
		return
	}
	w.Write(blob)
}

// apiHandler handles requests for the dashboard.
func (db *Dashboard) apiHandler(conn *websocket.Conn) {
205
	id := atomic.AddUint32(&db.nextConnID, 1)
206 207
	client := &client{
		conn:   conn,
208
		msg:    make(chan *Message, 128),
209 210
		logger: log.New("id", id),
	}
211
	done := make(chan struct{})
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230

	// Start listening for messages to send.
	db.wg.Add(1)
	go func() {
		defer db.wg.Done()

		for {
			select {
			case <-done:
				return
			case msg := <-client.msg:
				if err := websocket.JSON.Send(client.conn, msg); err != nil {
					client.logger.Warn("Failed to send the message", "msg", msg, "err", err)
					client.conn.Close()
					return
				}
			}
		}
	}()
231

232
	// Send the past data.
233 234 235 236 237 238 239 240 241 242 243 244
	db.sysLock.RLock()
	db.peerLock.RLock()
	db.logLock.RLock()

	h := deepcopy.Copy(db.history).(*Message)

	db.sysLock.RUnlock()
	db.peerLock.RUnlock()
	db.logLock.RUnlock()

	client.msg <- h

245
	// Start tracking the connection and drop at connection loss.
246
	db.lock.Lock()
247 248 249 250 251 252 253 254
	db.conns[id] = client
	db.lock.Unlock()
	defer func() {
		db.lock.Lock()
		delete(db.conns, id)
		db.lock.Unlock()
	}()
	for {
255 256 257 258 259
		r := new(Request)
		if err := websocket.JSON.Receive(conn, r); err != nil {
			if err != io.EOF {
				client.logger.Warn("Failed to receive request", "err", err)
			}
260 261 262
			close(done)
			return
		}
263 264 265 266 267 268
		if r.Logs != nil {
			db.handleLogRequest(r.Logs, client)
		}
	}
}

269
// sendToAll sends the given message to the active dashboards.
270
func (db *Dashboard) sendToAll(msg *Message) {
271 272 273
	db.lock.Lock()
	for _, c := range db.conns {
		select {
274
		case c.msg <- msg:
275 276 277 278 279 280
		default:
			c.conn.Close()
		}
	}
	db.lock.Unlock()
}