// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package dashboard

import (
	"bytes"
	"encoding/json"
	"io/ioutil"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"time"

	"github.com/ethereum/go-ethereum/log"
	"github.com/mohae/deepcopy"
	"github.com/rjeczalik/notify"
)

// emptyChunk is the JSON encoding of an empty array, returned when a
// buffer contains no complete log records.
var emptyChunk = json.RawMessage("[]")

// prepLogs creates a JSON array from the given log record buffer.
// Returns the prepared array and the position of the last '\n'
// character in the original buffer, or -1 if it doesn't contain any.
func prepLogs(buf []byte) (json.RawMessage, int) {
	// Only complete records (terminated by '\n') are usable; the
	// trailing partial record, if any, is left out of the array.
	last := bytes.LastIndexByte(buf, '\n')
	if last < 0 {
		return emptyChunk, -1
	}
	// Wrap the complete records in brackets, turning the inner record
	// terminators into JSON element separators.
	arr := make(json.RawMessage, 0, last+2)
	arr = append(arr, '[')
	arr = append(arr, buf[:last]...)
	for i := range arr {
		if arr[i] == '\n' {
			arr[i] = ','
		}
	}
	arr = append(arr, ']')
	return arr, last
}

// handleLogRequest searches for the log file specified by the timestamp of the
// request, creates a JSON array out of it and sends it to the requesting client.
func (db *Dashboard) handleLogRequest(r *LogsRequest, c *client) {
	files, err := ioutil.ReadDir(db.logdir)
	if err != nil {
		log.Warn("Failed to open logdir", "path", db.logdir, "err", err)
		return
	}
	re := regexp.MustCompile(`\.log$`)
	fileNames := make([]string, 0, len(files))
	for _, f := range files {
		if f.Mode().IsRegular() && re.MatchString(f.Name()) {
			fileNames = append(fileNames, f.Name())
		}
	}
	if len(fileNames) < 1 {
		log.Warn("No log files in logdir", "path", db.logdir)
		return
	}
	idx := sort.Search(len(fileNames), func(idx int) bool {
		// Returns the smallest index such as fileNames[idx] >= r.Name,
		// if there is no such index, returns n.
		return fileNames[idx] >= r.Name
	})

	switch {
	case idx < 0:
		return
	case idx == 0 && r.Past:
		return
	case idx >= len(fileNames):
		return
	case r.Past:
		idx--
	case idx == len(fileNames)-1 && fileNames[idx] == r.Name:
		return
	case idx == len(fileNames)-1 || (idx == len(fileNames)-2 && fileNames[idx] == r.Name):
		// The last file is continuously updated, and its chunks are streamed,
		// so in order to avoid log record duplication on the client side, it is
		// handled differently. Its actual content is always saved in the history.
97
		db.logLock.RLock()
98 99
		if db.history.Logs != nil {
			c.msg <- &Message{
100
				Logs: deepcopy.Copy(db.history.Logs).(*LogsMessage),
101 102
			}
		}
103
		db.logLock.RUnlock()
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
		return
	case fileNames[idx] == r.Name:
		idx++
	}

	path := filepath.Join(db.logdir, fileNames[idx])
	var buf []byte
	if buf, err = ioutil.ReadFile(path); err != nil {
		log.Warn("Failed to read file", "path", path, "err", err)
		return
	}
	chunk, end := prepLogs(buf)
	if end < 0 {
		log.Warn("The file doesn't contain valid logs", "path", path)
		return
	}
	c.msg <- &Message{
		Logs: &LogsMessage{
			Source: &LogFile{
				Name: fileNames[idx],
				Last: r.Past && idx == 0,
			},
			Chunk: chunk,
		},
	}
}

