diff --git a/globals.go b/globals.go index bf9e3da9..b38a7fce 100644 --- a/globals.go +++ b/globals.go @@ -132,6 +132,10 @@ var ( FatalLevel: "FTL", PanicLevel: "PNC", } + + // TriggerLevelWriterBufferReuseLimit is a limit in bytes that a buffer is dropped + // from the TriggerLevelWriter buffer pool if the buffer grows above the limit. + TriggerLevelWriterBufferReuseLimit = 64 * 1024 ) var ( diff --git a/writer.go b/writer.go index 9b9ef88e..50d7653d 100644 --- a/writer.go +++ b/writer.go @@ -180,3 +180,135 @@ func (w *FilteredLevelWriter) WriteLevel(level Level, p []byte) (int, error) { } return len(p), nil } + +var triggerWriterPool = &sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, 1024)) + }, +} + +// TriggerLevelWriter buffers log lines at the ConditionalLevel or below +// until a trigger level (or higher) line is emitted. Log lines with level +// higher than ConditionalLevel are always written out to the destination +// writer. If trigger never happens, buffered log lines are never written out. +// +// It can be used to configure "log level per request". +type TriggerLevelWriter struct { + // Destination writer. If LevelWriter is provided (usually), its WriteLevel is used + // instead of Write. + io.Writer + + // ConditionalLevel is the level (and below) at which lines are buffered until + // a trigger level (or higher) line is emitted. Usually this is set to DebugLevel. + ConditionalLevel Level + + // TriggerLevel is the lowest level that triggers the sending of the conditional + // level lines. Usually this is set to ErrorLevel. + TriggerLevel Level + + buf *bytes.Buffer + triggered bool + mu sync.Mutex +} + +func (w *TriggerLevelWriter) WriteLevel(l Level, p []byte) (n int, err error) { + w.mu.Lock() + defer w.mu.Unlock() + + // At first trigger level or above log line, we flush the buffer and change the + // trigger state to triggered. + if !w.triggered && l >= w.TriggerLevel { + err := w.trigger() + if err != nil { + return 0, err + } + } + + // Unless triggered, we buffer everything at and below ConditionalLevel. + if !w.triggered && l <= w.ConditionalLevel { + if w.buf == nil { + w.buf = triggerWriterPool.Get().(*bytes.Buffer) + } + + // We prefix each log line with a byte with the level. + // Hopefully we will never have a level value which equals a newline + // (which could interfere with reconstruction of log lines in the trigger method). + w.buf.WriteByte(byte(l)) + w.buf.Write(p) + return len(p), nil + } + + // Anything above ConditionalLevel is always passed through. + // Once triggered, everything is passed through. + if lw, ok := w.Writer.(LevelWriter); ok { + return lw.WriteLevel(l, p) + } + return w.Write(p) +} + +// trigger expects lock to be held. +func (w *TriggerLevelWriter) trigger() error { + if w.triggered { + return nil + } + w.triggered = true + + if w.buf == nil { + return nil + } + + p := w.buf.Bytes() + for len(p) > 0 { + // We do not use bufio.Scanner here because we already have full buffer + // in the memory and we do not want extra copying from the buffer to + // scanner's token slice, nor we want to hit scanner's token size limit, + // and we also want to preserve newlines. + i := bytes.IndexByte(p, '\n') + line := p[0 : i+1] + p = p[i+1:] + // We prefixed each log line with a byte with the level. + level := Level(line[0]) + line = line[1:] + var err error + if lw, ok := w.Writer.(LevelWriter); ok { + _, err = lw.WriteLevel(level, line) + } else { + _, err = w.Write(line) + } + if err != nil { + return err + } + } + + return nil +} + +// Trigger forces flushing the buffer and change the trigger state to +// triggered, if the writer has not already been triggered before. +func (w *TriggerLevelWriter) Trigger() error { + w.mu.Lock() + defer w.mu.Unlock() + + return w.trigger() +} + +// Close closes the writer and returns the buffer to the pool. +func (w *TriggerLevelWriter) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.buf == nil { + return nil + } + + // We return the buffer only if it has not grown above the limit. + // This prevents accumulation of large buffers in the pool just + // because occasionally a large buffer might be needed. + if w.buf.Cap() <= TriggerLevelWriterBufferReuseLimit { + w.buf.Reset() + triggerWriterPool.Put(w.buf) + } + w.buf = nil + + return nil +} diff --git a/writer_test.go b/writer_test.go index 60595ba6..f2a61df9 100644 --- a/writer_test.go +++ b/writer_test.go @@ -195,3 +195,58 @@ func TestFilteredLevelWriter(t *testing.T) { t.Errorf("Expected %q, got %q.", want, p) } } + +type testWrite struct { + Level + Line []byte +} + +func TestTriggerLevelWriter(t *testing.T) { + tests := []struct { + write []testWrite + want []byte + all []byte + }{{ + []testWrite{ + {DebugLevel, []byte("no\n")}, + {InfoLevel, []byte("yes\n")}, + }, + []byte("yes\n"), + []byte("yes\nno\n"), + }, { + []testWrite{ + {DebugLevel, []byte("yes1\n")}, + {InfoLevel, []byte("yes2\n")}, + {ErrorLevel, []byte("yes3\n")}, + {DebugLevel, []byte("yes4\n")}, + }, + []byte("yes2\nyes1\nyes3\nyes4\n"), + []byte("yes2\nyes1\nyes3\nyes4\n"), + }} + + for k, tt := range tests { + t.Run(fmt.Sprintf("case=%d", k), func(t *testing.T) { + buf := bytes.Buffer{} + writer := TriggerLevelWriter{Writer: LevelWriterAdapter{&buf}, ConditionalLevel: DebugLevel, TriggerLevel: ErrorLevel} + t.Cleanup(func() { writer.Close() }) + for _, w := range tt.write { + _, err := writer.WriteLevel(w.Level, w.Line) + if err != nil { + t.Error(err) + } + } + p := buf.Bytes() + if want := tt.want; !bytes.Equal([]byte(want), p) { + t.Errorf("Expected %q, got %q.", want, p) + } + err := writer.Trigger() + if err != nil { + t.Error(err) + } + p = buf.Bytes() + if want := tt.all; !bytes.Equal([]byte(want), p) { + t.Errorf("Expected %q, got %q.", want, p) + } + }) + } +}