From 06d762fd54dda568e6d937ea22603b0ff589f9b3 Mon Sep 17 00:00:00 2001
From: Alex Barganier
Date: Wed, 3 Apr 2024 17:02:22 -0400
Subject: [PATCH] pkg/util/log: introduce log.Structured, wire up to aggregator

This patch introduces log.Structured to the log package API. It aims to
serve as a prototype for the log facility we will use for exporting
"exhaust" from CRDB in the form of JSON objects. The intention is that
this exhaust can be consumed externally and be sufficient to build
features around.

This iteration has some serious limitations, the main one being that it
is not redaction-aware. The existing `log.StructuredEvent` API remains
alongside it for now. The two implementations are quite similar, so they
should eventually be reconciled and/or combined, but that is left as a
TODO to avoid slowing down the prototyping process.

The patch also introduces a new logging channel, `STRUCTURED_EVENTS`,
used exclusively by the new `log.Structured` API. A dedicated channel
lets us segment these logs from the rest of the logging output, which is
what motivates the separation. The name works for now, but we should
consider whether a better one is available.

A follow-up patch will focus on internal consumption of these events.

Release note: none
---
 docs/generated/logging.md                 |   5 +
 pkg/cli/testdata/logflags                 |  39 ++-
 pkg/obs/eventagg/BUILD.bazel              |   5 +
 pkg/obs/eventagg/doc.go                   |  10 +-
 pkg/obs/eventagg/flush.go                 |  32 +-
 pkg/obs/eventagg/flush_consumer.go        |  73 ++---
 pkg/obs/eventagg/flush_consumer_test.go   | 155 +++++++++
 pkg/obs/eventagg/flush_test.go            |  16 +-
 pkg/obs/eventagg/map_reduce.go            |  10 +-
 pkg/obs/eventagg/map_reduce_test.go       |   6 +-
 pkg/sql/BUILD.bazel                       |   1 +
 pkg/sql/conn_executor.go                  |   4 +
 pkg/sql/executor_statement_metrics.go     |   5 +
 pkg/sql/sqlstats/BUILD.bazel              |   2 +-
 pkg/sql/sqlstats/aggregate.go             |  20 +-
 pkg/util/log/BUILD.bazel                  |   1 +
 pkg/util/log/channel/channel_generated.go |   4 +
 .../eventpbgen/log_channels_generated.go  |   1 +
 pkg/util/log/log_channels_generated.go    | 297 ++++++++++++++++++
 pkg/util/log/logconfig/testdata/export    |   4 +-
 pkg/util/log/logconfig/testdata/validate  |  10 +-
 pkg/util/log/logconfig/testdata/yaml      |   4 +-
 pkg/util/log/logpb/log.proto              |   6 +-
 pkg/util/log/structured_v2.go             |  81 +++++
 pkg/util/log/testdata/config              |   2 +-
 25 files changed, 705 insertions(+), 88 deletions(-)
 create mode 100644 pkg/obs/eventagg/flush_consumer_test.go
 create mode 100644 pkg/util/log/structured_v2.go

diff --git a/docs/generated/logging.md b/docs/generated/logging.md
index 8b628dc2dd96..c532585a190d 100644
--- a/docs/generated/logging.md
+++ b/docs/generated/logging.md
@@ -177,3 +177,8 @@ The `KV_DISTRIBUTION` channel is used to report data distribution events, such a
 replicas between stores in the cluster, or adding (removing) replicas to
 ranges.
 
+### `STRUCTURED_EVENTS`
+
+The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured
+JSON, which can be consumed externally to power o11y features.
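+
+For example (the exact payload shape is determined by the caller; this sample mirrors
+the statement-statistics aggregation wired up in this patch), a message on this channel
+takes the form:
+
+    {"metadata": {"event_type": "stmt_stats"}, "payload": {"agg_info": {"kind": "WINDOWED", "start_time": ..., "end_time": ...}, "key": ..., "value": ...}}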
+ diff --git a/pkg/cli/testdata/logflags b/pkg/cli/testdata/logflags index 2d5a5742d744..261d73266cbf 100644 --- a/pkg/cli/testdata/logflags +++ b/pkg/cli/testdata/logflags @@ -28,7 +28,8 @@ SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, -KV_DISTRIBUTION],,true,crdb-v2)>, +KV_DISTRIBUTION, +STRUCTURED_EVENTS],,true,crdb-v2)>, health: ,true,crdb-v2)>, kv-distribution: ,true,crdb-v2)>, pebble: ,true,crdb-v2)>, @@ -64,7 +65,8 @@ SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, -KV_DISTRIBUTION],,true,crdb-v2)>, +KV_DISTRIBUTION, +STRUCTURED_EVENTS],,true,crdb-v2)>, health: ,true,crdb-v2)>, kv-distribution: ,true,crdb-v2)>, pebble: ,true,crdb-v2)>, @@ -165,7 +167,8 @@ SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, -KV_DISTRIBUTION],/pathA/logs,true,crdb-v2)>, +KV_DISTRIBUTION, +STRUCTURED_EVENTS],/pathA/logs,true,crdb-v2)>, health: , kv-distribution: , pebble: , @@ -203,7 +206,8 @@ SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, -KV_DISTRIBUTION],/mypath,true,crdb-v2)>, +KV_DISTRIBUTION, +STRUCTURED_EVENTS],/mypath,true,crdb-v2)>, health: , kv-distribution: , pebble: , @@ -242,7 +246,8 @@ SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, -KV_DISTRIBUTION],/pathA/logs,true,crdb-v2)>, +KV_DISTRIBUTION, +STRUCTURED_EVENTS],/pathA/logs,true,crdb-v2)>, health: , kv-distribution: , pebble: , @@ -286,7 +291,8 @@ SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, -KV_DISTRIBUTION],/mypath,true,crdb-v2)>, +KV_DISTRIBUTION, +STRUCTURED_EVENTS],/mypath,true,crdb-v2)>, health: , kv-distribution: , pebble: , @@ -323,7 +329,8 @@ SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, -KV_DISTRIBUTION],,true,crdb-v2)>, +KV_DISTRIBUTION, +STRUCTURED_EVENTS],,true,crdb-v2)>, health: ,true,crdb-v2)>, kv-distribution: ,true,crdb-v2)>, pebble: ,true,crdb-v2)>, @@ -361,7 +368,8 @@ SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, -KV_DISTRIBUTION],,true,crdb-v2)>, +KV_DISTRIBUTION, +STRUCTURED_EVENTS],,true,crdb-v2)>, health: ,true,crdb-v2)>, kv-distribution: ,true,crdb-v2)>, pebble: ,true,crdb-v2)>, @@ -409,7 +417,8 @@ SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, -KV_DISTRIBUTION],,true,crdb-v2)>, +KV_DISTRIBUTION, +STRUCTURED_EVENTS],,true,crdb-v2)>, health: ,true,crdb-v2)>, kv-distribution: ,true,crdb-v2)>, pebble: ,true,crdb-v2)>, @@ -479,7 +488,8 @@ SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, -KV_DISTRIBUTION],/mypath,true,crdb-v2)>, +KV_DISTRIBUTION, +STRUCTURED_EVENTS],/mypath,true,crdb-v2)>, health: , kv-distribution: , pebble: , @@ -518,7 +528,8 @@ SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, -KV_DISTRIBUTION],/pathA,true,crdb-v2)>, +KV_DISTRIBUTION, +STRUCTURED_EVENTS],/pathA,true,crdb-v2)>, health: , kv-distribution: , pebble: , @@ -576,7 +587,8 @@ SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, -KV_DISTRIBUTION],,true,crdb-v2)>, +KV_DISTRIBUTION, +STRUCTURED_EVENTS],,true,crdb-v2)>, health: ,true,crdb-v2)>, kv-distribution: ,true,crdb-v2)>, pebble: ,true,crdb-v2)>, @@ -613,7 +625,8 @@ SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, -KV_DISTRIBUTION],,true,crdb-v2)>, +KV_DISTRIBUTION, +STRUCTURED_EVENTS],,true,crdb-v2)>, health: ,true,crdb-v2)>, kv-distribution: ,true,crdb-v2)>, pebble: ,true,crdb-v2)>, diff --git a/pkg/obs/eventagg/BUILD.bazel b/pkg/obs/eventagg/BUILD.bazel index 7cce72b04e2b..d570a07998a6 100644 --- a/pkg/obs/eventagg/BUILD.bazel +++ b/pkg/obs/eventagg/BUILD.bazel @@ -13,6 +13,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/util/envutil", + "//pkg/util/log", "//pkg/util/syncutil", ], ) @@ -20,18 +21,22 @@ go_library( go_test( name = 
"eventagg_test", srcs = [ + "flush_consumer_test.go", "flush_test.go", "map_reduce_test.go", ], data = glob(["testdata/**"]), embed = [":eventagg"], deps = [ + "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/log/logpb", "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/obs/eventagg/doc.go b/pkg/obs/eventagg/doc.go index c88cb2ce9569..42bb4ccaed65 100644 --- a/pkg/obs/eventagg/doc.go +++ b/pkg/obs/eventagg/doc.go @@ -66,15 +66,9 @@ package eventagg // challenges that we're punting until later that are worth enumerating here. // // Unsolved Challenges: -// 1. What do the details of the structured logging API look like that the eventagg package will use? -// Current telemetry logging requires protobuf messages which can be unwieldy and require us to either -// use protobuf types within the eventagg system (undesirable) or transform aggregation results into -// a protobuf type (duplicative). Some early ideas on how to solve this are discussed in comments within -// pkg/obs/eventagg/flush_consumer.go, within (*LogWriteConsumer[Agg]).onFlush(). For now, we punt this -// problem until the broader eventagg interfaces are agreed upon. -// 2. Memory accounting as well as observability into the aggregations themselves are required to make +// 1. Memory accounting as well as observability into the aggregations themselves are required to make // eventagg safe to use and easy to debug. For now, we punt this problem. -// 3. What kinds of concurrency, if any, should be used within the eventagg package? If the goal is for +// 2. What kinds of concurrency, if any, should be used within the eventagg package? If the goal is for // eventagg to be easy for developers to use, then making a Map/Reduce operation concurrent is // should also be easy if there's a need for it. We punt whether this is a requirement, and the details // of the problem if it *is* a requirement, until later. diff --git a/pkg/obs/eventagg/flush.go b/pkg/obs/eventagg/flush.go index 041a0ef5bd55..75b2b2d16a03 100644 --- a/pkg/obs/eventagg/flush.go +++ b/pkg/obs/eventagg/flush.go @@ -12,6 +12,25 @@ package eventagg import "time" +// FlushKind is the type of FlushTrigger used, which communicates aggregation strategy. +type FlushKind string + +// Windowed is a FlushTrigger type that enforces windowed flush periods, containing a +// window start and end time. +const Windowed FlushKind = "WINDOWED" + +// AggInfo is a type used to communicate metadata from a FlushTrigger to the +// targets of the flush about the aggregation type period. For example, the type +// of flush, and associated timestamps. +type AggInfo struct { + // Kind is the FlushKind used in the FlushTrigger. + Kind FlushKind `json:"kind"` + // StartTime is the primary timestamp for the flushed data. + StartTime int64 `json:"start_time"` + // EndTime, if present, represents the end timestamp of the flush interval. + EndTime int64 `json:"end_time"` +} + // FlushTrigger defines the interface used by aggregators, such as MapReduceAggregator, // to determine when a flush of the current aggregation should be triggered. // @@ -19,7 +38,7 @@ import "time" // but *before* the event itself is aggregated. type FlushTrigger interface { // shouldFlush returns true if this FlushTrigger has been tripped. 
- shouldFlush() bool + shouldFlush() (bool, AggInfo) } // WindowedFlush is a FlushTrigger which triggers flushes on a wall clock aligned @@ -55,11 +74,16 @@ func (w *WindowedFlush) newWindowEnd() time.Time { return w.nowFn().Truncate(w.window).Add(w.window) } -func (w *WindowedFlush) shouldFlush() bool { +func (w *WindowedFlush) shouldFlush() (bool, AggInfo) { t := w.nowFn() if t.Equal(w.curWindowEnd) || t.After(w.curWindowEnd) { + meta := AggInfo{ + Kind: Windowed, + StartTime: w.curWindowEnd.Add(-w.window).UnixNano(), + EndTime: w.curWindowEnd.UnixNano(), + } w.curWindowEnd = w.newWindowEnd() - return true + return true, meta } - return false + return false, AggInfo{} } diff --git a/pkg/obs/eventagg/flush_consumer.go b/pkg/obs/eventagg/flush_consumer.go index 74b665da7779..851afd10802a 100644 --- a/pkg/obs/eventagg/flush_consumer.go +++ b/pkg/obs/eventagg/flush_consumer.go @@ -10,50 +10,51 @@ package eventagg -import "context" +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/util/log" +) // flushConsumer is the interface type used to define a post-processing consumer of // the aggregations flushed by a MapReduceAggregator. type flushConsumer[K comparable, V any] interface { - // TODO(abarganier): It'd be better if we didn't pass down the reduceContainer and instead - // provided the unwrapped values, but for now I'd rather copying over to a new map. - onFlush(ctx context.Context, m map[K]V) + onFlush(ctx context.Context, aggInfo AggInfo, m map[K]V) } // LogWriteConsumer is an example flushConsumer, which provides an easy plug-and-play -// method of logging all the values Agg flushed by a MapReduceAggregator[Agg]. -type LogWriteConsumer[K comparable, V any] struct{} +// method of logging all the values V flushed by a MapReduceAggregator[V]. +type LogWriteConsumer[K comparable, V any] struct { + EventType log.EventType +} -// NewLogWriteConsumer returns a new *LogWriteConsumer[Agg] instance. -func NewLogWriteConsumer[K comparable, V any]() *LogWriteConsumer[K, V] { - return &LogWriteConsumer[K, V]{} +// KeyValueLog is the type logged to log.Structured via the LogWriteConsumer. +// It wraps the flush metadata, key, and value flushed from the associated +// aggregator. +type KeyValueLog struct { + AggInfo AggInfo `json:"agg_info"` + Key any `json:"key"` + Value any `json:"value"` } -// TODO(abarganier): The values flushed out of a MapReduceAggregator ideally should be able -// to be provided to log.StructuredEvent(), or a similar facility, without transformation. -// -// For now, we punt this problem until we have the lower level interfaces fleshed out. Currently, -// log.StructuredEvent() requires a protobuf format, which may be cumbersome & something we'd like -// to avoid if possible within this system. -// -// One option would be to expand the API of pkg/util/log to enable direct JSON logging, e.g.: -// -// log.Structured(ctx, eventMetadata, eventJSON) -// -// This would allow us to construct a single metadata object that can be applied to every flushed -// value Agg. 
-func (l LogWriteConsumer[K, V]) onFlush(ctx context.Context, m map[K]V) {
-	//metadata := &logmeta.EventMetadata{
-	//	EventType: ...,
-	//	EventTimestamp: timeutil.Now().UnixNano(),
-	//	NodeDetails: logmeta.GetNodeDetails(ctx),
-	//	...,
-	//}
-	//for _, v := range m {
-	//	marshalled, err := json.Marshal(v)
-	//	if err != nil {
-	//		log.Errorf(ctx, "failed to marshal JSON for event: %v", v)
-	//	}
-	//	log.Structured(ctx, metadata, marshalled)
-	//}
+// NewLogWriteConsumer returns a new *LogWriteConsumer[K, V] instance.
+func NewLogWriteConsumer[K comparable, V any](eventType log.EventType) *LogWriteConsumer[K, V] {
+	return &LogWriteConsumer[K, V]{
+		EventType: eventType,
+	}
+}
+
+// onFlush emits all events in the provided map as logs via log.Structured.
+func (l LogWriteConsumer[K, V]) onFlush(ctx context.Context, aggInfo AggInfo, m map[K]V) {
+	metadata := log.StructuredMeta{
+		EventType: l.EventType,
+	}
+	for k, v := range m {
+		kvLog := KeyValueLog{
+			AggInfo: aggInfo,
+			Key:     k,
+			Value:   v,
+		}
+		log.Structured(ctx, metadata, kvLog)
+	}
 }
diff --git a/pkg/obs/eventagg/flush_consumer_test.go b/pkg/obs/eventagg/flush_consumer_test.go
new file mode 100644
index 000000000000..5981219a5b0e
--- /dev/null
+++ b/pkg/obs/eventagg/flush_consumer_test.go
@@ -0,0 +1,155 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package eventagg
+
+import (
+	"context"
+	"encoding/json"
+	"strings"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
+)
+
+func TestLogWriteConsumer(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.ScopeWithoutShowLogs(t).Close(t)
+	ctx := context.Background()
+
+	testEventType := log.EventType("test")
+	interceptor := NewEventAggKVLogInterceptor(t)
+	interceptor.SetEventTypeFilter(testEventType)
+	defer log.InterceptWith(ctx, interceptor)()
+
+	consumer := NewLogWriteConsumer[string, string](testEventType)
+	aggInfo := AggInfo{
+		Kind:      Windowed,
+		StartTime: 100,
+		EndTime:   200,
+	}
+	flushed := map[string]string{
+		"hello": "world",
+		"good":  "morning",
+	}
+	consumer.onFlush(ctx, aggInfo, flushed)
+
+	testutils.SucceedsSoon(t, func() error {
+		if len(flushed) != interceptor.Count() {
+			return errors.New("still waiting for all expected logs to be intercepted")
+		}
+		return nil
+	})
+	logs := interceptor.Logs()
+	metas := interceptor.LogMetas()
+	for i, l := range logs {
+		expectedV, ok := flushed[l.Key.(string)]
+		require.True(t, ok)
+		require.Equal(t, expectedV, l.Value.(string))
+		require.Equal(t, aggInfo, l.AggInfo)
+		require.Equal(t, log.StructuredMeta{EventType: testEventType}, metas[i])
+	}
+}
+
+type KVStructuredLogInterceptor struct {
+	testState *testing.T
+
+	typeFilter log.EventType
+	mu         struct {
+		syncutil.Mutex
+		// meta[i] is the metadata corresponding to logs[i].
+		logs []KeyValueLog
+		meta []log.StructuredMeta
+	}
+}
+
+var _ log.Interceptor = (*KVStructuredLogInterceptor)(nil)
+
+func NewEventAggKVLogInterceptor(t
*testing.T) *KVStructuredLogInterceptor {
+	out := &KVStructuredLogInterceptor{
+		testState: t,
+	}
+	out.mu.logs = make([]KeyValueLog, 0)
+	out.mu.meta = make([]log.StructuredMeta, 0)
+	return out
+}
+
+func (e *KVStructuredLogInterceptor) Intercept(entry []byte) {
+	var logEntry logpb.Entry
+
+	if err := json.Unmarshal(entry, &logEntry); err != nil {
+		e.testState.Fatal(err)
+	}
+
+	if logEntry.Channel != logpb.Channel_STRUCTURED_EVENTS {
+		return
+	}
+
+	// The log.Interceptor uses legacy log formats, which for some reason insert
+	// this string in front of the structured JSON, making it unparsable unless
+	// we strip the prefix first.
+	structuredEntryPrefix := "Structured entry: "
+	var structured log.StructuredPayload
+	if err := json.Unmarshal([]byte(strings.TrimPrefix(logEntry.Message, structuredEntryPrefix)), &structured); err != nil {
+		e.testState.Fatal(err)
+	}
+
+	if e.typeFilter != "" && e.typeFilter != structured.Metadata.EventType {
+		return
+	}
+
+	// Given that KeyValueLog has fields of type `any`, we need to re-encode the
+	// structured payload to JSON so that we can deserialize it once again, this
+	// time into the more specific type. The original deserialization treated
+	// structured.Payload as a map[string]interface{}, which is clumsy to work with.
+	kvLogBytes, err := json.Marshal(structured.Payload)
+	if err != nil {
+		e.testState.Fatal(err)
+	}
+
+	var kvLog KeyValueLog
+	err = json.Unmarshal(kvLogBytes, &kvLog)
+	if err != nil {
+		e.testState.Fatal(err)
+	}
+
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	e.mu.logs = append(e.mu.logs, kvLog)
+	e.mu.meta = append(e.mu.meta, structured.Metadata)
+}
+
+func (e *KVStructuredLogInterceptor) Logs() []KeyValueLog {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	return e.mu.logs
+}
+
+func (e *KVStructuredLogInterceptor) LogMetas() []log.StructuredMeta {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	return e.mu.meta
+}
+
+func (e *KVStructuredLogInterceptor) Count() int {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	return len(e.mu.logs)
+}
+
+// SetEventTypeFilter limits interception to entries whose metadata carries the
+// given event type. It is not synchronized, so set it before interception begins.
+func (e *KVStructuredLogInterceptor) SetEventTypeFilter(logType log.EventType) {
+	e.typeFilter = logType
+}
diff --git a/pkg/obs/eventagg/flush_test.go b/pkg/obs/eventagg/flush_test.go
index 846db1cf2099..93c19c3f5054 100644
--- a/pkg/obs/eventagg/flush_test.go
+++ b/pkg/obs/eventagg/flush_test.go
@@ -29,12 +29,22 @@ func TestWindowedFlush(t *testing.T) {
 		return now
 	})
 	// Initially, we should be within the current window.
-	require.False(t, flush.shouldFlush())
+	shouldFlush, meta := flush.shouldFlush()
+	require.False(t, shouldFlush)
+	require.Equal(t, AggInfo{}, meta)
 	// If we fast-forward to the beginning of the next window, we expect
 	// a flush to be triggered.
 	now = now.Add(window).Truncate(window)
-	require.True(t, flush.shouldFlush())
+	shouldFlush, meta = flush.shouldFlush()
+	require.True(t, shouldFlush)
+	require.Equal(t, AggInfo{
+		Kind:      Windowed,
+		StartTime: now.Add(-window).Truncate(window).UnixNano(),
+		EndTime:   now.UnixNano(),
+	}, meta)
 	// Times occurring before the current window's end should not trigger a flush.
 	now = now.Add(1 * time.Minute)
-	require.False(t, flush.shouldFlush())
+	shouldFlush, meta = flush.shouldFlush()
+	require.False(t, shouldFlush)
+	require.Equal(t, AggInfo{}, meta)
 }
diff --git a/pkg/obs/eventagg/map_reduce.go b/pkg/obs/eventagg/map_reduce.go
index ca66a46406fe..108a0f8afc1c 100644
--- a/pkg/obs/eventagg/map_reduce.go
+++ b/pkg/obs/eventagg/map_reduce.go
@@ -59,8 +59,8 @@ func (m *MapReduceAggregator[E, K, Agg]) Add(ctx context.Context, e E) {
 	// If it's time to flush, do so async before processing the event.
 	// This will reset the cache, meaning our event will be added to
 	// a fresh aggregation window.
-	if m.mu.flushTrigger.shouldFlush() {
-		m.flushAsync(ctx, m.getAndResetCacheLocked())
+	if shouldFlush, aggInfo := m.mu.flushTrigger.shouldFlush(); shouldFlush {
+		m.flushAsync(ctx, aggInfo, m.getAndResetCacheLocked())
 	}
 	v, ok := m.mu.cache[k]
 	if !ok {
@@ -75,12 +75,14 @@ func (m *MapReduceAggregator[E, K, Agg]) Add(ctx context.Context, e E) {
 //
 // The flushed data will be passed to each of the configured flushConsumers
 // provided at construction for further processing.
-func (m *MapReduceAggregator[E, K, Agg]) flushAsync(ctx context.Context, flushed map[K]Agg) {
+func (m *MapReduceAggregator[E, K, Agg]) flushAsync(
+	ctx context.Context, aggInfo AggInfo, flushed map[K]Agg,
+) {
 	// TODO(abarganier): We should probably use a stopper async task here.
 	// TODO(abarganier): Should we make whether this is done async configurable?
 	go func() {
 		for _, c := range m.consumers {
-			c.onFlush(ctx, flushed)
+			c.onFlush(ctx, aggInfo, flushed)
 		}
 	}()
 }
diff --git a/pkg/obs/eventagg/map_reduce_test.go b/pkg/obs/eventagg/map_reduce_test.go
index f9f75590ad93..7a45fc34d8b5 100644
--- a/pkg/obs/eventagg/map_reduce_test.go
+++ b/pkg/obs/eventagg/map_reduce_test.go
@@ -239,7 +239,7 @@ func (t *testFlushConsumer[K, V]) consumedCh() <-chan struct{} {
 }
 
 // onFlush implements the flushConsumer interface.
-func (t *testFlushConsumer[K, V]) onFlush(_ context.Context, flushed map[K]V) {
+func (t *testFlushConsumer[K, V]) onFlush(_ context.Context, _ AggInfo, flushed map[K]V) {
 	t.flushed = flushed
 	t.consumed <- struct{}{}
 }
@@ -283,10 +283,10 @@ type testFlushTrigger struct {
 var _ FlushTrigger = (*testFlushTrigger)(nil)
 
 // shouldFlush implements the FlushTrigger interface.
-func (t *testFlushTrigger) shouldFlush() bool { +func (t *testFlushTrigger) shouldFlush() (bool, AggInfo) { t.mu.Lock() defer t.mu.Unlock() - return t.mu.flush + return t.mu.flush, AggInfo{} } func (t *testFlushTrigger) setShouldFlush(to bool) { diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index eb686ece9104..aa041b93563d 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -338,6 +338,7 @@ go_library( "//pkg/multitenant/tenantcapabilities", "//pkg/multitenant/tenantcapabilities/tenantcapabilitiespb", "//pkg/obs", + "//pkg/obs/eventagg", "//pkg/obsservice/obspb", "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", "//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs", diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index e1ea2b608e29..da8d50b62db1 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/multitenant/multitenantcpu" + "github.com/cockroachdb/cockroach/pkg/obs/eventagg" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -322,6 +323,8 @@ type Server struct { // node. Newly collected statistics flow into sqlStats. sqlStats *persistedsqlstats.PersistedSQLStats + sqlStatsAggregator *eventagg.MapReduceAggregator[*sqlstats.Stmt, appstatspb.StmtFingerprintID, *sqlstats.StmtStatistics] + // sqlStatsController is the control-plane interface for sqlStats. sqlStatsController *persistedsqlstats.Controller @@ -496,6 +499,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { }, memSQLStats) s.sqlStats = persistedSQLStats + s.sqlStatsAggregator = sqlstats.NewStmtStatsAggregator() s.sqlStatsController = persistedSQLStats.GetController(cfg.SQLStatusServer) schemaTelemetryIEMonitor := MakeInternalExecutorMemMonitor(MemoryMetrics{}, s.GetExecutorConfig().Settings) schemaTelemetryIEMonitor.StartNoReserved(context.Background(), s.GetBytesMonitor()) diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index b8abbe09a226..63a80991d9dc 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -221,6 +221,11 @@ func (ex *connExecutor) recordStatementSummary( stmtFingerprintID, err := ex.statsCollector.RecordStatement(ctx, recordedStmtStatsKey, recordedStmtStats) + ex.server.sqlStatsAggregator.Add(ctx, &sqlstats.Stmt{ + StmtFingerprintID: stmtFingerprintID, + Statement: stmt.StmtNoConstants, + ServiceLatency: svcLatRaw, + }) if err != nil { if log.V(1) { diff --git a/pkg/sql/sqlstats/BUILD.bazel b/pkg/sql/sqlstats/BUILD.bazel index d63826ec49e1..011639b264b8 100644 --- a/pkg/sql/sqlstats/BUILD.bazel +++ b/pkg/sql/sqlstats/BUILD.bazel @@ -20,10 +20,10 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlstats/insights", + "//pkg/util/log", "//pkg/util/metric", "//pkg/util/stop", "//pkg/util/timeutil", "//pkg/util/uuid", - "@com_github_cockroachdb_redact//:redact", ], ) diff --git a/pkg/sql/sqlstats/aggregate.go b/pkg/sql/sqlstats/aggregate.go index 6a1f39eb413a..de537dd37d9d 100644 --- a/pkg/sql/sqlstats/aggregate.go +++ b/pkg/sql/sqlstats/aggregate.go @@ -15,9 +15,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/obs/eventagg" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" + 
"github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/redact" ) type Stmt struct { @@ -28,7 +28,7 @@ type Stmt struct { // problem to avoid slowing down the prototyping process. StmtFingerprintID appstatspb.StmtFingerprintID // Statement is the redactable statement string. - Statement redact.RedactableString + Statement string // ServiceLatency is the latency of serving the query, excluding miscellaneous // sources of the overhead (e.g. internal retries). ServiceLatency time.Duration @@ -53,7 +53,7 @@ type StmtStatistics struct { ExecCount int // ServiceLatency is a histogram used to aggregate service latencies of the // various Stmt's recorded into this StmtStatistics instance. - ServiceLatency metric.ManualWindowHistogram + ServiceLatency *metric.ManualWindowHistogram } // NewStmtStatsAggregator leverages the generic MapReduceAggregator to instantiate @@ -65,10 +65,18 @@ func NewStmtStatsAggregator() *eventagg.MapReduceAggregator[*Stmt, appstatspb.St return eventagg.NewMapReduceAggregator[*Stmt, appstatspb.StmtFingerprintID, *StmtStatistics]( func() *StmtStatistics { return &StmtStatistics{ - ServiceLatency: metric.ManualWindowHistogram{}, // TODO: legitimate construction of histogram. + ServiceLatency: metric.NewManualWindowHistogram( + metric.Metadata{ + Name: "stmt.svc.latency", + Measurement: "Aggregate service latency of statement executions", + Unit: metric.Unit_NANOSECONDS, + }, + metric.IOLatencyBuckets.GetBucketsFromBucketConfig(), + true, + ), } }, - eventagg.NewWindowedFlush(10*time.Minute, timeutil.Now), - eventagg.NewLogWriteConsumer[appstatspb.StmtFingerprintID, *StmtStatistics](), // We'd like to log all the aggregated results, as-is. + eventagg.NewWindowedFlush(10*time.Second, timeutil.Now), + eventagg.NewLogWriteConsumer[appstatspb.StmtFingerprintID, *StmtStatistics](log.STATEMENT_STATS), // We'd like to log all the aggregated results, as-is. ) } diff --git a/pkg/util/log/BUILD.bazel b/pkg/util/log/BUILD.bazel index dc8948b4cf17..0d5bfe9ce9c9 100644 --- a/pkg/util/log/BUILD.bazel +++ b/pkg/util/log/BUILD.bazel @@ -44,6 +44,7 @@ go_library( "stderr_redirect_windows.go", "stderr_sink.go", "structured.go", + "structured_v2.go", "test_log_scope.go", "trace.go", "tracebacks.go", diff --git a/pkg/util/log/channel/channel_generated.go b/pkg/util/log/channel/channel_generated.go index e72adc05e8f8..b87173bf0573 100644 --- a/pkg/util/log/channel/channel_generated.go +++ b/pkg/util/log/channel/channel_generated.go @@ -141,3 +141,7 @@ const TELEMETRY = logpb.Channel_TELEMETRY // replicas between stores in the cluster, or adding (removing) replicas to // ranges. const KV_DISTRIBUTION = logpb.Channel_KV_DISTRIBUTION + +// STRUCTURED_EVENTS is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. 
+const STRUCTURED_EVENTS = logpb.Channel_STRUCTURED_EVENTS diff --git a/pkg/util/log/eventpb/eventpbgen/log_channels_generated.go b/pkg/util/log/eventpb/eventpbgen/log_channels_generated.go index e2d14acf12f9..b37703d8a66c 100644 --- a/pkg/util/log/eventpb/eventpbgen/log_channels_generated.go +++ b/pkg/util/log/eventpb/eventpbgen/log_channels_generated.go @@ -17,5 +17,6 @@ var channels = map[string]struct{}{ "SQL_INTERNAL_PERF": {}, "TELEMETRY": {}, "KV_DISTRIBUTION": {}, + "STRUCTURED_EVENTS": {}, "CHANNEL_MAX": {}, } diff --git a/pkg/util/log/log_channels_generated.go b/pkg/util/log/log_channels_generated.go index 88424883d83d..82f5cc8b8719 100644 --- a/pkg/util/log/log_channels_generated.go +++ b/pkg/util/log/log_channels_generated.go @@ -6513,3 +6513,300 @@ func (loggerKvDistribution) VEventf(ctx context.Context, level Level, format str func (loggerKvDistribution) VEventfDepth(ctx context.Context, depth int, level Level, format string, args ...interface{}) { vEventf(ctx, false /* isErr */, 1+depth, level, channel.KV_DISTRIBUTION, format, args...) } + +// loggerStructuredEvents is the logger type for the STRUCTURED_EVENTS channel. +type loggerStructuredEvents struct{} + +// StructuredEvents is a logger that logs to the STRUCTURED_EVENTS channel. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +var StructuredEvents loggerStructuredEvents + +// StructuredEvents and loggerStructuredEvents implement ChannelLogger. +// +// We do not force use of ChannelLogger when instantiating the logger +// object above (e.g. by giving it the interface type), to ensure +// the calls to the API methods remain inlinable in the common case. +var _ ChannelLogger = StructuredEvents + +// Infof logs to the STRUCTURED_EVENTS channel with severity INFO. +// It extracts log tags from the context and logs them along with the given +// message. Arguments are handled in the manner of fmt.Printf. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +// +// The `INFO` severity is used for informational messages that do not +// require action. +func (loggerStructuredEvents) Infof(ctx context.Context, format string, args ...interface{}) { + logfDepth(ctx, 1, severity.INFO, channel.STRUCTURED_EVENTS, format, args...) +} + +// VInfof logs to the STRUCTURED_EVENTS channel with severity INFO, +// if logging has been enabled for the source file where the call is +// performed at the provided verbosity level, via the vmodule setting. +// It extracts log tags from the context and logs them along with the given +// message. Arguments are handled in the manner of fmt.Printf. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +// +// The `INFO` severity is used for informational messages that do not +// require action. +func (loggerStructuredEvents) VInfof(ctx context.Context, level Level, format string, args ...interface{}) { + if VDepth(level, 1) { + logfDepth(ctx, 1, severity.INFO, channel.STRUCTURED_EVENTS, format, args...) + } +} + +// Info logs to the STRUCTURED_EVENTS channel with severity INFO. +// It extracts log tags from the context and logs them along with the given +// message. 
+// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +// +// The `INFO` severity is used for informational messages that do not +// require action. +func (loggerStructuredEvents) Info(ctx context.Context, msg string) { + logfDepth(ctx, 1, severity.INFO, channel.STRUCTURED_EVENTS, msg) +} + +// InfofDepth logs to the STRUCTURED_EVENTS channel with severity INFO, +// offsetting the caller's stack frame by 'depth'. +// It extracts log tags from the context and logs them along with the given +// message. Arguments are handled in the manner of fmt.Printf. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +// +// The `INFO` severity is used for informational messages that do not +// require action. +func (loggerStructuredEvents) InfofDepth(ctx context.Context, depth int, format string, args ...interface{}) { + logfDepth(ctx, depth+1, severity.INFO, channel.STRUCTURED_EVENTS, format, args...) +} + +// Warningf logs to the STRUCTURED_EVENTS channel with severity WARNING. +// It extracts log tags from the context and logs them along with the given +// message. Arguments are handled in the manner of fmt.Printf. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +// +// The `WARNING` severity is used for situations which may require special handling, +// where normal operation is expected to resume automatically. +func (loggerStructuredEvents) Warningf(ctx context.Context, format string, args ...interface{}) { + logfDepth(ctx, 1, severity.WARNING, channel.STRUCTURED_EVENTS, format, args...) +} + +// VWarningf logs to the STRUCTURED_EVENTS channel with severity WARNING, +// if logging has been enabled for the source file where the call is +// performed at the provided verbosity level, via the vmodule setting. +// It extracts log tags from the context and logs them along with the given +// message. Arguments are handled in the manner of fmt.Printf. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +// +// The `WARNING` severity is used for situations which may require special handling, +// where normal operation is expected to resume automatically. +func (loggerStructuredEvents) VWarningf(ctx context.Context, level Level, format string, args ...interface{}) { + if VDepth(level, 1) { + logfDepth(ctx, 1, severity.WARNING, channel.STRUCTURED_EVENTS, format, args...) + } +} + +// Warning logs to the STRUCTURED_EVENTS channel with severity WARNING. +// It extracts log tags from the context and logs them along with the given +// message. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +// +// The `WARNING` severity is used for situations which may require special handling, +// where normal operation is expected to resume automatically. +func (loggerStructuredEvents) Warning(ctx context.Context, msg string) { + logfDepth(ctx, 1, severity.WARNING, channel.STRUCTURED_EVENTS, msg) +} + +// WarningfDepth logs to the STRUCTURED_EVENTS channel with severity WARNING, +// offsetting the caller's stack frame by 'depth'. 
+// It extracts log tags from the context and logs them along with the given +// message. Arguments are handled in the manner of fmt.Printf. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +// +// The `WARNING` severity is used for situations which may require special handling, +// where normal operation is expected to resume automatically. +func (loggerStructuredEvents) WarningfDepth(ctx context.Context, depth int, format string, args ...interface{}) { + logfDepth(ctx, depth+1, severity.WARNING, channel.STRUCTURED_EVENTS, format, args...) +} + +// Errorf logs to the STRUCTURED_EVENTS channel with severity ERROR. +// It extracts log tags from the context and logs them along with the given +// message. Arguments are handled in the manner of fmt.Printf. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +// +// The `ERROR` severity is used for situations that require special handling, +// where normal operation could not proceed as expected. +// Other operations can continue mostly unaffected. +func (loggerStructuredEvents) Errorf(ctx context.Context, format string, args ...interface{}) { + logfDepth(ctx, 1, severity.ERROR, channel.STRUCTURED_EVENTS, format, args...) +} + +// VErrorf logs to the STRUCTURED_EVENTS channel with severity ERROR, +// if logging has been enabled for the source file where the call is +// performed at the provided verbosity level, via the vmodule setting. +// It extracts log tags from the context and logs them along with the given +// message. Arguments are handled in the manner of fmt.Printf. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +// +// The `ERROR` severity is used for situations that require special handling, +// where normal operation could not proceed as expected. +// Other operations can continue mostly unaffected. +func (loggerStructuredEvents) VErrorf(ctx context.Context, level Level, format string, args ...interface{}) { + if VDepth(level, 1) { + logfDepth(ctx, 1, severity.ERROR, channel.STRUCTURED_EVENTS, format, args...) + } +} + +// Error logs to the STRUCTURED_EVENTS channel with severity ERROR. +// It extracts log tags from the context and logs them along with the given +// message. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +// +// The `ERROR` severity is used for situations that require special handling, +// where normal operation could not proceed as expected. +// Other operations can continue mostly unaffected. +func (loggerStructuredEvents) Error(ctx context.Context, msg string) { + logfDepth(ctx, 1, severity.ERROR, channel.STRUCTURED_EVENTS, msg) +} + +// ErrorfDepth logs to the STRUCTURED_EVENTS channel with severity ERROR, +// offsetting the caller's stack frame by 'depth'. +// It extracts log tags from the context and logs them along with the given +// message. Arguments are handled in the manner of fmt.Printf. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. 
+//
+// The `ERROR` severity is used for situations that require special handling,
+// where normal operation could not proceed as expected.
+// Other operations can continue mostly unaffected.
+func (loggerStructuredEvents) ErrorfDepth(ctx context.Context, depth int, format string, args ...interface{}) {
+	logfDepth(ctx, depth+1, severity.ERROR, channel.STRUCTURED_EVENTS, format, args...)
+}
+
+// Fatalf logs to the STRUCTURED_EVENTS channel with severity FATAL.
+// It extracts log tags from the context and logs them along with the given
+// message. Arguments are handled in the manner of fmt.Printf.
+//
+// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured
+// JSON, which can be consumed externally to power o11y features.
+//
+// The `FATAL` severity is used for situations that require an immediate, hard
+// server shutdown. A report is also sent to telemetry if telemetry
+// is enabled.
+func (loggerStructuredEvents) Fatalf(ctx context.Context, format string, args ...interface{}) {
+	logfDepth(ctx, 1, severity.FATAL, channel.STRUCTURED_EVENTS, format, args...)
+}
+
+// VFatalf logs to the STRUCTURED_EVENTS channel with severity FATAL,
+// if logging has been enabled for the source file where the call is
+// performed at the provided verbosity level, via the vmodule setting.
+// It extracts log tags from the context and logs them along with the given
+// message. Arguments are handled in the manner of fmt.Printf.
+//
+// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured
+// JSON, which can be consumed externally to power o11y features.
+//
+// The `FATAL` severity is used for situations that require an immediate, hard
+// server shutdown. A report is also sent to telemetry if telemetry
+// is enabled.
+func (loggerStructuredEvents) VFatalf(ctx context.Context, level Level, format string, args ...interface{}) {
+	if VDepth(level, 1) {
+		logfDepth(ctx, 1, severity.FATAL, channel.STRUCTURED_EVENTS, format, args...)
+	}
+}
+
+// Fatal logs to the STRUCTURED_EVENTS channel with severity FATAL.
+// It extracts log tags from the context and logs them along with the given
+// message.
+//
+// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured
+// JSON, which can be consumed externally to power o11y features.
+//
+// The `FATAL` severity is used for situations that require an immediate, hard
+// server shutdown. A report is also sent to telemetry if telemetry
+// is enabled.
+func (loggerStructuredEvents) Fatal(ctx context.Context, msg string) {
+	logfDepth(ctx, 1, severity.FATAL, channel.STRUCTURED_EVENTS, msg)
+}
+
+// FatalfDepth logs to the STRUCTURED_EVENTS channel with severity FATAL,
+// offsetting the caller's stack frame by 'depth'.
+// It extracts log tags from the context and logs them along with the given
+// message. Arguments are handled in the manner of fmt.Printf.
+//
+// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured
+// JSON, which can be consumed externally to power o11y features.
+//
+// The `FATAL` severity is used for situations that require an immediate, hard
+// server shutdown. A report is also sent to telemetry if telemetry
+// is enabled.
+func (loggerStructuredEvents) FatalfDepth(ctx context.Context, depth int, format string, args ...interface{}) {
+	logfDepth(ctx, depth+1, severity.FATAL, channel.STRUCTURED_EVENTS, format, args...)
+} + +// Shout logs to channel STRUCTURED_EVENTS, and also to the real stderr if logging +// is currently redirected to a file. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +func (loggerStructuredEvents) Shout(ctx context.Context, sev Severity, msg string) { + shoutfDepth(ctx, 1, sev, channel.STRUCTURED_EVENTS, msg) +} + +// Shoutf logs to channel STRUCTURED_EVENTS, and also to the real stderr if +// logging is currently redirected to a file. Arguments are handled in +// the manner of fmt.Printf. +// +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +func (loggerStructuredEvents) Shoutf(ctx context.Context, sev Severity, format string, args ...interface{}) { + shoutfDepth(ctx, 1, sev, channel.STRUCTURED_EVENTS, format, args...) +} + +// VEvent either logs a message to the channel (which also outputs to the +// active trace) or to the trace alone, depending on whether the specified +// verbosity level is active. +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +func (loggerStructuredEvents) VEvent(ctx context.Context, level Level, msg string) { + vEventf(ctx, false /* isErr */, 1, level, channel.STRUCTURED_EVENTS, msg) +} + +// VEventf either logs a message to the channel (which also outputs to the +// active trace) or to the trace alone, depending on whether the specified +// verbosity level is active. +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +func (loggerStructuredEvents) VEventf(ctx context.Context, level Level, format string, args ...interface{}) { + vEventf(ctx, false /* isErr */, 1, level, channel.STRUCTURED_EVENTS, format, args...) +} + +// VEventfDepth performs the same as VEventf but checks the verbosity level +// at the given depth in the call stack. +// The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured +// JSON, which can be consumed externally to power o11y features. +func (loggerStructuredEvents) VEventfDepth(ctx context.Context, depth int, level Level, format string, args ...interface{}) { + vEventf(ctx, false /* isErr */, 1+depth, level, channel.STRUCTURED_EVENTS, format, args...) 
+} diff --git a/pkg/util/log/logconfig/testdata/export b/pkg/util/log/logconfig/testdata/export index fffed171000f..37e1da5a59a4 100644 --- a/pkg/util/log/logconfig/testdata/export +++ b/pkg/util/log/logconfig/testdata/export @@ -18,6 +18,7 @@ component sources { () SQL_INTERNAL_PERF () TELEMETRY () KV_DISTRIBUTION +() STRUCTURED_EVENTS cloud stray as "stray\nerrors" } queue stderr @@ -43,11 +44,12 @@ SQL_PERF --> p__1 SQL_INTERNAL_PERF --> p__1 TELEMETRY --> p__1 KV_DISTRIBUTION --> p__1 +STRUCTURED_EVENTS --> p__1 p__1 --> buffer2 buffer2 --> f1 stray --> stderrfile @enduml -# http://www.plantuml.com/plantuml/uml/N9DFZzCm4CNl_XHMJd08QFV80MreXwr5cvGu6u6457dzPpSYZSl4GKAetntPRkk6azzxypgo_6iU9YVGpOU13DeuwYp5_kNLKTMZbgwt8v7sSBIZ7XsTx8nIJ_GlUVEMhg7rHxNd_jX0iMqsNl6cgej7218uPrKP2lphjkEh3UmAxvuur5snth7IkttDMhQ5HuXDA3bhM0jTiLe1l_uQli7gelTGVxveLZPGbqKA6zZ23fhwkpTVscxDU5EpJqyDgqeY1pihEZaKVwYOQ1RKZr4ZMfmoSYA_PZrhEZcb4OaKgEZpR8p6-r2FEek3OzVTXTHOF0ZtGQ9wpd_VPqIWwusGZff-2Ez5ZHsKHfgzLzg8UN2vwZ7pWr2X9kwHLlv4A-Jhky6-P9TXl4kGYr8U1-VkYPp86bgQvn_3nKYrvyb4FicVATq4PrOtoPLOoXAtb2LwjzkM39TRFSbbuj4jamNLDBgoJT5_XDCWuF7k391SGFhCt97u9_0clJ1vqAFoNyE_0000__y0 +# http://www.plantuml.com/plantuml/uml/N9F1Zfiy58NtVWgBrVylQ3MpxA8Q2hSJgmcas46jL0annfv19NXwCPMgAkzUsKxYC2kVS-x1ClxWOROTskKuab5fIwsXE3o_MDeFgAGTp4IaERwQIKsMpcP1gMRwb_ptFosWSKkrvsxPGBOL6wUugEhi4Rm4pbbL-WB_kcrvle5Tvjo1GzrcnOwLpkrhrh0jF49eGicPO0sqMPu3lpuDto2_w3tKdo-QbGBgCekXW2tiGDJVdVdIj0NZecQV3e9LPJXXVSZ5eOQYXGPAmOaSpTBJsMBtXtOpJRpwCIb4WtD2JkJNeXP5PzihH28xxEdJehN2UryFEWc3rxQzywasUEpi1udzK_hxFY4Tsa5tqb8zZFuIgJPZhv0cxtkbksMqQJzWuWQ-GdNOHnhv4qqdNzwDvZcv3CDPl5oLqZ0uTq_aH0feQ9f-z0SZrPv74w15VqPt4vm1tYHNZ36BC6CMaTxkjWQxtjNXNIUEvpfPeOwZA_0OlS5--uvluCUH9-VScIsvC7QPlYFX-t0cNZvvK5Fl_fv_0G00__y0 # Capture everything to one file with sync and warnings only to stderr. 
yaml only-channels=DEV,SESSIONS diff --git a/pkg/util/log/logconfig/testdata/validate b/pkg/util/log/logconfig/testdata/validate index 088df190120a..9bff255fd65c 100644 --- a/pkg/util/log/logconfig/testdata/validate +++ b/pkg/util/log/logconfig/testdata/validate @@ -432,7 +432,7 @@ sinks: channels: {INFO: [STORAGE]} filter: INFO default: - channels: {INFO: [DEV, OPS, SESSIONS, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION]} + channels: {INFO: [DEV, OPS, SESSIONS, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION, STRUCTURED_EVENTS]} filter: INFO stderr: filter: NONE @@ -459,7 +459,7 @@ sinks: channels: {INFO: [HEALTH]} filter: INFO default: - channels: {INFO: [DEV, OPS, STORAGE, SESSIONS, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION]} + channels: {INFO: [DEV, OPS, STORAGE, SESSIONS, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION, STRUCTURED_EVENTS]} filter: INFO stderr: filter: NONE @@ -480,7 +480,7 @@ sinks: sinks: file-groups: custom: - channels: {WARNING: [DEV], ERROR: [OPS, HEALTH, STORAGE, SESSIONS, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION]} + channels: {WARNING: [DEV], ERROR: [OPS, HEALTH, STORAGE, SESSIONS, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION, STRUCTURED_EVENTS]} filter: ERROR stderr: filter: NONE @@ -533,7 +533,7 @@ sinks: sinks: file-groups: custom1: - channels: {ERROR: [DEV, OPS, STORAGE, SESSIONS, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION]} + channels: {ERROR: [DEV, OPS, STORAGE, SESSIONS, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION, STRUCTURED_EVENTS]} filter: ERROR custom2: channels: {WARNING: [DEV]} @@ -566,7 +566,7 @@ sinks: channels: {INFO: [STORAGE]} filter: INFO default: - channels: {WARNING: [HEALTH], ERROR: [DEV, OPS, SESSIONS, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION]} + channels: {WARNING: [HEALTH], ERROR: [DEV, OPS, SESSIONS, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION, STRUCTURED_EVENTS]} filter: ERROR stderr: filter: NONE diff --git a/pkg/util/log/logconfig/testdata/yaml b/pkg/util/log/logconfig/testdata/yaml index 6928c4e629a1..a9737f7d74a1 100644 --- a/pkg/util/log/logconfig/testdata/yaml +++ b/pkg/util/log/logconfig/testdata/yaml @@ -111,14 +111,14 @@ sinks: { stderr: { channels: 'all except DEV ,sessions' } } ---- sinks: stderr: - channels: [OPS, HEALTH, STORAGE, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION] + channels: [OPS, HEALTH, STORAGE, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION, STRUCTURED_EVENTS] yaml sinks: { stderr: { channels: 'all except [DEV, sessions]' } } ---- sinks: stderr: - channels: [OPS, HEALTH, STORAGE, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION] + channels: [OPS, 
HEALTH, STORAGE, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION, STRUCTURED_EVENTS]
 
 # Verify that channels can be filtered separately.
 yaml
diff --git a/pkg/util/log/logpb/log.proto b/pkg/util/log/logpb/log.proto
index eb24341d7804..9d540351bcdc 100644
--- a/pkg/util/log/logpb/log.proto
+++ b/pkg/util/log/logpb/log.proto
@@ -192,9 +192,13 @@ enum Channel {
   // ranges.
   KV_DISTRIBUTION = 13;
 
+  // STRUCTURED_EVENTS is used to send log events with messages containing structured
+  // JSON, which can be consumed externally to power o11y features.
+  STRUCTURED_EVENTS = 14;
+
   // CHANNEL_MAX is the maximum allocated channel number so far.
   // This should be increased every time a new channel is added.
-  CHANNEL_MAX = 14;
+  CHANNEL_MAX = 15;
 }
 
 // Entry represents a cockroach log entry in the following two cases:
diff --git a/pkg/util/log/structured_v2.go b/pkg/util/log/structured_v2.go
new file mode 100644
index 000000000000..36208ad3361b
--- /dev/null
+++ b/pkg/util/log/structured_v2.go
@@ -0,0 +1,81 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package log
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+
+	"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
+	"github.com/cockroachdb/cockroach/pkg/util/log/severity"
+	"github.com/cockroachdb/redact"
+)
+
+// EventType represents the type of an event emitted via Structured.
+type EventType string
+
+const (
+	// STATEMENT_STATS is the EventType for aggregated statement statistics.
+	STATEMENT_STATS EventType = "stmt_stats"
+)
+
+var (
+	trimPrefix = []byte("{")
+	trimSuffix = []byte("}")
+)
+
+// StructuredMeta is a metadata object that accompanies each event emitted via Structured.
+type StructuredMeta struct {
+	EventType EventType `json:"event_type"`
+}
+
+// StructuredPayload is a wrapper around StructuredMeta and an arbitrary event.
+// It's used to easily combine both into a single JSON object at Marshal time.
+type StructuredPayload struct {
+	Metadata StructuredMeta `json:"metadata"`
+	Payload  any            `json:"payload"`
+}
+
+// Structured emits a structured JSON payload to the STRUCTURED_EVENTS channel.
+// TODO(abarganier): Redaction is not considered here yet. Enable redaction via struct tags.
+// TODO(abarganier): StructuredEvent() is a similar API. We should consider how to reconcile or perhaps
+// combine the two.
+func Structured(ctx context.Context, meta StructuredMeta, payload any) {
+	if meta.EventType == "" {
+		Warningf(ctx, "event type not provided in structured event meta: %v", meta)
+		return
+	}
+	m := StructuredPayload{
+		Metadata: meta,
+		Payload:  payload,
+	}
+	// TODO(abarganier): Implement redaction in the JSON serialization step. Ideally, done using struct tags
+	// and code generation, similar to what we do for TELEMETRY events.
+	payloadBytes, err := json.Marshal(m)
+	if err != nil {
+		Warningf(ctx, "failed to marshal structured event to JSON (meta: %v): %v", meta, err)
+		return
+	}
+	// TODO(abarganier): the log formatting code today already wraps JSON payloads in `{...}`, since originally,
+	// the code generation used by the TELEMETRY channel to serialize events to JSON omitted the curly-braces
+	// from the payload.
We will eventually do the same when we get code gen working for our own structured events, + // but for now, this is a hack to prevent needless nesting of our payload. + payloadBytes = bytes.TrimPrefix(payloadBytes, trimPrefix) + payloadBytes = bytes.TrimSuffix(payloadBytes, trimSuffix) + + entry := makeEntry(ctx, severity.INFO, logpb.Channel_STRUCTURED_EVENTS, 0 /*depth*/) + entry.structured = true + // TODO(abarganier): Once redaction is in place, we shouldn't need to cast to RedactableString here. + entry.payload = makeRedactablePayload(ctx, redact.RedactableString(payloadBytes)) + + logger := logging.getLogger(entry.ch) + logger.outputLogEntry(entry) +} diff --git a/pkg/util/log/testdata/config b/pkg/util/log/testdata/config index 6ab851c6d53f..166904220ab2 100644 --- a/pkg/util/log/testdata/config +++ b/pkg/util/log/testdata/config @@ -41,7 +41,7 @@ sinks: redactable: true exit-on-error: true stderr: - channels: {INFO: [DEV], WARNING: [OPS, HEALTH, STORAGE, SESSIONS, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION]} + channels: {INFO: [DEV], WARNING: [OPS, HEALTH, STORAGE, SESSIONS, SQL_SCHEMA, USER_ADMIN, PRIVILEGES, SENSITIVE_ACCESS, SQL_EXEC, SQL_PERF, SQL_INTERNAL_PERF, TELEMETRY, KV_DISTRIBUTION, STRUCTURED_EVENTS]} format: crdb-v2-tty redact: false redactable: true
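
A minimal usage sketch for the new API, based on the definitions in
pkg/util/log/structured_v2.go above (the event type and payload struct below are
hypothetical, chosen purely for illustration):

    package example

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/util/log"
    )

    // connStats is a hypothetical JSON-marshalable payload; any such type works.
    type connStats struct {
        App   string `json:"app"`
        Count int    `json:"count"`
    }

    func emit(ctx context.Context) {
        // StructuredMeta tags the payload with an event type, which consumers of
        // the STRUCTURED_EVENTS channel (such as the test interceptor above) can
        // use to filter entries.
        meta := log.StructuredMeta{EventType: log.EventType("conn_stats")}
        log.Structured(ctx, meta, &connStats{App: "myapp", Count: 42})
    }

Aggregated use cases instead construct a LogWriteConsumer and hand it to a
MapReduceAggregator, as pkg/sql/sqlstats/aggregate.go does above; each flushed
(key, value) pair is then emitted through the same log.Structured call.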