// streamLogs watches the file system, and when the logger writes
// the new log records into the files, picks them up, then makes
// JSON array out of them and sends them to the clients.
func (db *Dashboard) streamLogs() {
	defer db.wg.Done()
	var (
		err  error
		errc chan error
	)
	defer func() {
		if errc == nil {
			errc = <-db.quit
		}
		errc <- err
	}()

	files, err := ioutil.ReadDir(db.logdir)
	if err != nil {
		log.Warn("Failed to open logdir", "path", db.logdir, "err", err)
		return
	}
	var (
		opened *os.File // File descriptor for the opened active log file.
		buf    []byte   // Contains the recently written log chunks, which are not sent to the clients yet.
	)

	// The log records are always written into the last file in alphabetical order, because of the timestamp.
	re := regexp.MustCompile(`\.log$`)
	i := len(files) - 1
	for i >= 0 && (!files[i].Mode().IsRegular() || !re.MatchString(files[i].Name())) {
		i--
	}
	if i < 0 {
		log.Warn("No log files in logdir", "path", db.logdir)
		return
	}
	if opened, err = os.OpenFile(filepath.Join(db.logdir, files[i].Name()), os.O_RDONLY, 0600); err != nil {
		log.Warn("Failed to open file", "name", files[i].Name(), "err", err)
		return
	}
	defer opened.Close() // Close the lastly opened file.
	fi, err := opened.Stat()
	if err != nil {
		log.Warn("Problem with file", "name", opened.Name(), "err", err)
		return
	}
177
	db.logLock.Lock()
178 179 180 181 182 183 184
	db.history.Logs = &LogsMessage{
		Source: &LogFile{
			Name: fi.Name(),
			Last: true,
		},
		Chunk: emptyChunk,
	}
185
	db.logLock.Unlock()
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242

	watcher := make(chan notify.EventInfo, 10)
	if err := notify.Watch(db.logdir, watcher, notify.Create); err != nil {
		log.Warn("Failed to create file system watcher", "err", err)
		return
	}
	defer notify.Stop(watcher)

	ticker := time.NewTicker(db.config.Refresh)
	defer ticker.Stop()

loop:
	for err == nil || errc == nil {
		select {
		case event := <-watcher:
			// Make sure that new log file was created.
			if !re.Match([]byte(event.Path())) {
				break
			}
			if opened == nil {
				log.Warn("The last log file is not opened")
				break loop
			}
			// The new log file's name is always greater,
			// because it is created using the actual log record's time.
			if opened.Name() >= event.Path() {
				break
			}
			// Read the rest of the previously opened file.
			chunk, err := ioutil.ReadAll(opened)
			if err != nil {
				log.Warn("Failed to read file", "name", opened.Name(), "err", err)
				break loop
			}
			buf = append(buf, chunk...)
			opened.Close()

			if chunk, last := prepLogs(buf); last >= 0 {
				// Send the rest of the previously opened file.
				db.sendToAll(&Message{
					Logs: &LogsMessage{
						Chunk: chunk,
					},
				})
			}
			if opened, err = os.OpenFile(event.Path(), os.O_RDONLY, 0644); err != nil {
				log.Warn("Failed to open file", "name", event.Path(), "err", err)
				break loop
			}
			buf = buf[:0]

			// Change the last file in the history.
			fi, err := opened.Stat()
			if err != nil {
				log.Warn("Problem with file", "name", opened.Name(), "err", err)
				break loop
			}
243
			db.logLock.Lock()
244 245
			db.history.Logs.Source.Name = fi.Name()
			db.history.Logs.Chunk = emptyChunk
246
			db.logLock.Unlock()
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
		case <-ticker.C: // Send log updates to the client.
			if opened == nil {
				log.Warn("The last log file is not opened")
				break loop
			}
			// Read the new logs created since the last read.
			chunk, err := ioutil.ReadAll(opened)
			if err != nil {
				log.Warn("Failed to read file", "name", opened.Name(), "err", err)
				break loop
			}
			b := append(buf, chunk...)

			chunk, last := prepLogs(b)
			if last < 0 {
				break
			}
			// Only keep the invalid part of the buffer, which can be valid after the next read.
			buf = b[last+1:]

			var l *LogsMessage
			// Update the history.
269
			db.logLock.Lock()
270 271 272 273 274 275 276 277 278 279 280
			if bytes.Equal(db.history.Logs.Chunk, emptyChunk) {
				db.history.Logs.Chunk = chunk
				l = deepcopy.Copy(db.history.Logs).(*LogsMessage)
			} else {
				b = make([]byte, len(db.history.Logs.Chunk)+len(chunk)-1)
				copy(b, db.history.Logs.Chunk)
				b[len(db.history.Logs.Chunk)-1] = ','
				copy(b[len(db.history.Logs.Chunk):], chunk[1:])
				db.history.Logs.Chunk = b
				l = &LogsMessage{Chunk: chunk}
			}
281
			db.logLock.Unlock()
282 283 284 285 286 287 288

			db.sendToAll(&Message{Logs: l})
		case errc = <-db.quit:
			break loop
		}
	}
}