Skip to content

Commit

Permalink
pkg/util/log: introduce log.Structured, wire up to aggregator
Browse files Browse the repository at this point in the history
This patch introduces log.Structured to the log package API. It aims to
serve as a prototype for the log facility we will use for exporting
"exhaust" from CRDB in the form of JSON objects. The intention is that
this exhaust can be consumed externally and be sufficient to
build features around.

This iteration has some serious limitations, the main one being that it
is not redaction-aware. The `log.StructuredEvent` exists alongside it
for now. Both implementations are quite similar, so they should probably
be reconciled and/or combined, but this is left as a TODO to avoid
slowing down the prototyping process. For now, it's sufficient for
prototyping.

The patch also introduces a new logging channel explicitly for the new
`log.Structured` API, called `STRUCTURED_EVENTS`. The ability to segment these
types of logs from the rest of the logs is what motivates this separate
channel. The name works for now, but we should consider if there's a
better name available.

A following patch will focus on internal consumption of these events.

Release note: none
  • Loading branch information
abarganier committed May 6, 2024
1 parent fdabacc commit 06d762f
Show file tree
Hide file tree
Showing 25 changed files with 705 additions and 88 deletions.
5 changes: 5 additions & 0 deletions docs/generated/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,8 @@ The `KV_DISTRIBUTION` channel is used to report data distribution events, such a
replicas between stores in the cluster, or adding (removing) replicas to
ranges.

### `STRUCTURED_EVENTS`

The `STRUCTURED_EVENTS` channel is used to send log events with messages containing structured
JSON, which can be consumed externally to power observability (o11y) features.

