Skip to content

Commit

Permalink
eventagg: add FlushTrigger mechanism to MapReduceAggregator
Browse files Browse the repository at this point in the history
This patch introduces the FlushTrigger interface, which can be used by
the MapReduceAggregator to determine when it's time to flush the current
aggregation.

Along with the interface, an initial implementation is provided called
`WindowedFlush`. `WindowedFlush` aligns event aggregations to truncated
time intervals given a user-provided time window.

For example, if a window of 5 minutes was given, the `WindowedFlush`
would enforce the following window boundaries:

- [12:00:00, 12:05:00)
- [12:05:00, 12:10:00)
- [12:10:00, 12:15:00)
- etc.

This is a first pass implementation of the flush mechanism used in
the eventagg package. As needs evolve, the interface and/or
implementation is subject to change. For the purposed of prototyping
though, this meets our needs.

Release note: none
  • Loading branch information
abarganier committed May 6, 2024
1 parent fe8d17d commit fdabacc
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 34 deletions.
8 changes: 7 additions & 1 deletion pkg/obs/eventagg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"doc.go",
"event_agg.go",
"flush.go",
"flush_consumer.go",
"map_reduce.go",
],
Expand All @@ -18,13 +19,18 @@ go_library(

go_test(
name = "eventagg_test",
srcs = ["map_reduce_test.go"],
srcs = [
"flush_test.go",
"map_reduce_test.go",
],
data = glob(["testdata/**"]),
embed = [":eventagg"],
deps = [
"//pkg/testutils/datapathutils",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_stretchr_testify//require",
],
Expand Down
65 changes: 65 additions & 0 deletions pkg/obs/eventagg/flush.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package eventagg

import "time"

// FlushTrigger defines the interface used by aggregators, such as MapReduceAggregator,
// to determine when a flush of the current aggregation should be triggered.
//
// FlushTriggers are generally invoked as soon as events are consumed by an aggregator,
// but *before* the event itself is aggregated.
type FlushTrigger interface {
// shouldFlush returns true if this FlushTrigger has been tripped.
shouldFlush() bool
}

// WindowedFlush is a FlushTrigger which triggers flushes on a wall clock aligned
// time window.
//
// On initialization, a window duration is provided. Each time window will be a time
// truncated to that time window.
//
// For example, if we provide a window of 5 minutes, the windows will be:
// - [12:00:00, 12:05:00)
// - [12:05:00, 12:10:00)
// - [12:10:00, 12:15:00)
// - etc.
type WindowedFlush struct {
window time.Duration
curWindowEnd time.Time
nowFn func() time.Time
}

var _ FlushTrigger = (*WindowedFlush)(nil)

// NewWindowedFlush returns a new WindowedFlush for the provided window.
func NewWindowedFlush(window time.Duration, nowFn func() time.Time) *WindowedFlush {
w := &WindowedFlush{
window: window,
nowFn: nowFn,
}
w.curWindowEnd = w.newWindowEnd()
return w
}

func (w *WindowedFlush) newWindowEnd() time.Time {
return w.nowFn().Truncate(w.window).Add(w.window)
}

func (w *WindowedFlush) shouldFlush() bool {
t := w.nowFn()
if t.Equal(w.curWindowEnd) || t.After(w.curWindowEnd) {
w.curWindowEnd = w.newWindowEnd()
return true
}
return false
}
40 changes: 40 additions & 0 deletions pkg/obs/eventagg/flush_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package eventagg

import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

func TestWindowedFlush(t *testing.T) {
window := 5 * time.Minute
now := timeutil.Now()
// Make sure we're not directly on the window boundary.
if now.Truncate(window).Equal(now) {
now = now.Add(1 * time.Minute)
}
flush := NewWindowedFlush(window, func() time.Time {
return now
})
// Initially, we should be within the current window.
require.False(t, flush.shouldFlush())
// If we fast-forward to the beginning of the next window, we expect
// a flush to be triggered.
now = now.Add(window).Truncate(window)
require.True(t, flush.shouldFlush())
// Times occurring before the current window's end should not trigger a flush.
now = now.Add(1 * time.Minute)
require.False(t, flush.shouldFlush())
}
34 changes: 20 additions & 14 deletions pkg/obs/eventagg/map_reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,39 @@ type MapReduceAggregator[E Mergeable[K, Agg], K comparable, Agg any] struct {
newFn func() Agg
mu struct {
syncutil.Mutex
cache map[K]Agg
cache map[K]Agg
flushTrigger FlushTrigger
}
consumers []flushConsumer[K, Agg]
}

// NewMapReduceAggregator returns a new MapReduceAggregator[Agg].
func NewMapReduceAggregator[E Mergeable[K, V], K comparable, V any](
newFn func() V, flushConsumers ...flushConsumer[K, V],
newFn func() V, flushTrigger FlushTrigger, flushConsumers ...flushConsumer[K, V],
) *MapReduceAggregator[E, K, V] {
m := &MapReduceAggregator[E, K, V]{
newFn: newFn,
consumers: flushConsumers,
}
m.mu.cache = make(map[K]V)
m.mu.flushTrigger = flushTrigger
return m
}

// Add implements the aggregator interface.
func (m *MapReduceAggregator[E, K, Agg]) Add(_ context.Context, e E) {
func (m *MapReduceAggregator[E, K, Agg]) Add(ctx context.Context, e E) {
if !envEnableStructuredEvents {
return
}
k := e.GroupingKey()
m.mu.Lock()
defer m.mu.Unlock()
// If it's time to flush, do so async before processing the event.
// This will reset the cache, meaning our event will be added to
// a fresh aggregation window.
if m.mu.flushTrigger.shouldFlush() {
m.flushAsync(ctx, m.getAndResetCacheLocked())
}
v, ok := m.mu.cache[k]
if !ok {
v = m.newFn()
Expand All @@ -62,20 +70,12 @@ func (m *MapReduceAggregator[E, K, Agg]) Add(_ context.Context, e E) {
e.MergeInto(v)
}

// Flush triggers a flush of the in-memory aggregate data, which resets the
// underlying cache for a new aggregation window.
// flushAsync spawns a new goroutine to asynchronously invoke each flushConsumer associated
// with this MapReduceAggregator.
//
// The flushed data will be passed to each of the configured flushConsumer's
// provided at construction for further processing.
// TODO(abarganier): implement more robust flush mechanism, with configurable triggers.
func (m *MapReduceAggregator[E, K, Agg]) Flush(ctx context.Context) {
flushed := func() map[K]Agg {
m.mu.Lock()
defer m.mu.Unlock()
flushed := m.mu.cache
m.mu.cache = make(map[K]Agg)
return flushed
}()
func (m *MapReduceAggregator[E, K, Agg]) flushAsync(ctx context.Context, flushed map[K]Agg) {
// TODO(abarganier): We should probably use a stopper async task here.
// TODO(abarganier): Should we make whether this is done async configurable?
go func() {
Expand All @@ -84,3 +84,9 @@ func (m *MapReduceAggregator[E, K, Agg]) Flush(ctx context.Context) {
}
}()
}

func (m *MapReduceAggregator[E, K, V]) getAndResetCacheLocked() map[K]V {
flushed := m.mu.cache
m.mu.cache = make(map[K]V)
return flushed
}
90 changes: 71 additions & 19 deletions pkg/obs/eventagg/map_reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
)

var dummyTestEvent = &testEvent{Key: "XYZ", Count: 1}

func TestMapReduceAggregator_Add(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -48,13 +51,15 @@ func TestMapReduceAggregator_Add(t *testing.T) {
}
ctx := context.Background()
consumer := newTestFlushConsumer[string, *aggTestEvent]()
trigger := &testFlushTrigger{}
mapReduce := NewMapReduceAggregator[*testEvent, string, *aggTestEvent](
func() *aggTestEvent { return &aggTestEvent{} },
trigger,
consumer)
for _, e := range testArgs {
mapReduce.Add(ctx, e)
}
mapReduce.Flush(ctx)
triggerTestFlush(ctx, mapReduce, trigger, dummyTestEvent)
select {
case <-time.After(10 * time.Second):
t.Fatalf("flush timed out")
Expand Down Expand Up @@ -90,6 +95,9 @@ func TestMapReduceAggregator_Flush(t *testing.T) {
defer func() {
envEnableStructuredEvents = false
}()
keyA := "A"
keyB := "B"
keyC := "C"

// verifyFlush verifies all elements in expected match those in actual.
verifyFlush := func(expected map[string]*aggTestEvent, actual map[string]*aggTestEvent) {
Expand All @@ -106,12 +114,11 @@ func TestMapReduceAggregator_Flush(t *testing.T) {
t.Run("properly clears aggregation cache after flush", func(t *testing.T) {
ctx := context.Background()
consumer := newTestFlushConsumer[string, *aggTestEvent]()
trigger := &testFlushTrigger{}
mapReduce := NewMapReduceAggregator[*testEvent, string, *aggTestEvent](
func() *aggTestEvent { return &aggTestEvent{} },
trigger,
consumer)
keyA := "A"
keyB := "B"
keyC := "C"
postFlushEvents := []*testEvent{
{Key: keyA, Count: 5},
{Key: keyA, Count: 3},
Expand All @@ -125,18 +132,21 @@ func TestMapReduceAggregator_Flush(t *testing.T) {
keyC: aggWithCount(9),
}
// Create initial state within the aggregation cache, which
// we expect to clear after Flush().
// we expect to clear after the first flush.
mapReduce.Add(ctx, &testEvent{Key: keyA, Count: 5})
mapReduce.Add(ctx, &testEvent{Key: keyB, Count: 5})
mapReduce.Add(ctx, &testEvent{Key: keyC, Count: 5})
mapReduce.Flush(ctx)
// We start by triggering the flush of the previous state with the 1st
// of the new events.
triggerTestFlush(ctx, mapReduce, trigger, postFlushEvents[0])
<-consumer.consumedCh()
// Now, send a new batch of events and verify that the previously
// flushed data has no impact on the new aggregation window.
for _, e := range postFlushEvents {
for _, e := range postFlushEvents[1:] {
mapReduce.Add(ctx, e)
}
mapReduce.Flush(ctx)
// Trigger a final flush with a dummy event, so we can validate what's flushed.
triggerTestFlush(ctx, mapReduce, trigger, dummyTestEvent)
<-consumer.consumedCh()
verifyFlush(expectedFlush, consumer.flushed)
})
Expand All @@ -148,17 +158,17 @@ func TestMapReduceAggregator_Flush(t *testing.T) {
t.Run("hanging consumption does not block new aggregations", func(t *testing.T) {
ctx := context.Background()
consumer := newTestFlushConsumer[string, *aggTestEvent]()
trigger := &testFlushTrigger{}
mapReduce := NewMapReduceAggregator[*testEvent, string, *aggTestEvent](
func() *aggTestEvent { return &aggTestEvent{} },
trigger,
consumer)
keyA := "A"
keyB := "B"
keyC := "C"
// Create initial state within the aggregation cache, which
// we expect to clear after Flush(). Then, trigger a Flush but
// we expect to clear after the first flush. Then, trigger a flush but
// delay listening on the consumer's channel via consumedCh()
// to simulate a hanging Flush(). We can then push new events to the
// MapReduceAggregator and verify they're included in a new window.
// to simulate a hanging flushConsumer. We can then push new events to the
// MapReduceAggregator and verify they're included in a new window, and that
// they don't block on the hanging flush.
mapReduce.Add(ctx, &testEvent{Key: keyA, Count: 5})
expectedFlush1 := map[string]*aggTestEvent{
keyA: aggWithCount(5),
Expand All @@ -183,19 +193,20 @@ func TestMapReduceAggregator_Flush(t *testing.T) {
time.AfterFunc(100*time.Millisecond, func() {
// Now, send a new batch of events and verify that the previously
// flushed data has no impact on the new aggregation window.
for _, e := range postFlushEvents {
for _, e := range postFlushEvents[1:] {
mapReduce.Add(ctx, e)
}
done <- struct{}{}
})
mapReduce.Flush(ctx) // Should hang
// Trigger a flush of the previous window with the first event of the new window.
triggerTestFlush(ctx, mapReduce, trigger, postFlushEvents[0])
}()
<-done
// This will unblock the first Flush() call
// This will unblock the first flush.
<-consumer.consumedCh()
verifyFlush(expectedFlush1, consumer.flushed)
// Now, perform the final flush and assert it to be correct.
mapReduce.Flush(ctx)
// Now, trigger the final flush and assert it to be correct.
triggerTestFlush(ctx, mapReduce, trigger, dummyTestEvent)
<-consumer.consumedCh()
verifyFlush(expectedFlush2, consumer.flushed)
})
Expand Down Expand Up @@ -259,3 +270,44 @@ type aggTestEvent struct {
func aggWithCount(count int) *aggTestEvent {
return &aggTestEvent{count: count}
}

// testFlushTrigger is a FlushTrigger for use in tests that stores a bool indicating whether
// it should flush. Use setShouldFlush to configure.
type testFlushTrigger struct {
mu struct {
syncutil.Mutex
flush bool
}
}

var _ FlushTrigger = (*testFlushTrigger)(nil)

// shouldFlush implements the FlushTrigger interface.
func (t *testFlushTrigger) shouldFlush() bool {
t.mu.Lock()
defer t.mu.Unlock()
return t.mu.flush
}

func (t *testFlushTrigger) setShouldFlush(to bool) {
t.mu.Lock()
defer t.mu.Unlock()
t.mu.flush = to
}

// When an event is passed to the MapReduceAggregator via `.Add()`, it checks if we should flush *first*, before
// aggregating the event. So, to trigger a flush, we indicate via the trigger that a flush should occur, call `.Add()`
// with the event, and then reset the trigger.
//
// The events aggregated prior to our call to `.Add()` will be flushed, and the event we pass to `.Add()` will be
// aggregated into the new window.
func triggerTestFlush(
ctx context.Context,
mr *MapReduceAggregator[*testEvent, string, *aggTestEvent],
trigger *testFlushTrigger,
e *testEvent,
) {
trigger.setShouldFlush(true)
mr.Add(ctx, e)
trigger.setShouldFlush(false)
}
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//pkg/sql/sqlstats/insights",
"//pkg/util/metric",
"//pkg/util/stop",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_redact//:redact",
],
Expand Down
Loading

0 comments on commit fdabacc

Please sign in to comment.