Skip to content

Commit

Permalink
Chunk the flush by time series
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed Jun 9, 2023
1 parent 0a31512 commit 9254ce7
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 4 deletions.
2 changes: 2 additions & 0 deletions cloudapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Config struct {
// APIVersion null.Int `json:"apiVersion" envconfig:"K6_CLOUD_API_VERSION"`
APIVersion null.Int `json:"-"`

// TODO: rename the config field to align to the new logic by time series
// when the migration from the version 1 is completed.
MaxMetricSamplesPerPackage null.Int `json:"maxMetricSamplesPerPackage" envconfig:"K6_CLOUD_MAX_METRIC_SAMPLES_PER_PACKAGE"`

// The time interval between periodic API calls for sending samples to the cloud ingest service.
Expand Down
24 changes: 21 additions & 3 deletions output/cloud/expv2/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ type metricsFlusher struct {
bq *bucketQ
client pusher
aggregationPeriodInSeconds uint32
maxSeriesInSingleBatch int
}

// Flush flushes the queued buckets sending them to the remote Cloud service.
// If the number of time series collected is bigger than maximum batch size than
// it splits in chunks.
func (f *metricsFlusher) Flush(ctx context.Context) error {
// drain the buffer
buckets := f.bq.PopAll()
Expand All @@ -36,10 +40,24 @@ func (f *metricsFlusher) Flush(ctx context.Context) error {

msb := newMetricSetBuilder(f.referenceID, f.aggregationPeriodInSeconds)
for i := 0; i < len(buckets); i++ {
msb.addTimeBucket(&buckets[i])
msb.addTimeBucket(buckets[i])
if len(msb.seriesIndex) < f.maxSeriesInSingleBatch {
continue
}

// we hit the chunk size, let's flush
err := f.client.push(ctx, f.referenceID, msb.MetricSet)
if err != nil {
return err
}
msb = newMetricSetBuilder(f.referenceID, f.aggregationPeriodInSeconds)
}

if len(msb.seriesIndex) < 1 {
return nil
}

// send the MetricSet to the remote service
// send the last (or the unique) MetricSet chunk to the remote service
return f.client.push(ctx, f.referenceID, msb.MetricSet)
}

Expand Down Expand Up @@ -84,7 +102,7 @@ func newMetricSetBuilder(testRunID string, aggrPeriodSec uint32) metricSetBuilde
}
}

func (msb *metricSetBuilder) addTimeBucket(bucket *timeBucket) {
func (msb *metricSetBuilder) addTimeBucket(bucket timeBucket) {
for timeSeries, sink := range bucket.Sinks {
pbmetric, ok := msb.metrics[timeSeries.Metric]
if !ok {
Expand Down
60 changes: 59 additions & 1 deletion output/cloud/expv2/flush_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package expv2

import (
"context"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
)

// TODO: additional case
Expand All @@ -29,7 +32,7 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) {
},
}
msb := newMetricSetBuilder("testrunid-123", 1)
msb.addTimeBucket(&tb)
msb.addTimeBucket(tb)

assert.Contains(t, msb.metrics, m1)
require.Contains(t, msb.seriesIndex, timeSeries)
Expand All @@ -38,3 +41,58 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) {
require.Len(t, msb.MetricSet.Metrics, 1)
assert.Len(t, msb.MetricSet.Metrics[0].TimeSeries, 1)
}

func TestMetricsFlusherFlushChunk(t *testing.T) {
t.Parallel()

testCases := []struct {
series int
expFlushCalls int
}{
{series: 5, expFlushCalls: 2},
{series: 2, expFlushCalls: 1},
}

r := metrics.NewRegistry()
m1 := r.MustNewMetric("metric1", metrics.Counter)

for _, tc := range testCases {
bq := &bucketQ{}
pm := &pusherMock{}
mf := metricsFlusher{
bq: bq,
client: pm,
maxSeriesInSingleBatch: 3,
}

bq.buckets = make([]timeBucket, 0, tc.series)
for i := 0; i < tc.series; i++ {
ts := metrics.TimeSeries{
Metric: m1,
Tags: r.RootTagSet().With("key1", "val"+strconv.Itoa(i)),
}
bq.Push([]timeBucket{
{
Time: int64(i) + 1,
Sinks: map[metrics.TimeSeries]metricValue{
ts: &counter{Sum: float64(1)},
},
},
})
}
require.Len(t, bq.buckets, tc.series)

err := mf.Flush(context.TODO())
require.NoError(t, err)
assert.Equal(t, tc.expFlushCalls, pm.pushCalled)
}
}

type pusherMock struct {
pushCalled int
}

func (pm *pusherMock) push(_ context.Context, _ string, _ *pbcloud.MetricSet) error {
pm.pushCalled++
return nil
}
3 changes: 3 additions & 0 deletions output/cloud/expv2/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ func (o *Output) Start() error {
bq: &o.collector.bq,
client: mc,
aggregationPeriodInSeconds: uint32(o.config.AggregationPeriod.TimeDuration().Seconds()),
// TODO: rename the config field to align to the new logic by time series
// when the migration from the version 1 is completed.
maxSeriesInSingleBatch: int(o.config.MaxMetricSamplesPerPackage.Int64),
}

o.periodicInvoke(o.config.MetricPushInterval.TimeDuration(), o.flushMetrics)
Expand Down

0 comments on commit 9254ce7

Please sign in to comment.