39 changes: 26 additions & 13 deletions pkg/cli/testdata/logflags
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ SQL_EXEC,
SQL_PERF,
SQL_INTERNAL_PERF,
TELEMETRY,
KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
KV_DISTRIBUTION,
STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
Expand Down Expand Up @@ -64,7 +65,8 @@ SQL_EXEC,
SQL_PERF,
SQL_INTERNAL_PERF,
TELEMETRY,
KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
KV_DISTRIBUTION,
STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
Expand Down Expand Up @@ -165,7 +167,8 @@ SQL_EXEC,
SQL_PERF,
SQL_INTERNAL_PERF,
TELEMETRY,
KV_DISTRIBUTION],/pathA/logs,true,crdb-v2)>,
KV_DISTRIBUTION,
STRUCTURED_EVENTS],/pathA/logs,true,crdb-v2)>,
health: <fileCfg(INFO: [HEALTH],/pathA/logs,true,crdb-v2)>,
kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],/pathA/logs,true,crdb-v2)>,
pebble: <fileCfg(INFO: [STORAGE],/pathA/logs,true,crdb-v2)>,
Expand Down Expand Up @@ -203,7 +206,8 @@ SQL_EXEC,
SQL_PERF,
SQL_INTERNAL_PERF,
TELEMETRY,
KV_DISTRIBUTION],/mypath,true,crdb-v2)>,
KV_DISTRIBUTION,
STRUCTURED_EVENTS],/mypath,true,crdb-v2)>,
health: <fileCfg(INFO: [HEALTH],/mypath,true,crdb-v2)>,
kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],/mypath,true,crdb-v2)>,
pebble: <fileCfg(INFO: [STORAGE],/mypath,true,crdb-v2)>,
Expand Down Expand Up @@ -242,7 +246,8 @@ SQL_EXEC,
SQL_PERF,
SQL_INTERNAL_PERF,
TELEMETRY,
KV_DISTRIBUTION],/pathA/logs,true,crdb-v2)>,
KV_DISTRIBUTION,
STRUCTURED_EVENTS],/pathA/logs,true,crdb-v2)>,
health: <fileCfg(INFO: [HEALTH],/pathA/logs,true,crdb-v2)>,
kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],/pathA/logs,true,crdb-v2)>,
pebble: <fileCfg(INFO: [STORAGE],/pathA/logs,true,crdb-v2)>,
Expand Down Expand Up @@ -286,7 +291,8 @@ SQL_EXEC,
SQL_PERF,
SQL_INTERNAL_PERF,
TELEMETRY,
KV_DISTRIBUTION],/mypath,true,crdb-v2)>,
KV_DISTRIBUTION,
STRUCTURED_EVENTS],/mypath,true,crdb-v2)>,
health: <fileCfg(INFO: [HEALTH],/mypath,true,crdb-v2)>,
kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],/mypath,true,crdb-v2)>,
pebble: <fileCfg(INFO: [STORAGE],/mypath,true,crdb-v2)>,
Expand Down Expand Up @@ -323,7 +329,8 @@ SQL_EXEC,
SQL_PERF,
SQL_INTERNAL_PERF,
TELEMETRY,
KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
KV_DISTRIBUTION,
STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
Expand Down Expand Up @@ -361,7 +368,8 @@ SQL_EXEC,
SQL_PERF,
SQL_INTERNAL_PERF,
TELEMETRY,
KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
KV_DISTRIBUTION,
STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
Expand Down Expand Up @@ -409,7 +417,8 @@ SQL_EXEC,
SQL_PERF,
SQL_INTERNAL_PERF,
TELEMETRY,
KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
KV_DISTRIBUTION,
STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
Expand Down Expand Up @@ -479,7 +488,8 @@ SQL_EXEC,
SQL_PERF,
SQL_INTERNAL_PERF,
TELEMETRY,
KV_DISTRIBUTION],/mypath,true,crdb-v2)>,
KV_DISTRIBUTION,
STRUCTURED_EVENTS],/mypath,true,crdb-v2)>,
health: <fileCfg(INFO: [HEALTH],/mypath,true,crdb-v2)>,
kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],/mypath,true,crdb-v2)>,
pebble: <fileCfg(INFO: [STORAGE],/mypath,true,crdb-v2)>,
Expand Down Expand Up @@ -518,7 +528,8 @@ SQL_EXEC,
SQL_PERF,
SQL_INTERNAL_PERF,
TELEMETRY,
KV_DISTRIBUTION],/pathA,true,crdb-v2)>,
KV_DISTRIBUTION,
STRUCTURED_EVENTS],/pathA,true,crdb-v2)>,
health: <fileCfg(INFO: [HEALTH],/pathA,true,crdb-v2)>,
kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],/pathA,true,crdb-v2)>,
pebble: <fileCfg(INFO: [STORAGE],/pathA,true,crdb-v2)>,
Expand Down Expand Up @@ -576,7 +587,8 @@ SQL_EXEC,
SQL_PERF,
SQL_INTERNAL_PERF,
TELEMETRY,
KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
KV_DISTRIBUTION,
STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
Expand Down Expand Up @@ -613,7 +625,8 @@ SQL_EXEC,
SQL_PERF,
SQL_INTERNAL_PERF,
TELEMETRY,
KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
KV_DISTRIBUTION,
STRUCTURED_EVENTS],<defaultLogDir>,true,crdb-v2)>,
health: <fileCfg(INFO: [HEALTH],<defaultLogDir>,true,crdb-v2)>,
kv-distribution: <fileCfg(INFO: [KV_DISTRIBUTION],<defaultLogDir>,true,crdb-v2)>,
pebble: <fileCfg(INFO: [STORAGE],<defaultLogDir>,true,crdb-v2)>,
Expand Down
5 changes: 5 additions & 0 deletions pkg/obs/eventagg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,30 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/util/envutil",
"//pkg/util/log",
"//pkg/util/syncutil",
],
)

