Skip to content

Commit

Permalink
eventagg: introduce mechanism to consume structured logs internally
Browse files Browse the repository at this point in the history
This patch expands upon the new structured logging facilities, adding a
mechanism to consume emitted structured logs internally.

This is done primarily via the new pkg/obs/logstream package, which
handles buffering, routing, & processing of events logged via
log.Structured.

It can be used in conjunction with the eventagg package, and the
KVProcessor interface, to provide users of the eventagg package a way
to consume streams of events flushed from their aggregations. This
enables engineers to use the aggregated data flushed from their
aggregations to build features internal to CRDB. Features that are
powered by the same data that could be consumed externally via the
STRUCTURED_EVENTS log channel.

The provided log config can be updated to make use of this new channel.
For example:
```
sinks:
  file-groups:
    structured-events:
      channels: [STRUCTURED_EVENTS]
```

The changes aim to complete the eventagg pipeline/ecosystem, which now
allows engineers to use common facilities to define aggregations, log
the aggregated results, and consume the logged events internally as
input data.

Finally, it completes the toy StmtStats example by defining a processor
for the aggregated events that are logged.

Release note: none
  • Loading branch information
abarganier committed May 6, 2024
1 parent 06d762f commit 96a1e2e
Show file tree
Hide file tree
Showing 21 changed files with 1,272 additions and 9 deletions.
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ ALL_TESTS = [
"//pkg/multitenant/tenantcapabilities:tenantcapabilities_test",
"//pkg/multitenant/tenantcostmodel:tenantcostmodel_test",
"//pkg/obs/eventagg:eventagg_test",
"//pkg/obs/logstream:logstream_test",
"//pkg/obsservice/obslib/ingest:ingest_test",
"//pkg/obsservice/obslib/migrations:migrations_test",
"//pkg/obsservice/obslib/process:process_test",
Expand Down Expand Up @@ -1508,6 +1509,8 @@ GO_TARGETS = [
"//pkg/multitenant:multitenant",
"//pkg/obs/eventagg:eventagg",
"//pkg/obs/eventagg:eventagg_test",
"//pkg/obs/logstream:logstream",
"//pkg/obs/logstream:logstream_test",
"//pkg/obs:obs",
"//pkg/obsservice/cmd/obsservice:obsservice",
"//pkg/obsservice/cmd/obsservice:obsservice_lib",
Expand Down
1 change: 1 addition & 0 deletions pkg/gen/gomock.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ GOMOCK_SRCS = [
"//pkg/kv/kvclient/rangecache/rangecachemock:mocks_generated.go",
"//pkg/kv/kvclient/rangefeed:mocks_generated_test.go",
"//pkg/kv/kvpb/kvpbmock:mocks_generated.go",
"//pkg/obs/logstream:mocks_generated_test.go",
"//pkg/rpc:mocks_generated_test.go",
"//pkg/security/certmgr:mocks_generated_test.go",
"//pkg/sql/schemachanger/scexec:mocks_generated_test.go",
Expand Down
3 changes: 3 additions & 0 deletions pkg/obs/eventagg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ go_library(
"event_agg.go",
"flush.go",
"flush_consumer.go",
"kv_processor.go",
"map_reduce.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/obs/eventagg",
visibility = ["//visibility:public"],
deps = [
"//pkg/obs/logstream",
"//pkg/util/envutil",
"//pkg/util/log",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
],
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/obs/eventagg/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ package eventagg
// aggregation will operate on, and an interface that effectively allows us to perform Map/Reduce-like
// operations on those types.
//
// ## Map/Reduce Consumer Plugins
// ## Map/Reduce EmittedKVProcessor Plugins
// Users of the eventagg package should have a library of Map/Reduce output consumers available to make
// things easy to use. For example, after aggregating some data, I want to calculate the TopK elements
// based on some field. That should be as easy for engineers as instantiating a plugin and defining the
Expand Down
67 changes: 67 additions & 0 deletions pkg/obs/eventagg/kv_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package eventagg

import (
"context"

"github.com/cockroachdb/cockroach/pkg/obs/logstream"
"github.com/cockroachdb/errors"
)

// KVProcessor is the interface used to process emitted KeyValueLog events
// logged via log.Structured (generally, logged by something like a LogWriteConsumer).
// - K is the type of the key.
// - V is the type of the value.
type KVProcessor[K any, V any] interface {
// Process is called once per key-value pair, and also includes metadata
// about the flush process used during the aggregation step.
Process(ctx context.Context, aggInfo AggInfo, k K, v V) error
}

// EmittedKVProcessor is a logstream.Processor implementation that unpacks KeyValueLog
// events logged via log.Structured (generally, logged by something like a LogWriteConsumer
// during a MapReduceAggregator flush) before delegating them onto a KVProcessor. This
// saves users of pkg/util/eventagg the hassle of doing type assertions of their own, and
// provides a generics-friendly way to process emitted KeyValueLog events.
//
// - K is the type of the key.
// - V is the type of the value.
type EmittedKVProcessor[K any, V any] struct {
processor KVProcessor[K, V]
}

// NewEmittedKVProcessor returns a new EmittedKVProcessor, which invokes the provided
// KVProcessor for each event processed.
func NewEmittedKVProcessor[K any, V any](processor KVProcessor[K, V]) *EmittedKVProcessor[K, V] {
return &EmittedKVProcessor[K, V]{
processor: processor,
}
}

var _ logstream.Processor = (*EmittedKVProcessor[any, any])(nil)

// Process implements the logstream.Processor interface.
func (e *EmittedKVProcessor[K, V]) Process(ctx context.Context, event any) error {
kvLog, ok := event.(KeyValueLog)
if !ok {
return errors.Newf("unexpected type for event: %v", event)
}
key, ok := kvLog.Key.(K)
if !ok {
return errors.Newf("unexpected key type for event: %v", event)
}
value, ok := kvLog.Value.(V)
if !ok {
return errors.Newf("unexpected value type for event: %v", event)
}
return e.processor.Process(ctx, kvLog.AggInfo, key, value)
}
59 changes: 59 additions & 0 deletions pkg/obs/logstream/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
load("@bazel_gomock//:gomock.bzl", "gomock")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "logstream",
srcs = [
"processor.go",
"processor_buffer.go",
"registration.go",
"router.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/obs/logstream",
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "logstream_test",
srcs = [
"processor_buffer_test.go",
"registration_test.go",
"router_test.go",
":mock_processor", # keep
],
embed = [":logstream"],
deps = [
"//pkg/roachpb",
"//pkg/testutils",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_golang_mock//gomock",
"@com_github_stretchr_testify//require",
],
)

gomock(
name = "mock_processor",
out = "mocks_generated_test.go",
interfaces = [
"Processor",
],
library = ":logstream",
mock_names = {"TestingProcessor": "MockProcessor"},
package = "logstream",
self_package = "github.com/cockroachdb/cockroach/pkg/util/logstream",
visibility = [
":__pkg__",
"//pkg/gen:__pkg__",
],
)
49 changes: 49 additions & 0 deletions pkg/obs/logstream/mocks_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions pkg/obs/logstream/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package logstream

import "context"

// Processor defines how a component can process streams of structured log events as they're
// passed to log.Structured. Processor enables engineers to use streams of log events as the
// primary input to a larger system, usually focused on observability.
//
// See RegisterProcessor for more details on how to create & register a processor with this package.
type Processor interface {
// Process processes a single log stream event.
Process(context.Context, any) error
}
Loading

0 comments on commit 96a1e2e

Please sign in to comment.