// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package dashboard

//go:generate yarn --cwd ./assets install
//go:generate yarn --cwd ./assets build
//go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/index.html assets/bundle.js
//go:generate sh -c "sed 's#var _bundleJs#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
//go:generate sh -c "sed 's#var _indexHtml#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
//go:generate gofmt -w -s assets.go
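
// The directives above build the web interface with yarn, embed the compiled
// index.html and bundle.js into assets.go via go-bindata, silence the misspell
// linter on the generated variables and finally gofmt the generated file.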

import (
	"fmt"
	"io"
	"net"
	"net/http"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"github.com/elastic/gosigar"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/params"
	"github.com/ethereum/go-ethereum/rpc"
	"github.com/mohae/deepcopy"
	"golang.org/x/net/websocket"
)

const (
	activeMemorySampleLimit   = 200 // Maximum number of active memory data samples
	virtualMemorySampleLimit  = 200 // Maximum number of virtual memory data samples
	networkIngressSampleLimit = 200 // Maximum number of network ingress data samples
	networkEgressSampleLimit  = 200 // Maximum number of network egress data samples
	processCPUSampleLimit     = 200 // Maximum number of process cpu data samples
	systemCPUSampleLimit      = 200 // Maximum number of system cpu data samples
	diskReadSampleLimit       = 200 // Maximum number of disk read data samples
	diskWriteSampleLimit      = 200 // Maximum number of disk write data samples
)

var nextID uint32 // Next connection id

// Dashboard contains the dashboard internals.
type Dashboard struct {
	config *Config

	listener net.Listener
	conns    map[uint32]*client // Currently live websocket connections
	history  *Message           // Stored historical data
	lock     sync.RWMutex       // Lock protecting the dashboard's internals

	logdir string // Directory containing the log files to stream

	quit chan chan error // Channel used for graceful exit
	wg   sync.WaitGroup
}

// client represents an active websocket connection with a remote browser.
type client struct {
	conn   *websocket.Conn // Particular live websocket connection
	msg    chan *Message   // Message queue for the update messages
	logger log.Logger      // Logger for the particular live websocket connection
}

// New creates a new dashboard instance with the given configuration.
func New(config *Config, commit string, logdir string) *Dashboard {
	now := time.Now()
	versionMeta := ""
	if len(params.VersionMeta) > 0 {
		versionMeta = fmt.Sprintf(" (%s)", params.VersionMeta)
	}
	return &Dashboard{
		conns:  make(map[uint32]*client),
		config: config,
		quit:   make(chan chan error),
		history: &Message{
			General: &GeneralMessage{
				Commit:  commit,
				Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta),
			},
			System: &SystemMessage{
				ActiveMemory:   emptyChartEntries(now, activeMemorySampleLimit, config.Refresh),
				VirtualMemory:  emptyChartEntries(now, virtualMemorySampleLimit, config.Refresh),
				NetworkIngress: emptyChartEntries(now, networkIngressSampleLimit, config.Refresh),
				NetworkEgress:  emptyChartEntries(now, networkEgressSampleLimit, config.Refresh),
				ProcessCPU:     emptyChartEntries(now, processCPUSampleLimit, config.Refresh),
				SystemCPU:      emptyChartEntries(now, systemCPUSampleLimit, config.Refresh),
				DiskRead:       emptyChartEntries(now, diskReadSampleLimit, config.Refresh),
				DiskWrite:      emptyChartEntries(now, diskWriteSampleLimit, config.Refresh),
			},
		},
		logdir: logdir,
	}
}
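
// A minimal usage sketch (normally the node wires the dashboard in as a
// service; assumes a Config carrying the Host, Port and Refresh fields used
// throughout this file):
//
//	db := New(&Config{Host: "localhost", Port: 8080, Refresh: 3 * time.Second}, "", "")
//	if err := db.Start(nil); err != nil {
//		log.Error("Failed to start dashboard", "err", err)
//	}
//	defer db.Stop()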

// emptyChartEntries returns a ChartEntries slice containing limit empty samples, spaced refresh apart.
func emptyChartEntries(t time.Time, limit int, refresh time.Duration) ChartEntries {
	ce := make(ChartEntries, limit)
	for i := 0; i < limit; i++ {
		// Order the pre-filled samples from oldest to newest, so that appending
		// fresh entries in collectData keeps the series chronological.
		ce[i] = &ChartEntry{
			Time: t.Add(-time.Duration(limit-i) * refresh),
		}
	}
	return ce
}

// Protocols implements the node.Service interface.
func (db *Dashboard) Protocols() []p2p.Protocol { return nil }

// APIs implements the node.Service interface.
func (db *Dashboard) APIs() []rpc.API { return nil }

// Start starts the data collectors, the log streamer and the listening server of the dashboard.
// Implements the node.Service interface.
func (db *Dashboard) Start(server *p2p.Server) error {
	log.Info("Starting dashboard")

	db.wg.Add(2)
	go db.collectData()
	go db.streamLogs()

	http.HandleFunc("/", db.webHandler)
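	// The /api endpoint upgrades the connection to a websocket, over which the
	// dashboard streams metric updates and logs to the browser.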
	http.Handle("/api", websocket.Handler(db.apiHandler))

	listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", db.config.Host, db.config.Port))
	if err != nil {
		return err
	}
	db.listener = listener

	go http.Serve(listener, nil)

	return nil
}

// Stop stops the data collectors and the connection listener of the dashboard.
// Implements the node.Service interface.
func (db *Dashboard) Stop() error {
	// Close the connection listener.
	var errs []error
	if err := db.listener.Close(); err != nil {
		errs = append(errs, err)
	}
	// Close the collectors.
	errc := make(chan error, 1)
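	// Both the data collector and the log streamer read from db.quit, hence the
	// two shutdown rounds below.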
	for i := 0; i < 2; i++ {
		db.quit <- errc
		if err := <-errc; err != nil {
			errs = append(errs, err)
		}
	}
	// Close the connections.
	db.lock.Lock()
	for _, c := range db.conns {
		if err := c.conn.Close(); err != nil {
			c.logger.Warn("Failed to close connection", "err", err)
		}
	}
	db.lock.Unlock()

	// Wait until every goroutine terminates.
	db.wg.Wait()
	log.Info("Dashboard stopped")

	var err error
	if len(errs) > 0 {
		err = fmt.Errorf("%v", errs)
	}

	return err
}

// webHandler handles all non-API requests, simply serving the embedded dashboard assets.
func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) {
	log.Debug("Request", "URL", r.URL)

	path := r.URL.String()
	if path == "/" {
		path = "/index.html"
	}
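	// The assets are embedded into the binary by go-bindata (see the go:generate
	// directives above); Asset looks them up by their path relative to assets/.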
	blob, err := Asset(path[1:])
	if err != nil {
		log.Warn("Failed to load the asset", "path", path, "err", err)
		http.Error(w, "not found", http.StatusNotFound)
		return
	}
	w.Write(blob)
}

// apiHandler handles requests for the dashboard.
func (db *Dashboard) apiHandler(conn *websocket.Conn) {
	id := atomic.AddUint32(&nextID, 1)
	client := &client{
		conn:   conn,
		msg:    make(chan *Message, 128),
		logger: log.New("id", id),
	}
	done := make(chan struct{})
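	// done signals the sender goroutine below to stop once the receive loop exits.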

	// Start listening for messages to send.
	db.wg.Add(1)
	go func() {
		defer db.wg.Done()

		for {
			select {
			case <-done:
				return
			case msg := <-client.msg:
				if err := websocket.JSON.Send(client.conn, msg); err != nil {
					client.logger.Warn("Failed to send the message", "msg", msg, "err", err)
					client.conn.Close()
					return
				}
			}
		}
	}()

	db.lock.Lock()
	// Send the past data.
	client.msg <- deepcopy.Copy(db.history).(*Message)
	// Start tracking the connection and drop at connection loss.
	db.conns[id] = client
	db.lock.Unlock()
	defer func() {
		db.lock.Lock()
		delete(db.conns, id)
		db.lock.Unlock()
	}()
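	// Process the requests coming from the browser until the connection is closed.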
	for {
		r := new(Request)
		if err := websocket.JSON.Receive(conn, r); err != nil {
			if err != io.EOF {
				client.logger.Warn("Failed to receive request", "err", err)
			}
			close(done)
			return
		}
		if r.Logs != nil {
			db.handleLogRequest(r.Logs, client)
		}
	}
}

// meterCollector returns a function that retrieves the current count of the named meter.
func meterCollector(name string) func() int64 {
	if metric := metrics.DefaultRegistry.Get(name); metric != nil {
		m := metric.(metrics.Meter)
		return func() int64 {
			return m.Count()
		}
	}
	return func() int64 {
		return 0
	}
}

// collectData collects the required data to plot on the dashboard.
func (db *Dashboard) collectData() {
	defer db.wg.Done()

	systemCPUUsage := gosigar.Cpu{}
	systemCPUUsage.Get()
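	// Prime the baseline readings so that the first deltas below are meaningful.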
	var (
		mem runtime.MemStats

		collectNetworkIngress = meterCollector("p2p/InboundTraffic")
		collectNetworkEgress  = meterCollector("p2p/OutboundTraffic")
		collectDiskRead       = meterCollector("eth/db/chaindata/disk/read")
		collectDiskWrite      = meterCollector("eth/db/chaindata/disk/write")

		prevNetworkIngress = collectNetworkIngress()
		prevNetworkEgress  = collectNetworkEgress()
		prevProcessCPUTime = getProcessCPUTime()
		prevSystemCPUUsage = systemCPUUsage
		prevDiskRead       = collectDiskRead()
		prevDiskWrite      = collectDiskWrite()

		frequency = float64(db.config.Refresh / time.Second)
		numCPU    = float64(runtime.NumCPU())
	)
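	// Sample the metrics on every refresh tick until a shutdown is requested.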

	for {
		select {
		case errc := <-db.quit:
			errc <- nil
			return
		case <-time.After(db.config.Refresh):
			systemCPUUsage.Get()
			var (
				curNetworkIngress = collectNetworkIngress()
				curNetworkEgress  = collectNetworkEgress()
				curProcessCPUTime = getProcessCPUTime()
				curSystemCPUUsage = systemCPUUsage
				curDiskRead       = collectDiskRead()
				curDiskWrite      = collectDiskWrite()

				deltaNetworkIngress = float64(curNetworkIngress - prevNetworkIngress)
				deltaNetworkEgress  = float64(curNetworkEgress - prevNetworkEgress)
				deltaProcessCPUTime = curProcessCPUTime - prevProcessCPUTime
				deltaSystemCPUUsage = curSystemCPUUsage.Delta(prevSystemCPUUsage)
				deltaDiskRead       = curDiskRead - prevDiskRead
				deltaDiskWrite      = curDiskWrite - prevDiskWrite
			)
			prevNetworkIngress = curNetworkIngress
			prevNetworkEgress = curNetworkEgress
			prevProcessCPUTime = curProcessCPUTime
			prevSystemCPUUsage = curSystemCPUUsage
			prevDiskRead = curDiskRead
			prevDiskWrite = curDiskWrite

			now := time.Now()

			runtime.ReadMemStats(&mem)
			activeMemory := &ChartEntry{
				Time:  now,
				Value: float64(mem.Alloc) / frequency,
			}
			virtualMemory := &ChartEntry{
				Time:  now,
				Value: float64(mem.Sys) / frequency,
			}
			networkIngress := &ChartEntry{
				Time:  now,
				Value: deltaNetworkIngress / frequency,
			}
			networkEgress := &ChartEntry{
				Time:  now,
				Value: deltaNetworkEgress / frequency,
			}
			processCPU := &ChartEntry{
				Time:  now,
				Value: deltaProcessCPUTime / frequency / numCPU * 100,
			}
			systemCPU := &ChartEntry{
				Time:  now,
				Value: float64(deltaSystemCPUUsage.Sys+deltaSystemCPUUsage.User) / frequency / numCPU,
			}
			diskRead := &ChartEntry{
				Time:  now,
				Value: float64(deltaDiskRead) / frequency,
			}
			diskWrite := &ChartEntry{
				Time:  now,
				Value: float64(deltaDiskWrite) / frequency,
			}
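			// Roll the historical window: drop the oldest sample and append the newest.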
			sys := db.history.System
			db.lock.Lock()
			sys.ActiveMemory = append(sys.ActiveMemory[1:], activeMemory)
			sys.VirtualMemory = append(sys.VirtualMemory[1:], virtualMemory)
			sys.NetworkIngress = append(sys.NetworkIngress[1:], networkIngress)
			sys.NetworkEgress = append(sys.NetworkEgress[1:], networkEgress)
			sys.ProcessCPU = append(sys.ProcessCPU[1:], processCPU)
			sys.SystemCPU = append(sys.SystemCPU[1:], systemCPU)
			sys.DiskRead = append(sys.DiskRead[1:], diskRead)
			sys.DiskWrite = append(sys.DiskWrite[1:], diskWrite)
			db.lock.Unlock()

			db.sendToAll(&Message{
				System: &SystemMessage{
					ActiveMemory:   ChartEntries{activeMemory},
					VirtualMemory:  ChartEntries{virtualMemory},
					NetworkIngress: ChartEntries{networkIngress},
					NetworkEgress:  ChartEntries{networkEgress},
					ProcessCPU:     ChartEntries{processCPU},
					SystemCPU:      ChartEntries{systemCPU},
					DiskRead:       ChartEntries{diskRead},
					DiskWrite:      ChartEntries{diskWrite},
				},
			})
		}
	}
}

// sendToAll sends the given message to every currently connected client.
func (db *Dashboard) sendToAll(msg *Message) {
	db.lock.Lock()
	for _, c := range db.conns {
		select {
		case c.msg <- msg:
		default:
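			// The client's message queue is full, drop the connection.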
			c.conn.Close()
		}
	}
	db.lock.Unlock()
}