Skip to content

Commit

Permalink
Revert "feat: downsample aggregated metrics (#13449)"
Browse files Browse the repository at this point in the history
This reverts commit 2c053ee.
  • Loading branch information
trevorwhitney committed Jul 31, 2024
1 parent 1584fee commit a6835e1
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 314 deletions.
4 changes: 0 additions & 4 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -623,10 +623,6 @@ pattern_ingester:
# CLI flag: -pattern-ingester.metric-aggregation.log-push-observations
[log_push_observations: <boolean> | default = false]

# How often to downsample metrics from raw push observations.
# CLI flag: -pattern-ingester.downsample-period
[downsample_period: <duration> | default = 10s]

# The index_gateway block configures the Loki index gateway server, responsible
# for serving index queries without the need to constantly interact with the
# object store.
Expand Down
18 changes: 0 additions & 18 deletions pkg/pattern/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func (i *Ingester) Flush() {

func (i *Ingester) flush(mayRemoveStreams bool) {
i.sweepUsers(true, mayRemoveStreams)
i.downsampleMetrics(model.Now())

// Close the flush queues, to unblock waiting workers.
for _, flushQueue := range i.flushQueues {
Expand Down Expand Up @@ -74,20 +73,3 @@ func (i *Ingester) sweepInstance(instance *instance, _, mayRemoveStreams bool) {
return true, nil
})
}

func (i *Ingester) downsampleMetrics(ts model.Time) {
instances := i.getInstances()

for _, instance := range instances {
i.downsampleInstance(instance, ts)
}
}

func (i *Ingester) downsampleInstance(instance *instance, ts model.Time) {
_ = instance.streams.ForEach(func(s *stream) (bool, error) {
instance.streams.WithLock(func() {
s.Downsample(ts)
})
return true, nil
})
}
33 changes: 6 additions & 27 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"google.golang.org/grpc/health/grpc_health_v1"

ring_client "github.com/grafana/dskit/ring/client"
Expand Down Expand Up @@ -207,33 +206,13 @@ func (i *Ingester) loop() {
flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
defer flushTicker.Stop()

if i.cfg.MetricAggregation.Enabled {
downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod)
defer downsampleTicker.Stop()
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)

for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)

case t := <-downsampleTicker.C:
downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod)
now := model.TimeFromUnixNano(t.UnixNano())
i.downsampleMetrics(now)

case <-i.loopQuit:
return
}
}
} else {
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)

case <-i.loopQuit:
return
}
case <-i.loopQuit:
return
}
}
}
Expand Down
129 changes: 5 additions & 124 deletions pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -40,16 +39,6 @@ func setup(t *testing.T) *instance {
return inst
}

func downsampleInstance(inst *instance, tts int64) {
ts := model.TimeFromUnixNano(time.Unix(tts, 0).UnixNano())
_ = inst.streams.ForEach(func(s *stream) (bool, error) {
inst.streams.WithLock(func() {
s.Downsample(ts)
})
return true, nil
})
}

func TestInstancePushQuery(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
Expand All @@ -66,7 +55,6 @@ func TestInstancePushQuery(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, 20)

err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
Expand All @@ -82,7 +70,6 @@ func TestInstancePushQuery(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, 30)

for i := 0; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Expand All @@ -100,7 +87,6 @@ func TestInstancePushQuery(t *testing.T) {
})
require.NoError(t, err)
}
downsampleInstance(inst, 30)

it, err := inst.Iterator(context.Background(), &logproto.QueryPatternsRequest{
Query: "{test=\"test\"}",
Expand Down Expand Up @@ -129,9 +115,6 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)

for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
Expand All @@ -147,8 +130,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)

expr, err := syntax.ParseSampleExpr(`count_over_time({test="test"}[20s])`)
require.NoError(t, err)
Expand All @@ -166,11 +149,10 @@ func TestInstancePushQuerySamples(t *testing.T) {

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
// end - start / step -- (start is 0, step is 10s)
expectedDataPoints := ((20 * 30) / 10)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
require.Equal(t, float64(1), res.Series[0].Samples[0].Value)
require.Equal(t, float64(1), res.Series[0].Samples[expectedDataPoints-1].Value)

expr, err = syntax.ParseSampleExpr(`count_over_time({test="test"}[80s])`)
require.NoError(t, err)
Expand All @@ -188,7 +170,7 @@ func TestInstancePushQuerySamples(t *testing.T) {

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
// end - start / step -- (start is 0, step is 10s)
expectedDataPoints = ((20 * 30) / 10)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))

