Merge #124058
124058: log,logstream: structured log emission & consumption mechanism r=abarganier a=abarganier

**Note: please only consider the final two commits. The first two commits are being reviewed separately in #119416.**

---

This PR introduces `log.Structured` to the log package API. It aims to
serve as a prototype for the log facility we will use for exporting
"exhaust" from CRDB in the form of JSON objects. The intention is that
this exhaust can be consumed externally and be sufficient to
build features around.

This iteration has some serious limitations, the main one being that it
is not redaction-aware. The existing `log.StructuredEvent` API lives
alongside it for now. The two implementations are quite similar, so they
should probably be reconciled and/or combined, but that is left as a
TODO to avoid slowing down the prototyping process; for now, having both
is sufficient.

The patch also introduces a new logging channel, `STRUCTURED_EVENTS`, used
exclusively by the new `log.Structured` API. The separate channel is
motivated by the ability to segment these structured logs from the rest of
CRDB's logs. The name works for now, but we should consider whether a
better one is available.

The PR also expands upon the new structured logging facilities, adding a
mechanism to consume emitted structured logs internally.

This is done primarily via the new pkg/obs/logstream package, which
handles buffering, routing, & processing of events logged via
log.Structured.

It can be used in conjunction with the eventagg package and the
KVProcessor interface to give users of the eventagg package a way to
consume streams of events flushed from their aggregations. This enables
engineers to build features internal to CRDB that are powered by the
same aggregated data that external consumers can read from the
STRUCTURED_EVENTS log channel.

The provided log config can be updated to make use of this new channel.
For example:
```
sinks:
  file-groups:
    structured-events:
      channels: [STRUCTURED_EVENTS]
```

The changes aim to complete the eventagg pipeline/ecosystem, which now
allows engineers to use common facilities to define aggregations, log
the aggregated results, and consume the logged events internally as
input data.

Finally, it completes the toy StmtStats example by defining a processor
for the aggregated events that are logged.

The diagram below outlines the end-to-end architecture of the system:

![Screenshot 2024-04-26 at 3 59 21 PM](https://github.com/cockroachdb/cockroach/assets/8194877/497c87d8-b7e7-440e-a69d-505a2e760be7)

Release note: none


Epic: CRDB-35919

Co-authored-by: Alex Barganier <abarganier@cockroachlabs.com>
craig[bot] and abarganier committed Jun 5, 2024
2 parents 8c92ff1 + 39ba92b commit f699bcb
Showing 31 changed files with 1,839 additions and 80 deletions.
5 changes: 5 additions & 0 deletions docs/generated/logging.md
@@ -177,3 +177,8 @@ The `KV_DISTRIBUTION` channel is used to report data distribution events, such a
 replicas between stores in the cluster, or adding (removing) replicas to
 ranges.
 
+### `STRUCTURED_EVENTS`
+
+The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured
+JSON, which can be consumed externally to power o11y features.
+
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
@@ -279,6 +279,7 @@ ALL_TESTS = [
     "//pkg/multitenant/tenantcapabilities:tenantcapabilities_test",
     "//pkg/multitenant/tenantcostmodel:tenantcostmodel_test",
     "//pkg/obs/eventagg:eventagg_test",
+    "//pkg/obs/logstream:logstream_test",
     "//pkg/obsservice/obslib/ingest:ingest_test",
     "//pkg/obsservice/obslib/migrations:migrations_test",
     "//pkg/obsservice/obslib/process:process_test",
@@ -1520,6 +1521,8 @@ GO_TARGETS = [
     "//pkg/multitenant:multitenant",
     "//pkg/obs/eventagg:eventagg",
     "//pkg/obs/eventagg:eventagg_test",
+    "//pkg/obs/logstream:logstream",
+    "//pkg/obs/logstream:logstream_test",
     "//pkg/obs:obs",
     "//pkg/obsservice/cmd/obsservice:obsservice",
     "//pkg/obsservice/cmd/obsservice:obsservice_lib",
39 changes: 26 additions & 13 deletions pkg/cli/testdata/logflags
@@ -28,7 +28,8 @@ SQL_EXEC,
 SQL_PERF,
 SQL_INTERNAL_PERF,
 TELEMETRY,
-KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
+KV_DISTRIBUTION,
+STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
 health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
 kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
 pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
@@ -64,7 +65,8 @@ SQL_EXEC,
 SQL_PERF,
 SQL_INTERNAL_PERF,
 TELEMETRY,
-KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
+KV_DISTRIBUTION,
+STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
 health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
 kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
 pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
@@ -165,7 +167,8 @@ SQL_EXEC,
 SQL_PERF,
 SQL_INTERNAL_PERF,
 TELEMETRY,
-KV_DISTRIBUTION],/pathA/logs,true,crdb-v2)>,
+KV_DISTRIBUTION,
+STRUCTURED_EVENTS],/pathA/logs,true,crdb-v2)>,
 health: <fileCfg(INFO: [HEALTH],/pathA/logs,true,crdb-v2)>,
 kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],/pathA/logs,true,crdb-v2)>,
 pebble: <fileCfg(INFO: [STORAGE],/pathA/logs,true,crdb-v2)>,
@@ -203,7 +206,8 @@ SQL_EXEC,
 SQL_PERF,
 SQL_INTERNAL_PERF,
 TELEMETRY,
-KV_DISTRIBUTION],/mypath,true,crdb-v2)>,
+KV_DISTRIBUTION,
+STRUCTURED_EVENTS],/mypath,true,crdb-v2)>,
 health: <fileCfg(INFO: [HEALTH],/mypath,true,crdb-v2)>,
 kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],/mypath,true,crdb-v2)>,
 pebble: <fileCfg(INFO: [STORAGE],/mypath,true,crdb-v2)>,
@@ -242,7 +246,8 @@ SQL_EXEC,
 SQL_PERF,
 SQL_INTERNAL_PERF,
 TELEMETRY,
-KV_DISTRIBUTION],/pathA/logs,true,crdb-v2)>,
+KV_DISTRIBUTION,
+STRUCTURED_EVENTS],/pathA/logs,true,crdb-v2)>,
 health: <fileCfg(INFO: [HEALTH],/pathA/logs,true,crdb-v2)>,
 kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],/pathA/logs,true,crdb-v2)>,
 pebble: <fileCfg(INFO: [STORAGE],/pathA/logs,true,crdb-v2)>,
@@ -286,7 +291,8 @@ SQL_EXEC,
 SQL_PERF,
 SQL_INTERNAL_PERF,
 TELEMETRY,
-KV_DISTRIBUTION],/mypath,true,crdb-v2)>,
+KV_DISTRIBUTION,
+STRUCTURED_EVENTS],/mypath,true,crdb-v2)>,
 health: <fileCfg(INFO: [HEALTH],/mypath,true,crdb-v2)>,
 kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],/mypath,true,crdb-v2)>,
 pebble: <fileCfg(INFO: [STORAGE],/mypath,true,crdb-v2)>,
@@ -323,7 +329,8 @@ SQL_EXEC,
 SQL_PERF,
 SQL_INTERNAL_PERF,
 TELEMETRY,
-KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
+KV_DISTRIBUTION,
+STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
 health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
 kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
 pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
@@ -361,7 +368,8 @@ SQL_EXEC,
 SQL_PERF,
 SQL_INTERNAL_PERF,
 TELEMETRY,
-KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
+KV_DISTRIBUTION,
+STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
 health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
 kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
 pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
@@ -409,7 +417,8 @@ SQL_EXEC,
 SQL_PERF,
 SQL_INTERNAL_PERF,
 TELEMETRY,
-KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
+KV_DISTRIBUTION,
+STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
 health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
 kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
 pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
@@ -479,7 +488,8 @@ SQL_EXEC,
 SQL_PERF,
 SQL_INTERNAL_PERF,
 TELEMETRY,
-KV_DISTRIBUTION],/mypath,true,crdb-v2)>,
+KV_DISTRIBUTION,
+STRUCTURED_EVENTS],/mypath,true,crdb-v2)>,
 health: <fileCfg(INFO: [HEALTH],/mypath,true,crdb-v2)>,
 kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],/mypath,true,crdb-v2)>,
 pebble: <fileCfg(INFO: [STORAGE],/mypath,true,crdb-v2)>,
@@ -518,7 +528,8 @@ SQL_EXEC,
 SQL_PERF,
 SQL_INTERNAL_PERF,
 TELEMETRY,
-KV_DISTRIBUTION],/pathA,true,crdb-v2)>,
+KV_DISTRIBUTION,
+STRUCTURED_EVENTS],/pathA,true,crdb-v2)>,
 health: <fileCfg(INFO: [HEALTH],/pathA,true,crdb-v2)>,
 kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],/pathA,true,crdb-v2)>,
 pebble: <fileCfg(INFO: [STORAGE],/pathA,true,crdb-v2)>,
@@ -576,7 +587,8 @@ SQL_EXEC,
 SQL_PERF,
 SQL_INTERNAL_PERF,
 TELEMETRY,
-KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
+KV_DISTRIBUTION,
+STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
 health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
 kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
 pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
@@ -613,7 +625,8 @@ SQL_EXEC,
 SQL_PERF,
 SQL_INTERNAL_PERF,
 TELEMETRY,
-KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
+KV_DISTRIBUTION,
+STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
 health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
 kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
 pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
1 change: 1 addition & 0 deletions pkg/gen/gomock.bzl
@@ -7,6 +7,7 @@ GOMOCK_SRCS = [
     "//pkg/kv/kvclient/rangecache/rangecachemock:mocks_generated.go",
     "//pkg/kv/kvclient/rangefeed:mocks_generated_test.go",
     "//pkg/kv/kvpb/kvpbmock:mocks_generated.go",
+    "//pkg/obs/logstream:mocks_generated_test.go",
     "//pkg/rpc:mocks_generated_test.go",
     "//pkg/security/certmgr:mocks_generated_test.go",
     "//pkg/sql/schemachanger/scexec:mocks_generated_test.go",
5 changes: 5 additions & 0 deletions pkg/obs/eventagg/BUILD.bazel
@@ -20,19 +20,24 @@ go_library(
 go_test(
     name = "eventagg_test",
     srcs = [
+        "flush_consumer_test.go",
         "flush_test.go",
         "map_reduce_test.go",
     ],
     data = glob(["testdata/**"]),
     embed = [":eventagg"],
     deps = [
+        "//pkg/obs/logstream",
+        "//pkg/testutils",
         "//pkg/testutils/datapathutils",
         "//pkg/util/leaktest",
         "//pkg/util/log",
+        "//pkg/util/log/logpb",
+        "//pkg/util/stop",
         "//pkg/util/syncutil",
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_datadriven//:datadriven",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_stretchr_testify//require",
     ],
 )
32 changes: 28 additions & 4 deletions pkg/obs/eventagg/flush.go
@@ -12,14 +12,33 @@ package eventagg
 
 import "time"
 
+// FlushKind is the type of FlushTrigger used, which communicates aggregation strategy.
+type FlushKind string
+
+// Windowed is a FlushTrigger type that enforces windowed flush periods, containing a
+// window start and end time.
+const Windowed FlushKind = "WINDOWED"
+
+// AggInfo is a type used to communicate metadata from a FlushTrigger to the
+// targets of the flush about the aggregation type & period. For example, the type
+// of flush, and associated timestamps.
+type AggInfo struct {
+    // Kind is the FlushKind used in the FlushTrigger.
+    Kind FlushKind `json:"kind"`
+    // StartTime is the primary timestamp for the flushed data.
+    StartTime int64 `json:"start_time"`
+    // EndTime, if present, represents the end timestamp of the flush interval.
+    EndTime int64 `json:"end_time"`
+}
+
 // FlushTrigger defines the interface used by aggregators, such as MapReduceAggregator,
 // to determine when a flush of the current aggregation should be triggered.
 //
 // FlushTriggers are generally invoked as soon as events are consumed by an aggregator,
 // but *before* the event itself is aggregated.
 type FlushTrigger interface {
     // shouldFlush returns true if this FlushTrigger has been tripped.
-    shouldFlush() bool
+    shouldFlush() (bool, AggInfo)
 }
 
 // WindowedFlush is a FlushTrigger which triggers flushes on a wall clock aligned
@@ -55,11 +74,16 @@ func (w *WindowedFlush) newWindowEnd() time.Time {
     return w.nowFn().Truncate(w.window).Add(w.window)
 }
 
-func (w *WindowedFlush) shouldFlush() bool {
+func (w *WindowedFlush) shouldFlush() (bool, AggInfo) {
     t := w.nowFn()
     if t.Equal(w.curWindowEnd) || t.After(w.curWindowEnd) {
+        meta := AggInfo{
+            Kind:      Windowed,
+            StartTime: w.curWindowEnd.Add(-w.window).UnixNano(),
+            EndTime:   w.curWindowEnd.UnixNano(),
+        }
         w.curWindowEnd = w.newWindowEnd()
-        return true
+        return true, meta
     }
-    return false
+    return false, AggInfo{}
 }
74 changes: 36 additions & 38 deletions pkg/obs/eventagg/flush_consumer.go
@@ -10,55 +10,53 @@
 
 package eventagg
 
-import "context"
+import (
+    "context"
+
+    "github.com/cockroachdb/cockroach/pkg/util/log"
+)
 
 // flushConsumer is the interface type used to define a post-processing consumer of
 // the aggregations flushed by a MapReduceAggregator.
 type flushConsumer[K comparable, V any] interface {
-    // TODO(abarganier): It'd be better if we didn't pass down the reduceContainer and instead
-    // provided the unwrapped values, but for now I'd rather copying over to a new map.
-    onFlush(ctx context.Context, m map[K]V)
+    onFlush(ctx context.Context, aggInfo AggInfo, m map[K]V)
 }
 
 // LogWriteConsumer is an example flushConsumer, which provides an easy plug-and-play
-// method of logging all the values Agg flushed by a MapReduceAggregator[Agg].
-type LogWriteConsumer[K comparable, V any] struct{}
+// method of logging all the values V flushed by a MapReduceAggregator[V].
+type LogWriteConsumer[K comparable, V any] struct {
+    EventMeta log.StructuredLogMeta
+}
 
 var _ flushConsumer[int, any] = &LogWriteConsumer[int, any]{}
 
-// TODO(abarganier): remove once used in future patch.
-var _ = LogWriteConsumer[int, any].onFlush
+// KeyValueLog is the type logged to log.Structured via the LogWriteConsumer.
+// It wraps the flush metadata, key, and value flushed from the associated
+// aggregator.
+type KeyValueLog[K any, V any] struct {
+    AggInfo AggInfo `json:"agg_info"`
+    Key     K       `json:"key"`
+    Value   V       `json:"value"`
+}
 
-// NewLogWriteConsumer returns a new *LogWriteConsumer[Agg] instance.
-func NewLogWriteConsumer[K comparable, V any]() *LogWriteConsumer[K, V] {
-    return &LogWriteConsumer[K, V]{}
+// NewLogWriteConsumer returns a new *LogWriteConsumer[K, V] instance. All consumed events are
+// expected to be of the type represented by the provided log.StructuredLogMeta.
+func NewLogWriteConsumer[K comparable, V any](
+    eventMeta log.StructuredLogMeta,
+) *LogWriteConsumer[K, V] {
+    return &LogWriteConsumer[K, V]{
+        EventMeta: eventMeta,
+    }
 }
 
-// TODO(abarganier): The values flushed out of a MapReduceAggregator ideally should be able
-// to be provided to log.StructuredEvent(), or a similar facility, without transformation.
-//
-// For now, we punt this problem until we have the lower level interfaces fleshed out. Currently,
-// log.StructuredEvent() requires a protobuf format, which may be cumbersome & something we'd like
-// to avoid if possible within this system.
-//
-// One option would be to expand the API of pkg/util/log to enable direct JSON logging, e.g.:
-//
-//    log.Structured(ctx, eventMetadata, eventJSON)
-//
-// This would allow us to construct a single metadata object that can be applied to every flushed
-// value Agg.
-func (l LogWriteConsumer[K, V]) onFlush(ctx context.Context, m map[K]V) {
-    //metadata := &logmeta.EventMetadata{
-    //    EventType: ...,
-    //    EventTimestamp: timeutil.Now().UnixNano(),
-    //    NodeDetails: logmeta.GetNodeDetails(ctx),
-    //    ...,
-    //}
-    //for _, v := range m {
-    //    marshalled, err := json.Marshal(v)
-    //    if err != nil {
-    //        log.Errorf(ctx, "failed to marshal JSON for event: %v", v)
-    //    }
-    //    log.Structured(ctx, metadata, marshalled)
-    //}
+// onFlush emits all events in the provided map as logs via log.Structured
+func (l *LogWriteConsumer[K, V]) onFlush(ctx context.Context, aggInfo AggInfo, m map[K]V) {
+    for k, v := range m {
+        KVLog := KeyValueLog[K, V]{
+            AggInfo: aggInfo,
+            Key:     k,
+            Value:   v,
+        }
+        log.Structured(ctx, l.EventMeta, KVLog)
+    }
 }
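
Given the struct tags above, each flushed (key, value) pair is emitted on the `STRUCTURED_EVENTS` channel as a JSON object of roughly this shape (values illustrative):

```
{
  "agg_info": {
    "kind": "WINDOWED",
    "start_time": 1714161300000000000,
    "end_time": 1714161600000000000
  },
  "key": "<aggregation key>",
  "value": "<aggregated value>"
}
```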