From 9161b3f6392648d787704fd3406f44794ee77009 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Mon, 3 Oct 2022 16:14:19 -0400 Subject: [PATCH 1/3] component/otelcol: use zap adapter to accept logs from wrapped components This introduces component/otelcol/internal/zapadapter, which creates a *zap.Logger instance from a github.com/go-kit/log.Logger instance. This is then used when creating OpenTelemetry Collector components, allowing us to continue to use github.com/go-kit/log consistently throughout Flow. Related to #2213. --- component/otelcol/exporter/exporter.go | 5 +- .../otelcol/internal/zapadapter/zapadapter.go | 240 ++++++++++++++++++ .../internal/zapadapter/zapadapter_test.go | 129 ++++++++++ component/otelcol/receiver/receiver.go | 5 +- 4 files changed, 373 insertions(+), 6 deletions(-) create mode 100644 component/otelcol/internal/zapadapter/zapadapter.go create mode 100644 component/otelcol/internal/zapadapter/zapadapter_test.go diff --git a/component/otelcol/exporter/exporter.go b/component/otelcol/exporter/exporter.go index 17deb3695770..27d03cb00c64 100644 --- a/component/otelcol/exporter/exporter.go +++ b/component/otelcol/exporter/exporter.go @@ -11,12 +11,12 @@ import ( "github.com/grafana/agent/component/otelcol" "github.com/grafana/agent/component/otelcol/internal/lazyconsumer" "github.com/grafana/agent/component/otelcol/internal/scheduler" + "github.com/grafana/agent/component/otelcol/internal/zapadapter" "github.com/grafana/agent/pkg/build" otelcomponent "go.opentelemetry.io/collector/component" otelconfig "go.opentelemetry.io/collector/config" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" ) // Arguments is an extension of component.Arguments which contains necessary @@ -109,8 +109,7 @@ func (e *Exporter) Update(args component.Arguments) error { settings := otelcomponent.ExporterCreateSettings{ TelemetrySettings: otelcomponent.TelemetrySettings{ - // TODO(rfratto): create an adapter from zap -> go-kit/log - Logger: zap.NewNop(), + Logger: zapadapter.New(e.opts.Logger), // TODO(rfratto): expose tracing and logging statistics. // diff --git a/component/otelcol/internal/zapadapter/zapadapter.go b/component/otelcol/internal/zapadapter/zapadapter.go new file mode 100644 index 000000000000..e6179afad0cc --- /dev/null +++ b/component/otelcol/internal/zapadapter/zapadapter.go @@ -0,0 +1,240 @@ +// Package zapadapter allows github.com/go-kit/log to be used as a Zap core. +package zapadapter + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// New returns a new zap.Logger instance which will forward logs to the +// provided log.Logger. The github.com/go-kit/log/level package will be used +// for specifying log levels. +func New(l log.Logger) *zap.Logger { + return zap.New(&loggerCore{inner: l}) +} + +// loggerCore is a zap.Core implementation which forwards logs to a log.Logger +// instance. +type loggerCore struct { + inner log.Logger +} + +var _ zapcore.Core = (*loggerCore)(nil) + +func (lc *loggerCore) Enabled(zapcore.Level) bool { + // An instance of log.Logger has no way of knowing if logs will be filtered + // out, so we always have to return true here. + return true +} + +func (lc *loggerCore) With(ff []zapcore.Field) zapcore.Core { + enc := newFieldEncoder() + defer func() { _ = enc.Close() }() + + for _, f := range ff { + f.AddTo(enc) + } + + return &loggerCore{ + inner: log.With(lc.inner, enc.fields...), + } +} + +func (lc *loggerCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + return ce.AddCore(e, lc) +} + +func (lc *loggerCore) Write(e zapcore.Entry, ff []zapcore.Field) error { + enc := newFieldEncoder() + defer func() { _ = enc.Close() }() + + enc.fields = append(enc.fields, "msg", e.Message) + + for _, f := range ff { + f.AddTo(enc) + } + + switch e.Level { + case zapcore.DebugLevel: + return level.Debug(lc.inner).Log(enc.fields...) + case zapcore.InfoLevel: + return level.Info(lc.inner).Log(enc.fields...) + case zapcore.WarnLevel: + return level.Warn(lc.inner).Log(enc.fields...) + case zapcore.ErrorLevel, zapcore.DPanicLevel, zapcore.PanicLevel, zapcore.FatalLevel: + // We ignore panics/fatals hwere because we really don't want components to + // be able to do that. + return level.Error(lc.inner).Log(enc.fields...) + default: + return lc.inner.Log(enc.fields...) + } +} + +func (lc *loggerCore) Sync() error { + return nil +} + +// fieldEncoder implements zapcore.ObjectEncoder. It enables converting a +// zapcore.Field into a value which will be written as a github.com/go-kit/log +// keypair. +type fieldEncoder struct { + fields []interface{} + namespace []string +} + +var _ zapcore.ObjectEncoder = (*fieldEncoder)(nil) + +var encPool = sync.Pool{ + New: func() any { + return &fieldEncoder{} + }, +} + +// newFieldEncoder creates a ready-to-use fieldEncoder. Call Close once the +// fieldEncoder is no longer needed. +func newFieldEncoder() *fieldEncoder { + fe := encPool.Get().(*fieldEncoder) + fe.fields = fe.fields[:0] + fe.namespace = fe.namespace[:0] + return fe +} + +func (fe *fieldEncoder) Close() error { + encPool.Put(fe) + return nil +} + +func (fe *fieldEncoder) AddArray(key string, marshaler zapcore.ArrayMarshaler) error { + // TODO(rfratto): allow this to write the value of the array instead of + // placeholder text. + fe.fields = append(fe.fields, fe.keyName(key), "") + return nil +} + +func (fe *fieldEncoder) AddObject(key string, marshaler zapcore.ObjectMarshaler) error { + // TODO(rfratto): allow this to write the value of the object instead of + // placeholder text. + fe.fields = append(fe.fields, fe.keyName(key), "") + return nil +} + +func (fe *fieldEncoder) AddBinary(key string, value []byte) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddByteString(key string, value []byte) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddBool(key string, value bool) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddComplex128(key string, value complex128) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddComplex64(key string, value complex64) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddDuration(key string, value time.Duration) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddFloat64(key string, value float64) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddFloat32(key string, value float32) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddInt(key string, value int) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddInt64(key string, value int64) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddInt32(key string, value int32) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddInt16(key string, value int16) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddInt8(key string, value int8) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddString(key, value string) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddTime(key string, value time.Time) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddUint(key string, value uint) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddUint64(key string, value uint64) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddUint32(key string, value uint32) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddUint16(key string, value uint16) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddUint8(key string, value uint8) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddUintptr(key string, value uintptr) { + fe.fields = append(fe.fields, fe.keyName(key), value) +} + +func (fe *fieldEncoder) AddReflected(key string, value interface{}) error { + fe.fields = append(fe.fields, fe.keyName(key), value) + return nil +} + +func (fe *fieldEncoder) OpenNamespace(key string) { + fe.namespace = append(fe.namespace, key) +} + +// keyName returns the key to used for a named field. If the fieldEncoder isn't +// namespaced, then the key name is k. Otherwise, the key name the combined +// string of the namespace and key, delimiting each fragment by a period `.`. +func (fe *fieldEncoder) keyName(k string) interface{} { + if len(fe.namespace) == 0 { + return k + } + return key(append(fe.namespace, k)) +} + +type key []string + +var _ fmt.Stringer = (key)(nil) + +func (k key) String() string { + if len(k) == 1 { + return k[0] + } + return strings.Join(k, ".") +} diff --git a/component/otelcol/internal/zapadapter/zapadapter_test.go b/component/otelcol/internal/zapadapter/zapadapter_test.go new file mode 100644 index 000000000000..dbd725fda5a5 --- /dev/null +++ b/component/otelcol/internal/zapadapter/zapadapter_test.go @@ -0,0 +1,129 @@ +package zapadapter_test + +import ( + "bytes" + "fmt" + "io" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/agent/component/otelcol/internal/zapadapter" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func Test(t *testing.T) { + tt := []struct { + name string + field []zap.Field + expect string + }{ + { + name: "No fields", + expect: `level=info msg="Hello, world!"`, + }, + { + name: "Any", + field: []zap.Field{zap.Any("key", 12345)}, + expect: `level=info msg="Hello, world!" key=12345`, + }, + { + name: "Bool", + field: []zap.Field{zap.Bool("key", true)}, + expect: `level=info msg="Hello, world!" key=true`, + }, + { + name: "Duration", + field: []zap.Field{zap.Duration("key", time.Minute)}, + expect: `level=info msg="Hello, world!" key=1m0s`, + }, + { + name: "Error", + field: []zap.Field{zap.Error(fmt.Errorf("something went wrong"))}, + expect: `level=info msg="Hello, world!" error="something went wrong"`, + }, + { + name: "Float32", + field: []zap.Field{zap.Float32("key", 123.45)}, + expect: `level=info msg="Hello, world!" key=123.45`, + }, + { + name: "Float64", + field: []zap.Field{zap.Float64("key", 123.45)}, + expect: `level=info msg="Hello, world!" key=123.45`, + }, + { + name: "Int", + field: []zap.Field{zap.Int("key", 12345)}, + expect: `level=info msg="Hello, world!" key=12345`, + }, + { + name: "String", + field: []zap.Field{zap.String("key", "foobar")}, + expect: `level=info msg="Hello, world!" key=foobar`, + }, + { + name: "Time", + field: []zap.Field{ + zap.Time("key", time.Date(2022, 12, 1, 1, 1, 1, 1, time.UTC)), + }, + expect: `level=info msg="Hello, world!" key=2022-12-01T01:01:01.000000001Z`, + }, + { + name: "Namespace", + field: []zap.Field{ + zap.String("key", "foo"), + zap.Namespace("ns"), + zap.String("key", "bar"), + }, + expect: `level=info msg="Hello, world!" key=foo ns.key=bar`, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + var buf bytes.Buffer + + inner := log.NewLogfmtLogger(log.NewSyncWriter(&buf)) + + zapLogger := zapadapter.New(inner) + zapLogger.Info("Hello, world!", tc.field...) + + require.Equal(t, tc.expect, strings.TrimSpace(buf.String())) + }) + } +} + +func Benchmark(b *testing.B) { + // Benchmark various fields that may be commonly printed. + // + // NOTE(rfratto): Array and Object are skipped as bencmharks since they + // currently only print placeholder text in place of actual values. + + runBenchmark(b, "No fields") + runBenchmark(b, "Any", zap.Any("key", 12345)) + runBenchmark(b, "Bool", zap.Bool("key", true)) + runBenchmark(b, "Duration", zap.Duration("key", time.Second)) + runBenchmark(b, "Error", zap.Error(fmt.Errorf("hello"))) + runBenchmark(b, "Float32", zap.Float32("key", 1234)) + runBenchmark(b, "Float64", zap.Float64("key", 1234)) + runBenchmark(b, "Int", zap.Int("key", 1234)) + runBenchmark(b, "String", zap.String("key", "test")) + runBenchmark(b, "Time", zap.Time("key", time.Date(2022, 12, 1, 1, 1, 1, 1, time.UTC))) +} + +func runBenchmark(b *testing.B, name string, fields ...zap.Field) { + innerLogger := log.NewLogfmtLogger(io.Discard) + innerLogger = level.NewFilter(innerLogger, level.AllowAll()) + + zapLogger := zapadapter.New(innerLogger) + + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + zapLogger.Info("Hello, world!", fields...) + } + }) +} diff --git a/component/otelcol/receiver/receiver.go b/component/otelcol/receiver/receiver.go index 0f0a5fe7046e..8e3d7df8ae6c 100644 --- a/component/otelcol/receiver/receiver.go +++ b/component/otelcol/receiver/receiver.go @@ -11,12 +11,12 @@ import ( "github.com/grafana/agent/component/otelcol" "github.com/grafana/agent/component/otelcol/internal/fanoutconsumer" "github.com/grafana/agent/component/otelcol/internal/scheduler" + "github.com/grafana/agent/component/otelcol/internal/zapadapter" "github.com/grafana/agent/pkg/build" otelcomponent "go.opentelemetry.io/collector/component" otelconfig "go.opentelemetry.io/collector/config" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" ) // Arguments is an extension of component.Arguments which contains necessary @@ -102,8 +102,7 @@ func (r *Receiver) Update(args component.Arguments) error { settings := otelcomponent.ReceiverCreateSettings{ TelemetrySettings: otelcomponent.TelemetrySettings{ - // TODO(rfratto): create an adapter from zap -> go-kit/log - Logger: zap.NewNop(), + Logger: zapadapter.New(r.opts.Logger), // TODO(rfratto): expose tracing and logging statistics. // From 4d55a3ef1e8af4c5e7f2c892bf48acf9499685a6 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Mon, 3 Oct 2022 16:33:02 -0400 Subject: [PATCH 2/3] zapadapter: add more comments explaining the purpose of the fieldEncoder --- component/otelcol/internal/zapadapter/zapadapter.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/component/otelcol/internal/zapadapter/zapadapter.go b/component/otelcol/internal/zapadapter/zapadapter.go index e6179afad0cc..8c113c516f0d 100644 --- a/component/otelcol/internal/zapadapter/zapadapter.go +++ b/component/otelcol/internal/zapadapter/zapadapter.go @@ -85,7 +85,16 @@ func (lc *loggerCore) Sync() error { // zapcore.Field into a value which will be written as a github.com/go-kit/log // keypair. type fieldEncoder struct { - fields []interface{} + // fields are the list of fields that will be passed to log.Logger.Log. + fields []interface{} + + // namespace is used to prefix keys before appending to fields. When a + // zap.Namespace field is logged, the OpenNamespace method of the + // fieldEncoder will be invoked, appending to the namespace slice. + // + // It is not possible to pop a namespace from the list; once a zap.Namespace + // field is logged, all further fields in that entry are scoped within that + // namespace. namespace []string } From 7662d8ce06ab510e0214ff63a95c976ac908e1ab Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 4 Oct 2022 08:09:08 -0400 Subject: [PATCH 3/3] zapadapter: document a little better --- component/otelcol/internal/zapadapter/zapadapter.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/component/otelcol/internal/zapadapter/zapadapter.go b/component/otelcol/internal/zapadapter/zapadapter.go index 8c113c516f0d..e3e06f0447ed 100644 --- a/component/otelcol/internal/zapadapter/zapadapter.go +++ b/component/otelcol/internal/zapadapter/zapadapter.go @@ -28,13 +28,19 @@ type loggerCore struct { var _ zapcore.Core = (*loggerCore)(nil) +// Enabled implements zapcore.Core and returns whether logs at a specific level +// should be reported. func (lc *loggerCore) Enabled(zapcore.Level) bool { // An instance of log.Logger has no way of knowing if logs will be filtered // out, so we always have to return true here. return true } +// With implements zapcore.Core, returning a new logger core with ff appended +// to the list of fields. func (lc *loggerCore) With(ff []zapcore.Field) zapcore.Core { + // Encode all of the fields so that they're go-kit compatible and create a + // new logger from it. enc := newFieldEncoder() defer func() { _ = enc.Close() }() @@ -47,10 +53,14 @@ func (lc *loggerCore) With(ff []zapcore.Field) zapcore.Core { } } +// Check implements zapcore.Core. lc will always add itself along with the +// provided entry to the CheckedEntry. func (lc *loggerCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { return ce.AddCore(e, lc) } +// Write serializes e with the provided list of fields, writing them to the +// underlying github.com/go-kit/log.Logger instance. func (lc *loggerCore) Write(e zapcore.Entry, ff []zapcore.Field) error { enc := newFieldEncoder() defer func() { _ = enc.Close() }() @@ -139,7 +149,7 @@ func (fe *fieldEncoder) AddBinary(key string, value []byte) { } func (fe *fieldEncoder) AddByteString(key string, value []byte) { - fe.fields = append(fe.fields, fe.keyName(key), value) + fe.fields = append(fe.fields, fe.keyName(key), string(value)) } func (fe *fieldEncoder) AddBool(key string, value bool) {