Skip to content

Commit

Permalink
component/otelcol: use zap adapter to accept logs from wrapped compon…
Browse files Browse the repository at this point in the history
…ents

This introduces component/otelcol/internal/zapadapter, which creates a
*zap.Logger instance from a github.com/go-kit/log.Logger instance. This
is then used when creating OpenTelemetry Collector components, allowing
us to continue to use github.com/go-kit/log consistently throughout
Flow.

Related to grafana#2213.
  • Loading branch information
rfratto committed Oct 3, 2022
1 parent 6131b7c commit 9161b3f
Show file tree
Hide file tree
Showing 4 changed files with 373 additions and 6 deletions.
5 changes: 2 additions & 3 deletions component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
"github.com/grafana/agent/component/otelcol/internal/scheduler"
"github.com/grafana/agent/component/otelcol/internal/zapadapter"
"github.com/grafana/agent/pkg/build"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

// Arguments is an extension of component.Arguments which contains necessary
Expand Down Expand Up @@ -109,8 +109,7 @@ func (e *Exporter) Update(args component.Arguments) error {

settings := otelcomponent.ExporterCreateSettings{
TelemetrySettings: otelcomponent.TelemetrySettings{
// TODO(rfratto): create an adapter from zap -> go-kit/log
Logger: zap.NewNop(),
Logger: zapadapter.New(e.opts.Logger),

// TODO(rfratto): expose tracing and logging statistics.
//
Expand Down
240 changes: 240 additions & 0 deletions component/otelcol/internal/zapadapter/zapadapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
// Package zapadapter allows github.com/go-kit/log to be used as a Zap core.
package zapadapter

import (
"fmt"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// New returns a new zap.Logger instance which will forward logs to the
// provided log.Logger. The github.com/go-kit/log/level package will be used
// for specifying log levels.
func New(l log.Logger) *zap.Logger {
return zap.New(&loggerCore{inner: l})
}

// loggerCore is a zap.Core implementation which forwards logs to a log.Logger
// instance.
type loggerCore struct {
inner log.Logger
}

var _ zapcore.Core = (*loggerCore)(nil)

func (lc *loggerCore) Enabled(zapcore.Level) bool {
// An instance of log.Logger has no way of knowing if logs will be filtered
// out, so we always have to return true here.
return true
}

func (lc *loggerCore) With(ff []zapcore.Field) zapcore.Core {
enc := newFieldEncoder()
defer func() { _ = enc.Close() }()

for _, f := range ff {
f.AddTo(enc)
}

return &loggerCore{
inner: log.With(lc.inner, enc.fields...),
}
}

func (lc *loggerCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
return ce.AddCore(e, lc)
}

func (lc *loggerCore) Write(e zapcore.Entry, ff []zapcore.Field) error {
enc := newFieldEncoder()
defer func() { _ = enc.Close() }()

enc.fields = append(enc.fields, "msg", e.Message)

for _, f := range ff {
f.AddTo(enc)
}

switch e.Level {
case zapcore.DebugLevel:
return level.Debug(lc.inner).Log(enc.fields...)
case zapcore.InfoLevel:
return level.Info(lc.inner).Log(enc.fields...)
case zapcore.WarnLevel:
return level.Warn(lc.inner).Log(enc.fields...)
case zapcore.ErrorLevel, zapcore.DPanicLevel, zapcore.PanicLevel, zapcore.FatalLevel:
// We ignore panics/fatals hwere because we really don't want components to
// be able to do that.
return level.Error(lc.inner).Log(enc.fields...)
default:
return lc.inner.Log(enc.fields...)
}
}

func (lc *loggerCore) Sync() error {
return nil
}

// fieldEncoder implements zapcore.ObjectEncoder. It enables converting a
// zapcore.Field into a value which will be written as a github.com/go-kit/log
// keypair.
type fieldEncoder struct {
fields []interface{}
namespace []string
}

var _ zapcore.ObjectEncoder = (*fieldEncoder)(nil)

var encPool = sync.Pool{
New: func() any {
return &fieldEncoder{}
},
}

// newFieldEncoder creates a ready-to-use fieldEncoder. Call Close once the
// fieldEncoder is no longer needed.
func newFieldEncoder() *fieldEncoder {
fe := encPool.Get().(*fieldEncoder)
fe.fields = fe.fields[:0]
fe.namespace = fe.namespace[:0]
return fe
}

func (fe *fieldEncoder) Close() error {
encPool.Put(fe)
return nil
}

func (fe *fieldEncoder) AddArray(key string, marshaler zapcore.ArrayMarshaler) error {
// TODO(rfratto): allow this to write the value of the array instead of
// placeholder text.
fe.fields = append(fe.fields, fe.keyName(key), "<array>")
return nil
}

func (fe *fieldEncoder) AddObject(key string, marshaler zapcore.ObjectMarshaler) error {
// TODO(rfratto): allow this to write the value of the object instead of
// placeholder text.
fe.fields = append(fe.fields, fe.keyName(key), "<object>")
return nil
}

func (fe *fieldEncoder) AddBinary(key string, value []byte) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddByteString(key string, value []byte) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddBool(key string, value bool) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddComplex128(key string, value complex128) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddComplex64(key string, value complex64) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddDuration(key string, value time.Duration) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddFloat64(key string, value float64) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddFloat32(key string, value float32) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddInt(key string, value int) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddInt64(key string, value int64) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddInt32(key string, value int32) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddInt16(key string, value int16) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddInt8(key string, value int8) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddString(key, value string) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddTime(key string, value time.Time) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddUint(key string, value uint) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddUint64(key string, value uint64) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddUint32(key string, value uint32) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddUint16(key string, value uint16) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddUint8(key string, value uint8) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddUintptr(key string, value uintptr) {
fe.fields = append(fe.fields, fe.keyName(key), value)
}

func (fe *fieldEncoder) AddReflected(key string, value interface{}) error {
fe.fields = append(fe.fields, fe.keyName(key), value)
return nil
}

func (fe *fieldEncoder) OpenNamespace(key string) {
fe.namespace = append(fe.namespace, key)
}

// keyName returns the key to used for a named field. If the fieldEncoder isn't
// namespaced, then the key name is k. Otherwise, the key name the combined
// string of the namespace and key, delimiting each fragment by a period `.`.
func (fe *fieldEncoder) keyName(k string) interface{} {
if len(fe.namespace) == 0 {
return k
}
return key(append(fe.namespace, k))
}

type key []string

var _ fmt.Stringer = (key)(nil)

func (k key) String() string {
if len(k) == 1 {
return k[0]
}
return strings.Join(k, ".")
}
129 changes: 129 additions & 0 deletions component/otelcol/internal/zapadapter/zapadapter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package zapadapter_test

import (
"bytes"
"fmt"
"io"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component/otelcol/internal/zapadapter"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func Test(t *testing.T) {
tt := []struct {
name string
field []zap.Field
expect string
}{
{
name: "No fields",
expect: `level=info msg="Hello, world!"`,
},
{
name: "Any",
field: []zap.Field{zap.Any("key", 12345)},
expect: `level=info msg="Hello, world!" key=12345`,
},
{
name: "Bool",
field: []zap.Field{zap.Bool("key", true)},
expect: `level=info msg="Hello, world!" key=true`,
},
{
name: "Duration",
field: []zap.Field{zap.Duration("key", time.Minute)},
expect: `level=info msg="Hello, world!" key=1m0s`,
},
{
name: "Error",
field: []zap.Field{zap.Error(fmt.Errorf("something went wrong"))},
expect: `level=info msg="Hello, world!" error="something went wrong"`,
},
{
name: "Float32",
field: []zap.Field{zap.Float32("key", 123.45)},
expect: `level=info msg="Hello, world!" key=123.45`,
},
{
name: "Float64",
field: []zap.Field{zap.Float64("key", 123.45)},
expect: `level=info msg="Hello, world!" key=123.45`,
},
{
name: "Int",
field: []zap.Field{zap.Int("key", 12345)},
expect: `level=info msg="Hello, world!" key=12345`,
},
{
name: "String",
field: []zap.Field{zap.String("key", "foobar")},
expect: `level=info msg="Hello, world!" key=foobar`,
},
{
name: "Time",
field: []zap.Field{
zap.Time("key", time.Date(2022, 12, 1, 1, 1, 1, 1, time.UTC)),
},
expect: `level=info msg="Hello, world!" key=2022-12-01T01:01:01.000000001Z`,
},
{
name: "Namespace",
field: []zap.Field{
zap.String("key", "foo"),
zap.Namespace("ns"),
zap.String("key", "bar"),
},
expect: `level=info msg="Hello, world!" key=foo ns.key=bar`,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
var buf bytes.Buffer

inner := log.NewLogfmtLogger(log.NewSyncWriter(&buf))

zapLogger := zapadapter.New(inner)
zapLogger.Info("Hello, world!", tc.field...)

require.Equal(t, tc.expect, strings.TrimSpace(buf.String()))
})
}
}

func Benchmark(b *testing.B) {
// Benchmark various fields that may be commonly printed.
//
// NOTE(rfratto): Array and Object are skipped as bencmharks since they
// currently only print placeholder text in place of actual values.

runBenchmark(b, "No fields")
runBenchmark(b, "Any", zap.Any("key", 12345))
runBenchmark(b, "Bool", zap.Bool("key", true))
runBenchmark(b, "Duration", zap.Duration("key", time.Second))
runBenchmark(b, "Error", zap.Error(fmt.Errorf("hello")))
runBenchmark(b, "Float32", zap.Float32("key", 1234))
runBenchmark(b, "Float64", zap.Float64("key", 1234))
runBenchmark(b, "Int", zap.Int("key", 1234))
runBenchmark(b, "String", zap.String("key", "test"))
runBenchmark(b, "Time", zap.Time("key", time.Date(2022, 12, 1, 1, 1, 1, 1, time.UTC)))
}

func runBenchmark(b *testing.B, name string, fields ...zap.Field) {
innerLogger := log.NewLogfmtLogger(io.Discard)
innerLogger = level.NewFilter(innerLogger, level.AllowAll())

zapLogger := zapadapter.New(innerLogger)

b.Run(name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
zapLogger.Info("Hello, world!", fields...)
}
})
}
Loading

0 comments on commit 9161b3f

Please sign in to comment.