go_test(
name = "eventagg_test",
srcs = [
"flush_consumer_test.go",
"flush_test.go",
"map_reduce_test.go",
],
data = glob(["testdata/**"]),
embed = [":eventagg"],
deps = [
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/log/logpb",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
10 changes: 2 additions & 8 deletions pkg/obs/eventagg/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,9 @@ package eventagg
// challenges that we're punting until later that are worth enumerating here.
//
// Unsolved Challenges:
// 1. What do the details of the structured logging API look like that the eventagg package will use?
// Current telemetry logging requires protobuf messages which can be unwieldy and require us to either
// use protobuf types within the eventagg system (undesirable) or transform aggregation results into
// a protobuf type (duplicative). Some early ideas on how to solve this are discussed in comments within
// pkg/obs/eventagg/flush_consumer.go, within (*LogWriteConsumer[Agg]).onFlush(). For now, we punt this
// problem until the broader eventagg interfaces are agreed upon.
// 2. Memory accounting as well as observability into the aggregations themselves are required to make
// 1. Memory accounting as well as observability into the aggregations themselves are required to make
// eventagg safe to use and easy to debug. For now, we punt this problem.
// 3. What kinds of concurrency, if any, should be used within the eventagg package? If the goal is for
// 2. What kinds of concurrency, if any, should be used within the eventagg package? If the goal is for
// eventagg to be easy for developers to use, then making a Map/Reduce operation concurrent
// should also be easy if there's a need for it. We punt whether this is a requirement, and the details
// of the problem if it *is* a requirement, until later.
32 changes: 28 additions & 4 deletions pkg/obs/eventagg/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,33 @@ package eventagg

import "time"

// FlushKind is the type of FlushTrigger used, which communicates aggregation strategy.
// It is a string type, so the kind appears as plain text in the JSON form of AggInfo.
type FlushKind string

// Windowed is the FlushKind reported by flush triggers that enforce windowed flush
// periods; flushes of this kind carry both a window start and a window end time.
const Windowed FlushKind = "WINDOWED"

// AggInfo is a type used to communicate metadata from a FlushTrigger to the
// targets of the flush about the aggregation type and period. For example, the
// type of flush, and associated timestamps.
//
// The struct tags fix the field names used when an AggInfo is marshaled to JSON.
type AggInfo struct {
// Kind is the FlushKind used in the FlushTrigger.
Kind FlushKind `json:"kind"`
// StartTime is the primary timestamp for the flushed data. For a Windowed
// flush it is the start of the flushed window, in Unix nanoseconds.
StartTime int64 `json:"start_time"`
// EndTime, if present, represents the end timestamp of the flush interval.
// For a Windowed flush it is the end of the flushed window, in Unix nanoseconds.
// NOTE(review): the tag has no omitempty, so an unset EndTime is still
// serialized as 0 — confirm consumers treat 0 as "absent".
EndTime int64 `json:"end_time"`
}

// FlushTrigger defines the interface used by aggregators, such as MapReduceAggregator,
// to determine when a flush of the current aggregation should be triggered.
//
// FlushTriggers are generally invoked as soon as events are consumed by an aggregator,
// but *before* the event itself is aggregated.
type FlushTrigger interface {
// shouldFlush returns true if this FlushTrigger has been tripped.
shouldFlush() bool
shouldFlush() (bool, AggInfo)
}

// WindowedFlush is a FlushTrigger which triggers flushes on a wall clock aligned
Expand Down Expand Up @@ -55,11 +74,16 @@ func (w *WindowedFlush) newWindowEnd() time.Time {
return w.nowFn().Truncate(w.window).Add(w.window)
}

func (w *WindowedFlush) shouldFlush() bool {
func (w *WindowedFlush) shouldFlush() (bool, AggInfo) {
t := w.nowFn()
if t.Equal(w.curWindowEnd) || t.After(w.curWindowEnd) {
meta := AggInfo{
Kind: Windowed,
StartTime: w.curWindowEnd.Add(-w.window).UnixNano(),
EndTime: w.curWindowEnd.UnixNano(),
}
w.curWindowEnd = w.newWindowEnd()
return true
return true, meta
}
return false
return false, AggInfo{}
}
73 changes: 37 additions & 36 deletions pkg/obs/eventagg/flush_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,50 +10,51 @@

package eventagg

import "context"
import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/log"
)

// flushConsumer is the interface type used to define a post-processing consumer of
// the aggregations flushed by a MapReduceAggregator.
type flushConsumer[K comparable, V any] interface {
// TODO(abarganier): It'd be better if we didn't pass down the reduceContainer and instead
// provided the unwrapped values, but for now I'd rather copying over to a new map.
onFlush(ctx context.Context, m map[K]V)
onFlush(ctx context.Context, aggInfo AggInfo, m map[K]V)
}

// LogWriteConsumer is an example flushConsumer, which provides an easy plug-and-play
// method of logging all the values Agg flushed by a MapReduceAggregator[Agg].
type LogWriteConsumer[K comparable, V any] struct{}
// method of logging all the values V flushed by a MapReduceAggregator[V].
type LogWriteConsumer[K comparable, V any] struct {
EventType log.EventType
}

// NewLogWriteConsumer returns a new *LogWriteConsumer[Agg] instance.
func NewLogWriteConsumer[K comparable, V any]() *LogWriteConsumer[K, V] {
return &LogWriteConsumer[K, V]{}
// KeyValueLog is the type logged to log.Structured via the LogWriteConsumer.
// It wraps the flush metadata, key, and value flushed from the associated
// aggregator. One KeyValueLog is built for each entry of a flushed map.
type KeyValueLog struct {
// AggInfo is the flush metadata; it is the same for every entry of one flush.
AggInfo AggInfo `json:"agg_info"`
// Key is the map key of the flushed entry.
Key any `json:"key"`
// Value is the value stored under Key in the flushed map.
Value any `json:"value"`
}

// TODO(abarganier): The values flushed out of a MapReduceAggregator ideally should be able
// to be provided to log.StructuredEvent(), or a similar facility, without transformation.
//
// For now, we punt this problem until we have the lower level interfaces fleshed out. Currently,
// log.StructuredEvent() requires a protobuf format, which may be cumbersome & something we'd like
// to avoid if possible within this system.
//
// One option would be to expand the API of pkg/util/log to enable direct JSON logging, e.g.:
//
// log.Structured(ctx, eventMetadata, eventJSON)
//
// This would allow us to construct a single metadata object that can be applied to every flushed
// value Agg.
func (l LogWriteConsumer[K, V]) onFlush(ctx context.Context, m map[K]V) {
//metadata := &logmeta.EventMetadata{
// EventType: ...,
// EventTimestamp: timeutil.Now().UnixNano(),
// NodeDetails: logmeta.GetNodeDetails(ctx),
// ...,
//}
//for _, v := range m {
// marshalled, err := json.Marshal(v)
// if err != nil {
// log.Errorf(ctx, "failed to marshal JSON for event: %v", v)
// }
// log.Structured(ctx, metadata, marshalled)
//}
// NewLogWriteConsumer allocates a *LogWriteConsumer[K, V] and records eventType
// on it, so that every entry the consumer later logs is tagged with that type.
func NewLogWriteConsumer[K comparable, V any](eventType log.EventType) *LogWriteConsumer[K, V] {
	consumer := new(LogWriteConsumer[K, V])
	consumer.EventType = eventType
	return consumer
}

// onFlush logs every entry of the flushed map m through log.Structured.
//
// Each entry is wrapped in a KeyValueLog carrying the shared aggInfo alongside
// the entry's key and value, and is emitted under the consumer's EventType.
// Entries are visited in Go's unspecified map iteration order, so the order of
// the emitted logs is not deterministic.
func (l LogWriteConsumer[K, V]) onFlush(ctx context.Context, aggInfo AggInfo, m map[K]V) {
	// The metadata does not vary per entry, so build it once up front.
	meta := log.StructuredMeta{EventType: l.EventType}
	for key, val := range m {
		log.Structured(ctx, meta, KeyValueLog{
			AggInfo: aggInfo,
			Key:     key,
			Value:   val,
		})
	}
}
Loading

0 comments on commit 06d762f

Please sign in to comment.