Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TriggerLevelWriter #602

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions globals.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ var (
FatalLevel: "FTL",
PanicLevel: "PNC",
}

// TriggerLevelWriterBufferReuseLimit is a limit in bytes that a buffer is dropped
// from the TriggerLevelWriter buffer pool if the buffer grows above the limit.
TriggerLevelWriterBufferReuseLimit = 64 * 1024
)

var (
Expand Down
132 changes: 132 additions & 0 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,135 @@ func (w *FilteredLevelWriter) WriteLevel(level Level, p []byte) (int, error) {
}
return len(p), nil
}

var triggerWriterPool = &sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 1024))
},
}

// TriggerLevelWriter buffers log lines at the ConditionalLevel or below
// until a trigger level (or higher) line is emitted. Log lines with level
// higher than ConditionalLevel are always written out to the destination
// writer. If trigger never happens, buffered log lines are never written out.
//
// It can be used to configure "log level per request".
type TriggerLevelWriter struct {
// Destination writer. If LevelWriter is provided (usually), its WriteLevel is used
// instead of Write.
io.Writer

// ConditionalLevel is the level (and below) at which lines are buffered until
// a trigger level (or higher) line is emitted. Usually this is set to DebugLevel.
ConditionalLevel Level

// TriggerLevel is the lowest level that triggers the sending of the conditional
// level lines. Usually this is set to ErrorLevel.
TriggerLevel Level

buf *bytes.Buffer
triggered bool
mu sync.Mutex
}

func (w *TriggerLevelWriter) WriteLevel(l Level, p []byte) (n int, err error) {
w.mu.Lock()
defer w.mu.Unlock()

// At first trigger level or above log line, we flush the buffer and change the
// trigger state to triggered.
if !w.triggered && l >= w.TriggerLevel {
err := w.trigger()
if err != nil {
return 0, err
}
}

// Unless triggered, we buffer everything at and below ConditionalLevel.
if !w.triggered && l <= w.ConditionalLevel {
if w.buf == nil {
w.buf = triggerWriterPool.Get().(*bytes.Buffer)
}

// We prefix each log line with a byte with the level.
// Hopefully we will never have a level value which equals a newline
// (which could interfere with reconstruction of log lines in the trigger method).
w.buf.WriteByte(byte(l))
w.buf.Write(p)
return len(p), nil
}

// Anything above ConditionalLevel is always passed through.
// Once triggered, everything is passed through.
if lw, ok := w.Writer.(LevelWriter); ok {
return lw.WriteLevel(l, p)
}
return w.Write(p)
}

// trigger expects lock to be held.
func (w *TriggerLevelWriter) trigger() error {
if w.triggered {
return nil
}
w.triggered = true

if w.buf == nil {
return nil
}

p := w.buf.Bytes()
for len(p) > 0 {
// We do not use bufio.Scanner here because we already have full buffer
// in the memory and we do not want extra copying from the buffer to
// scanner's token slice, nor we want to hit scanner's token size limit,
// and we also want to preserve newlines.
i := bytes.IndexByte(p, '\n')
line := p[0 : i+1]
p = p[i+1:]
// We prefixed each log line with a byte with the level.
level := Level(line[0])
line = line[1:]
var err error
if lw, ok := w.Writer.(LevelWriter); ok {
_, err = lw.WriteLevel(level, line)
} else {
_, err = w.Write(line)
}
if err != nil {
return err
}
}

return nil
}

// Trigger forces flushing the buffer and change the trigger state to
// triggered, if the writer has not already been triggered before.
func (w *TriggerLevelWriter) Trigger() error {
w.mu.Lock()
defer w.mu.Unlock()

return w.trigger()
}

// Close closes the writer and returns the buffer to the pool.
func (w *TriggerLevelWriter) Close() error {
w.mu.Lock()
defer w.mu.Unlock()

if w.buf == nil {
return nil
}

// We return the buffer only if it has not grown above the limit.
// This prevents accumulation of large buffers in the pool just
// because occasionally a large buffer might be needed.
if w.buf.Cap() <= TriggerLevelWriterBufferReuseLimit {
w.buf.Reset()
triggerWriterPool.Put(w.buf)
}
w.buf = nil

return nil
}
55 changes: 55 additions & 0 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,58 @@ func TestFilteredLevelWriter(t *testing.T) {
t.Errorf("Expected %q, got %q.", want, p)
}
}

type testWrite struct {
Level
Line []byte
}

func TestTriggerLevelWriter(t *testing.T) {
tests := []struct {
write []testWrite
want []byte
all []byte
}{{
[]testWrite{
{DebugLevel, []byte("no\n")},
{InfoLevel, []byte("yes\n")},
},
[]byte("yes\n"),
[]byte("yes\nno\n"),
}, {
[]testWrite{
{DebugLevel, []byte("yes1\n")},
{InfoLevel, []byte("yes2\n")},
{ErrorLevel, []byte("yes3\n")},
{DebugLevel, []byte("yes4\n")},
},
[]byte("yes2\nyes1\nyes3\nyes4\n"),
[]byte("yes2\nyes1\nyes3\nyes4\n"),
}}

for k, tt := range tests {
t.Run(fmt.Sprintf("case=%d", k), func(t *testing.T) {
buf := bytes.Buffer{}
writer := TriggerLevelWriter{Writer: LevelWriterAdapter{&buf}, ConditionalLevel: DebugLevel, TriggerLevel: ErrorLevel}
t.Cleanup(func() { writer.Close() })
for _, w := range tt.write {
_, err := writer.WriteLevel(w.Level, w.Line)
if err != nil {
t.Error(err)
}
}
p := buf.Bytes()
if want := tt.want; !bytes.Equal([]byte(want), p) {
t.Errorf("Expected %q, got %q.", want, p)
}
err := writer.Trigger()
if err != nil {
t.Error(err)
}
p = buf.Bytes()
if want := tt.all; !bytes.Equal([]byte(want), p) {
t.Errorf("Expected %q, got %q.", want, p)
}
})
}
}
Loading