Expand All @@ -205,101 +187,6 @@ func TestInstancePushQuerySamples(t *testing.T) {
require.Equal(t, float64(4), res.Series[0].Samples[expectedDataPoints-1].Value)
})

t.Run("test count_over_time samples with downsampling", func(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
{
Labels: lbls.String(),
Entries: []push.Entry{
{
Timestamp: time.Unix(0, 0),
Line: "ts=1 msg=hello",
},
},
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)

for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
{
Labels: lbls.String(),
Entries: []push.Entry{
{
Timestamp: time.Unix(int64(10*i), 0),
Line: "foo bar foo bar",
},
},
},
},
})
require.NoError(t, err)

// downsample every 20s
if i%2 == 0 {
downsampleInstance(inst, int64(10*i))
}
}

expr, err := syntax.ParseSampleExpr(`count_over_time({test="test"}[20s])`)
require.NoError(t, err)

it, err := inst.QuerySample(context.Background(), expr, &logproto.QuerySamplesRequest{
Query: expr.String(),
Start: time.Unix(0, 0),
End: time.Unix(int64(10*30), 0),
Step: 20000,
})
require.NoError(t, err)
res, err := iter.ReadAllSamples(it)
require.NoError(t, err)
require.Equal(t, 1, len(res.Series))

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
expectedDataPoints := ((10 * 30) / 20)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
require.Equal(t, float64(1), res.Series[0].Samples[0].Value)

// after the first push there's 2 pushes per sample due to downsampling
require.Equal(t, float64(2), res.Series[0].Samples[expectedDataPoints-1].Value)

expr, err = syntax.ParseSampleExpr(`count_over_time({test="test"}[80s])`)
require.NoError(t, err)

it, err = inst.QuerySample(context.Background(), expr, &logproto.QuerySamplesRequest{
Query: expr.String(),
Start: time.Unix(0, 0),
End: time.Unix(int64(10*30), 0),
Step: 20000,
})
require.NoError(t, err)
res, err = iter.ReadAllSamples(it)
require.NoError(t, err)
require.Equal(t, 1, len(res.Series))

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
expectedDataPoints = ((10 * 30) / 20)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))

// with a larger selection range of 80s, we expect to eventually get up to 8 per datapoint
// our pushes are spaced 10s apart, downsampled every 20s, and there's 10s step,
// so we expect to see the value increase by 2 every samples, maxing out and staying at 8 after 5 samples
require.Equal(t, float64(1), res.Series[0].Samples[0].Value)
require.Equal(t, float64(3), res.Series[0].Samples[1].Value)
require.Equal(t, float64(5), res.Series[0].Samples[2].Value)
require.Equal(t, float64(7), res.Series[0].Samples[3].Value)
require.Equal(t, float64(8), res.Series[0].Samples[4].Value)
require.Equal(t, float64(8), res.Series[0].Samples[expectedDataPoints-1].Value)
})

t.Run("test bytes_over_time samples", func(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
Expand All @@ -315,9 +202,6 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)

downsampleInstance(inst, 0)
for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
Expand All @@ -333,8 +217,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)

expr, err := syntax.ParseSampleExpr(`bytes_over_time({test="test"}[20s])`)
require.NoError(t, err)
Expand Down Expand Up @@ -459,9 +343,6 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)

for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
Expand Down Expand Up @@ -516,8 +397,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)

for _, tt := range []struct {
name string
Expand Down
12 changes: 0 additions & 12 deletions pkg/pattern/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,6 @@ func TestInstance_QuerySample(t *testing.T) {
return instance
}

downsampleInstance := func(inst *instance, ts model.Time) {
_ = inst.streams.ForEach(func(s *stream) (bool, error) {
inst.streams.WithLock(func() {
s.Downsample(ts)
})
return true, nil
})
}

ctx := context.Background()

thirtySeconds := int64(30000)
Expand Down Expand Up @@ -94,7 +85,6 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(lastTsMilli))

// 5 min query range
// 1 min step
Expand Down Expand Up @@ -213,7 +203,6 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(then+oneMin+thirtySeconds))

err = instance.Push(ctx, &logproto.PushRequest{
Streams: []push.Stream{
Expand Down Expand Up @@ -256,7 +245,6 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(then+oneMin+oneMin+oneMin+thirtySeconds))

// steps
start := then
Expand Down
Loading

0 comments on commit a6835e1

Please sign in to comment.