sys.go 2.12 KB
Newer Older
1 2 3
package logger

import (
4
	"fmt"
5 6 7
	"sync"
)

8
type stdMsg struct {
Taylor Gerring's avatar
Taylor Gerring committed
9 10 11 12
	level LogLevel
	msg   string
}

13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
type jsonMsg []byte

func (m jsonMsg) Level() LogLevel {
	return 0
}

func (m jsonMsg) String() string {
	return string(m)
}

type LogMsg interface {
	Level() LogLevel
	fmt.Stringer
}

func (m stdMsg) Level() LogLevel {
	return m.level
}

func (m stdMsg) String() string {
	return m.msg
}

36
var (
37
	logMessageC = make(chan LogMsg)
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
	addSystemC  = make(chan LogSystem)
	flushC      = make(chan chan struct{})
	resetC      = make(chan chan struct{})
)

func init() {
	go dispatchLoop()
}

// each system can buffer this many messages before
// blocking incoming log messages.
const sysBufferSize = 500

func dispatchLoop() {
	var (
		systems  []LogSystem
54
		systemIn []chan LogMsg
55 56 57
		systemWG sync.WaitGroup
	)
	bootSystem := func(sys LogSystem) {
58
		in := make(chan LogMsg, sysBufferSize)
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
		systemIn = append(systemIn, in)
		systemWG.Add(1)
		go sysLoop(sys, in, &systemWG)
	}

	for {
		select {
		case msg := <-logMessageC:
			for _, c := range systemIn {
				c <- msg
			}

		case sys := <-addSystemC:
			systems = append(systems, sys)
			bootSystem(sys)

		case waiter := <-resetC:
			// reset means terminate all systems
			for _, c := range systemIn {
				close(c)
			}
			systems = nil
			systemIn = nil
			systemWG.Wait()
			close(waiter)

		case waiter := <-flushC:
			// flush means reboot all systems
			for _, c := range systemIn {
				close(c)
			}
			systemIn = nil
			systemWG.Wait()
			for _, sys := range systems {
				bootSystem(sys)
			}
			close(waiter)
		}
	}
}

100
func sysLoop(sys LogSystem, in <-chan LogMsg, wg *sync.WaitGroup) {
101
	for msg := range in {
102
		sys.LogPrint(msg)
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
	}
	wg.Done()
}

// Reset removes all active log systems.
// It blocks until all current messages have been delivered.
func Reset() {
	waiter := make(chan struct{})
	resetC <- waiter
	<-waiter
}

// Flush waits until all current log messages have been dispatched to
// the active log systems.
func Flush() {
	waiter := make(chan struct{})
	flushC <- waiter
	<-waiter
}

// AddLogSystem starts printing messages to the given LogSystem.
func AddLogSystem(sys LogSystem) {
	addSystemC <- sys
}