Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/util/eventagg: general aggregation framework for reduction of event cardinality #119416

Merged
merged 2 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ ALL_TESTS = [
"//pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher:tenantcapabilitieswatcher_test",
"//pkg/multitenant/tenantcapabilities:tenantcapabilities_test",
"//pkg/multitenant/tenantcostmodel:tenantcostmodel_test",
"//pkg/obs/eventagg:eventagg_test",
"//pkg/obsservice/obslib/ingest:ingest_test",
"//pkg/obsservice/obslib/migrations:migrations_test",
"//pkg/obsservice/obslib/process:process_test",
Expand Down Expand Up @@ -1512,6 +1513,8 @@ GO_TARGETS = [
"//pkg/multitenant/tenantcostmodel:tenantcostmodel",
"//pkg/multitenant/tenantcostmodel:tenantcostmodel_test",
"//pkg/multitenant:multitenant",
"//pkg/obs/eventagg:eventagg",
"//pkg/obs/eventagg:eventagg_test",
"//pkg/obs:obs",
"//pkg/obsservice/cmd/obsservice:obsservice",
"//pkg/obsservice/cmd/obsservice:obsservice_lib",
Expand Down
38 changes: 38 additions & 0 deletions pkg/obs/eventagg/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library target for the eventagg package: the flush triggers, flush consumers
# and the map/reduce aggregator sources listed in srcs.
go_library(
name = "eventagg",
srcs = [
"flush.go",
"flush_consumer.go",
"map_reduce.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/obs/eventagg",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/envutil",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/syncutil",
],
)

# Test target for the eventagg package. It embeds the :eventagg library, so the
# tests are compiled into the same Go package, and it makes everything under
# testdata/ available to the tests as data files.
go_test(
name = "eventagg_test",
srcs = [
"flush_test.go",
"map_reduce_test.go",
],
data = glob(["testdata/**"]),
embed = [":eventagg"],
deps = [
"//pkg/testutils/datapathutils",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_stretchr_testify//require",
],
)
65 changes: 65 additions & 0 deletions pkg/obs/eventagg/flush.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package eventagg

import "time"

// FlushTrigger defines the interface used by aggregators, such as MapReduceAggregator,
// to determine when a flush of the current aggregation should be triggered.
//
// FlushTriggers are generally invoked as soon as events are consumed by an aggregator,
// but *before* the event itself is aggregated.
type FlushTrigger interface {
	// shouldFlush returns true if this FlushTrigger has been tripped.
	shouldFlush() bool
}

// WindowedFlush is a FlushTrigger which triggers flushes on a wall clock aligned
// time window.
//
// On initialization, a window duration is provided. Each window boundary is a
// time truncated to a multiple of that duration (see time.Time.Truncate).
//
// For example, if we provide a window of 5 minutes, the windows will be:
//   - [12:00:00, 12:05:00)
//   - [12:05:00, 12:10:00)
//   - [12:10:00, 12:15:00)
//   - etc.
//
// WindowedFlush performs no locking of its own, and shouldFlush mutates the
// receiver, so callers are responsible for serializing access.
type WindowedFlush struct {
	// window is the width of each flush window.
	window time.Duration
	// curWindowEnd is the exclusive upper bound of the current window. A clock
	// reading at or past this instant trips the trigger.
	curWindowEnd time.Time
	// nowFn supplies the current time.
	nowFn func() time.Time
}

var _ FlushTrigger = (*WindowedFlush)(nil)

// NewWindowedFlush returns a new WindowedFlush for the provided window. The
// first window ends at the first window-aligned instant strictly after nowFn().
//
// window should be positive: time.Time.Truncate leaves the time unchanged for a
// non-positive duration, in which case effectively every call to shouldFlush
// reports true.
func NewWindowedFlush(window time.Duration, nowFn func() time.Time) *WindowedFlush {
	w := &WindowedFlush{
		window: window,
		nowFn:  nowFn,
	}
	w.curWindowEnd = w.newWindowEnd()
	return w
}

// newWindowEnd returns the end of the window containing the current time, as
// reported by nowFn.
func (w *WindowedFlush) newWindowEnd() time.Time {
	return w.windowEndAt(w.nowFn())
}

// windowEndAt returns the exclusive end of the window that contains t.
func (w *WindowedFlush) windowEndAt(t time.Time) time.Time {
	return t.Truncate(w.window).Add(w.window)
}

// shouldFlush reports whether the current time has reached the end of the
// current window. When it has, the trigger advances to the window containing
// the current time before returning true.
func (w *WindowedFlush) shouldFlush() bool {
	t := w.nowFn()
	if t.Before(w.curWindowEnd) {
		return false
	}
	// Derive the next boundary from the very same reading that tripped the
	// trigger. Reading the clock a second time would let a backwards wall clock
	// step re-arm the boundary we just crossed, causing a spurious extra flush.
	w.curWindowEnd = w.windowEndAt(t)
	return true
}
64 changes: 64 additions & 0 deletions pkg/obs/eventagg/flush_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package eventagg

import "context"

// flushConsumer is the interface type used to define a post-processing consumer of
// the aggregations flushed by a MapReduceAggregator.
type flushConsumer[K comparable, V any] interface {
// onFlush is handed the map of flushed aggregations, keyed by K.
// MapReduceAggregator invokes it from an async task and passes the same map
// to every configured consumer, so implementations should treat m as shared.
//
// TODO(abarganier): It'd be better if we didn't pass down the reduceContainer and instead
// provided the unwrapped values, but for now I'd rather not copy everything over to a new map.
onFlush(ctx context.Context, m map[K]V)
}

// LogWriteConsumer is an example flushConsumer, which is meant to provide an easy
// plug-and-play method of logging all the values V flushed by a MapReduceAggregator.
//
// NOTE: its onFlush method is currently an empty placeholder, so nothing is logged yet.
type LogWriteConsumer[K comparable, V any] struct{}

// Compile-time assertion that *LogWriteConsumer implements flushConsumer.
var _ flushConsumer[int, any] = &LogWriteConsumer[int, any]{}

// References onFlush so the method counts as used (presumably to satisfy the
// unused-code linter until a real caller exists).
// TODO(abarganier): remove once used in future patch.
var _ = LogWriteConsumer[int, any].onFlush

// NewLogWriteConsumer allocates and returns an empty *LogWriteConsumer[K, V].
func NewLogWriteConsumer[K comparable, V any]() *LogWriteConsumer[K, V] {
	return new(LogWriteConsumer[K, V])
}

// onFlush implements flushConsumer. It is currently a no-op: the body contains only a
// commented-out sketch of the intended implementation, so flushed values are discarded.
//
// TODO(abarganier): The values flushed out of a MapReduceAggregator ideally should be able
// to be provided to log.StructuredEvent(), or a similar facility, without transformation.
//
// For now, we punt this problem until we have the lower level interfaces fleshed out. Currently,
// log.StructuredEvent() requires a protobuf format, which may be cumbersome & something we'd like
// to avoid if possible within this system.
//
// One option would be to expand the API of pkg/util/log to enable direct JSON logging, e.g.:
//
// log.Structured(ctx, eventMetadata, eventJSON)
//
// This would allow us to construct a single metadata object that can be applied to every flushed
// value V.
func (l LogWriteConsumer[K, V]) onFlush(ctx context.Context, m map[K]V) {
// Sketch of the intended implementation, built on the hypothetical
// log.Structured API described in the method comment:
//
//metadata := &logmeta.EventMetadata{
// EventType: ...,
// EventTimestamp: timeutil.Now().UnixNano(),
// NodeDetails: logmeta.GetNodeDetails(ctx),
// ...,
//}
//for _, v := range m {
// marshalled, err := json.Marshal(v)
// if err != nil {
// log.Errorf(ctx, "failed to marshal JSON for event: %v", v)
// continue // don't log a value that failed to marshal
// }
// log.Structured(ctx, metadata, marshalled)
//}
}
40 changes: 40 additions & 0 deletions pkg/obs/eventagg/flush_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package eventagg

import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

// TestWindowedFlush verifies that a WindowedFlush only trips once the injected
// clock reaches the end of the current window.
func TestWindowedFlush(t *testing.T) {
	const window = 5 * time.Minute
	clock := timeutil.Now()
	// Nudge the clock off of a window boundary so that the first check lands
	// strictly inside of a window.
	if clock.Equal(clock.Truncate(window)) {
		clock = clock.Add(time.Minute)
	}
	trigger := NewWindowedFlush(window, func() time.Time { return clock })

	// While still inside of the initial window, no flush is expected.
	require.False(t, trigger.shouldFlush())

	// Jumping to the exact start of the following window must trip the trigger.
	clock = clock.Add(window).Truncate(window)
	require.True(t, trigger.shouldFlush())

	// A minute later we are still inside of the new window, so no flush.
	clock = clock.Add(time.Minute)
	require.False(t, trigger.shouldFlush())
}
119 changes: 119 additions & 0 deletions pkg/obs/eventagg/map_reduce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package eventagg

import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// envEnableStructuredEvents determines whether the eventagg package is enabled. The features within
// this package are currently experimental, and must be explicitly enabled via this envvar.
//
// The value comes from COCKROACH_ENABLE_STRUCTURED_EVENTS and defaults to false. While it is
// false, MapReduceAggregator.Add returns immediately without aggregating the event.
var envEnableStructuredEvents = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_STRUCTURED_EVENTS", false)

// MapReduceAggregator performs aggregations on events of type E, keyed by type K,
// which are reduced into type Agg.
//
// Users define and provide the map and reduce functions at construction. MapReduceAggregator can
// be configured with one or more flushConsumers, which will be given a reference to the flushed
// data for further processing upon flush. flushConsumers are invoked asynchronously, putting the
// post-processing outside the critical path of calls to Add.
//
// - newFn defines how to instantiate a new Agg type, since Golang generics doesn't have
// great support for this based on the generic type alone.
// - keyFn defines how to derive a key from incoming events of type E. This key will be used
// to determine which bucket the event should be aggregated into, represented by type Agg.
// - mergeFn defines how to merge an incoming event of type E into a pre-existing aggregation
// for the same key (derived via keyFn).
//
// Note that mergeFn receives the aggregation by value and nothing is written back to the
// cache after it returns, so Agg needs reference semantics (e.g. a pointer type) for a
// merge to be retained.
type MapReduceAggregator[E any, K comparable, Agg any] struct {
// stopper runs the asynchronous flush tasks.
stopper *stop.Stopper
// newFn creates the aggregation for a key that is not in the cache yet.
newFn func() Agg
// keyFn derives the cache key for an event.
keyFn func(e E) K
// mergeFn folds an event into the aggregation stored for its key.
mergeFn func(e E, agg Agg)
mu struct {
syncutil.Mutex
// cache holds the aggregations of the current window, by key. It is swapped
// for a fresh map on every flush.
cache map[K]Agg
// flushTrigger is consulted on every Add, while holding the mutex, to
// decide whether the cache should be flushed first.
flushTrigger FlushTrigger
}
// consumers are handed each flushed cache, in order.
consumers []flushConsumer[K, Agg]
}

// NewMapReduceAggregator builds a MapReduceAggregator[E, K, Agg] from the given
// stopper, the new/key/merge functions, the trigger deciding when to flush, and
// the consumers that receive flushed aggregations. It starts with an empty cache.
func NewMapReduceAggregator[E any, K comparable, Agg any](
	stopper *stop.Stopper,
	newFn func() Agg,
	keyFn func(e E) K,
	mergeFn func(e E, agg Agg),
	flushTrigger FlushTrigger,
	flushConsumers ...flushConsumer[K, Agg],
) *MapReduceAggregator[E, K, Agg] {
	agg := new(MapReduceAggregator[E, K, Agg])
	agg.stopper = stopper
	agg.newFn, agg.keyFn, agg.mergeFn = newFn, keyFn, mergeFn
	agg.consumers = flushConsumers
	agg.mu.flushTrigger = flushTrigger
	agg.mu.cache = map[K]Agg{}
	return agg
}

// Add aggregates the event e into the bucket selected by keyFn, creating the
// bucket via newFn if needed and folding the event in with mergeFn.
//
// Before the event is merged, the FlushTrigger provided at construction is
// consulted. If it has tripped, the data aggregated so far is handed off for an
// asynchronous flush and e becomes the first event of a fresh aggregation window.
//
// Add is a no-op unless the package has been enabled via envEnableStructuredEvents.
func (m *MapReduceAggregator[E, K, Agg]) Add(ctx context.Context, e E) {
	if !envEnableStructuredEvents {
		return
	}
	// The key is derived before the mutex is acquired.
	key := m.keyFn(e)

	m.mu.Lock()
	defer m.mu.Unlock()

	// A tripped trigger means the current window is over: detach what has been
	// aggregated so far and flush it in the background.
	if m.mu.flushTrigger.shouldFlush() {
		flushed := m.getAndResetCacheLocked()
		m.flushAsync(ctx, flushed)
	}

	agg, found := m.mu.cache[key]
	if !found {
		agg = m.newFn()
		m.mu.cache[key] = agg
	}
	m.mergeFn(e, agg)
}

// flushAsync hands the provided aggregate data to each of the flushConsumers
// configured at construction, in order, from an async task run on the stopper.
//
// The caller is expected to have already detached flushed from the live cache.
// If the task cannot be started, the error is logged and the data is not
// delivered to any consumer.
func (m *MapReduceAggregator[E, K, Agg]) flushAsync(ctx context.Context, flushed map[K]Agg) {
	notifyConsumers := func(ctx context.Context) {
		for i := range m.consumers {
			m.consumers[i].onFlush(ctx, flushed)
		}
	}
	err := m.stopper.RunAsyncTask(ctx, "map-reduce-flush", notifyConsumers)
	if err != nil {
		log.Errorf(ctx, "a problem occurred attempting to flush an aggregation: %v", err)
	}
}

// getAndResetCacheLocked swaps the current cache for a new, empty one and
// returns the previous cache. As the Locked suffix indicates, m.mu must be held
// by the caller.
func (m *MapReduceAggregator[E, K, Agg]) getAndResetCacheLocked() map[K]Agg {
	prev := m.mu.cache
	m.mu.cache = make(map[K]Agg)
	return prev
}
Loading
Loading