This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit b88c3b8

Merge pull request #1572 from grafana/limit_datapoints_with_future_ts
Reject datapoints with timestamp too far in the future
2 parents f099822 + 66dc512

13 files changed: +296 -10 lines
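
The change, in essence: each AggMetric derives a tolerance window from the raw TTL of its first retention and the new future-tolerance-ratio setting, then counts (and, when enforcement is enabled, rejects) incoming points whose timestamp lies beyond now plus that window. A minimal standalone sketch of that logic, using illustrative names rather than metrictank's own types:

package main

import (
	"fmt"
	"time"
)

// toleranceSeconds mirrors how the commit derives the window:
// a percentage of the raw (first) retention's TTL.
func toleranceSeconds(rawTTLSeconds, ratioPercent uint32) uint32 {
	return rawTTLSeconds * ratioPercent / 100
}

// tooFarAhead reports whether ts lies beyond now + tolerance.
// the ts > tolerance guard avoids a uint32 wrap-around when tolerance > ts.
func tooFarAhead(ts, tolerance uint32, now int64) bool {
	return ts > tolerance && int64(ts-tolerance) > now
}

func main() {
	tol := toleranceSeconds(600, 10) // raw TTL of 10 minutes, ratio 10% -> 60s window
	now := time.Now().Unix()
	fmt.Println(tooFarAhead(uint32(now)+30, tol, now))  // false: within the window, accepted
	fmt.Println(tooFarAhead(uint32(now)+120, tol, now)) // true: counted, and dropped if enforcement is on
}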

docker/docker-chaos/metrictank.ini (+4)

@@ -180,6 +180,10 @@ create-cf = true
 schemas-file = /etc/metrictank/storage-schemas.conf
 # path to storage-aggregation.conf file
 aggregations-file = /etc/metrictank/storage-aggregation.conf
+# enables/disables the enforcement of the future tolerance limitation
+enforce-future-tolerance = true
+# defines how far into the future we accept datapoints, as a percentage of the raw TTL of the matching retention storage schema
+future-tolerance-ratio = 10
 
 ## instrumentation stats ##
 [stats]

docker/docker-cluster-query/metrictank.ini (+4)

@@ -180,6 +180,10 @@ create-cf = true
 schemas-file = /etc/metrictank/storage-schemas.conf
 # path to storage-aggregation.conf file
 aggregations-file = /etc/metrictank/storage-aggregation.conf
+# enables/disables the enforcement of the future tolerance limitation
+enforce-future-tolerance = true
+# defines how far into the future we accept datapoints, as a percentage of the raw TTL of the matching retention storage schema
+future-tolerance-ratio = 10
 
 ## instrumentation stats ##
 [stats]

docker/docker-cluster/metrictank.ini (+4)

@@ -180,6 +180,10 @@ create-cf = true
 schemas-file = /etc/metrictank/storage-schemas.conf
 # path to storage-aggregation.conf file
 aggregations-file = /etc/metrictank/storage-aggregation.conf
+# enables/disables the enforcement of the future tolerance limitation
+enforce-future-tolerance = true
+# defines how far into the future we accept datapoints, as a percentage of the raw TTL of the matching retention storage schema
+future-tolerance-ratio = 10
 
 ## instrumentation stats ##
 [stats]

docker/docker-dev-custom-cfg-kafka/metrictank.ini (+4)

@@ -180,6 +180,10 @@ create-cf = true
 schemas-file = /etc/metrictank/storage-schemas.conf
 # path to storage-aggregation.conf file
 aggregations-file = /etc/metrictank/storage-aggregation.conf
+# enables/disables the enforcement of the future tolerance limitation
+enforce-future-tolerance = true
+# defines how far into the future we accept datapoints, as a percentage of the raw TTL of the matching retention storage schema
+future-tolerance-ratio = 10
 
 ## instrumentation stats ##
 [stats]

docs/config.md (+4)

@@ -220,6 +220,10 @@ create-cf = true
 schemas-file = /etc/metrictank/storage-schemas.conf
 # path to storage-aggregation.conf file
 aggregations-file = /etc/metrictank/storage-aggregation.conf
+# enables/disables the enforcement of the future tolerance limitation
+enforce-future-tolerance = true
+# defines how far into the future we accept datapoints, as a percentage of the raw TTL of the matching retention storage schema
+future-tolerance-ratio = 10
 ```
 
 ## instrumentation stats ##
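
For a concrete sense of scale: the window is raw TTL * future-tolerance-ratio / 100, evaluated per metric against the first (raw) retention of its matching storage schema. With the retention string used in the new unit test, 1s:10m:6h:5:true, the raw TTL is 600s, so the default ratio of 10 yields a 60-second window; the new aggmetrics test instead uses a one-year raw TTL with a ratio of 50, which yields roughly half a year.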

docs/metrics.md (+9)

@@ -370,6 +370,10 @@ your (infrequent) updates. Any points revcieved for a chunk that has already be
 * `tank.discarded.sample-out-of-order`:
 points that go back in time beyond the scope of the optional reorder window.
 these points will end up being dropped and lost.
+* `tank.discarded.sample-too-far-ahead`:
+count of points which got discarded because their timestamp
+is too far in the future, beyond the limitation of the future tolerance window defined via the
+retention.future-tolerance-ratio parameter.
 * `tank.discarded.unknown`:
 points that have been discarded for unknown reasons.
 * `tank.gc_metric`:
@@ -384,6 +388,11 @@ ts is not older than the 60th datapoint counting from the newest.
 * `tank.persist`:
 how long it takes to persist a chunk (and chunks preceding it)
 this is subject to backpressure from the store when the store's queue runs full
+* `tank.sample-too-far-ahead`:
+count of points with a timestamp which is too far in the future,
+beyond the limitation of the future tolerance window defined via the retention.future-tolerance-ratio
+parameter. it also gets increased if the enforcement of the future tolerance is disabled; this is
+useful for predicting whether data points would get rejected once enforcement gets turned on.
 * `tank.total_points`:
 the number of points currently held in the in-memory ringbuffer
 * `version.%s`:
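
The relationship between these two counters, made explicit: tank.sample-too-far-ahead increments for every offending point regardless of configuration, while tank.discarded.sample-too-far-ahead increments only when enforcement is enabled and the point is actually dropped. A rough sketch of that bookkeeping, with plain integers standing in for the stats counters (names here are hypothetical, not metrictank's API):

package main

import "fmt"

// sketch only: plain ints standing in for metrictank's stats counters
var sampleTooFarAheadCount, discardedTooFarAheadCount int

// recordTooFarAhead does the bookkeeping for one offending point and
// returns true when the point should actually be dropped.
func recordTooFarAhead(enforce bool) bool {
	sampleTooFarAheadCount++ // always counted, so the impact can be predicted before enabling enforcement
	if enforce {
		discardedTooFarAheadCount++
		return true
	}
	return false
}

func main() {
	recordTooFarAhead(false) // enforcement off: counted, not dropped
	recordTooFarAhead(true)  // enforcement on: counted and dropped
	fmt.Println(sampleTooFarAheadCount, discardedTooFarAheadCount) // 2 1
}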

mdata/aggmetric.go (+26 -8)

@@ -43,6 +43,7 @@ type AggMetric struct {
 	aggregators []*Aggregator
 	dropFirstChunk bool
 	ingestFromT0 uint32
+	futureTolerance uint32
 	ttl uint32
 	lastSaveStart uint32 // last chunk T0 that was added to the write Queue.
 	lastWrite uint32 // wall clock time of when last point was successfully added (possibly to the ROB)
@@ -61,14 +62,15 @@ func NewAggMetric(store Store, cachePusher cache.CachePusher, key schema.AMKey,
 	ret := retentions.Rets[0]
 
 	m := AggMetric{
-		cachePusher:    cachePusher,
-		store:          store,
-		key:            key,
-		chunkSpan:      ret.ChunkSpan,
-		numChunks:      ret.NumChunks,
-		chunks:         make([]*chunk.Chunk, 0, ret.NumChunks),
-		dropFirstChunk: dropFirstChunk,
-		ttl:            uint32(ret.MaxRetention()),
+		cachePusher:     cachePusher,
+		store:           store,
+		key:             key,
+		chunkSpan:       ret.ChunkSpan,
+		numChunks:       ret.NumChunks,
+		chunks:          make([]*chunk.Chunk, 0, ret.NumChunks),
+		dropFirstChunk:  dropFirstChunk,
+		futureTolerance: uint32(ret.MaxRetention()) * uint32(futureToleranceRatio) / 100,
+		ttl:             uint32(ret.MaxRetention()),
 		// we set LastWrite here to make sure a new Chunk doesn't get immediately
 		// garbage collected right after creating it, before we can push to it.
 		lastWrite: uint32(time.Now().Unix()),
@@ -442,6 +444,22 @@ func (a *AggMetric) Add(ts uint32, val float64) {
 		return
 	}
 
+	// check ts > futureTolerance first so we don't reject a datapoint just because
+	// ts - futureTolerance has wrapped around the uint32 boundary
+	if ts > a.futureTolerance && int64(ts-a.futureTolerance) > time.Now().Unix() {
+		sampleTooFarAhead.Inc()
+
+		if enforceFutureTolerance {
+			if log.IsLevelEnabled(log.DebugLevel) {
+				log.Debugf("AM: discarding metric <%d,%f>: timestamp is too far in the future, accepting timestamps up to %d seconds into the future", ts, val, a.futureTolerance)
+			}
+
+			discardedSampleTooFarAhead.Inc()
+			PromDiscardedSamples.WithLabelValues(tooFarAhead, strconv.Itoa(int(a.key.MKey.Org))).Inc()
+			return
+		}
+	}
+
 	a.Lock()
 	defer a.Unlock()
 
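One detail in that condition worth spelling out: ts and futureTolerance are both uint32, so if futureTolerance exceeded ts, the subtraction would wrap around to a huge value and a point from the distant past could be mistaken for one far in the future. A small illustration of the wrap-around that the ts > a.futureTolerance guard prevents (numbers chosen to match the new aggmetrics test, where the tolerance is half a year):

package main

import "fmt"

func main() {
	var ts uint32 = 1000            // a bogus, very old timestamp
	var tolerance uint32 = 15768000 // 50% of a one-year raw TTL

	// without the guard, ts - tolerance wraps around the uint32 boundary:
	fmt.Println(ts - tolerance)        // 4279200296
	fmt.Println(int64(ts - tolerance)) // still 4279200296, far larger than any current unix timestamp
}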
mdata/aggmetric_test.go

+103
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,109 @@ func TestAggMetricIngestFrom(t *testing.T) {
334334
}
335335
}
336336

337+
// TestAggMetricFutureTolerance tests whether the future tolerance limit works correctly
338+
// there is a race condition because it depends on the return value of time.Now().Unix(),
339+
// realistically it should never fail due to that race condition unless it executes
340+
// unreasonably slow.
341+
func TestAggMetricFutureTolerance(t *testing.T) {
342+
cluster.Init("default", "test", time.Now(), "http", 6060)
343+
cluster.Manager.SetPrimary(true)
344+
mockstore.Reset()
345+
ret := conf.MustParseRetentions("1s:10m:6h:5:true")
346+
347+
_futureToleranceRatio := futureToleranceRatio
348+
_enforceFutureTolerance := enforceFutureTolerance
349+
discardedSampleTooFarAhead.SetUint32(0)
350+
sampleTooFarAhead.SetUint32(0)
351+
defer func() {
352+
futureToleranceRatio = _futureToleranceRatio
353+
enforceFutureTolerance = _enforceFutureTolerance
354+
discardedSampleTooFarAhead.SetUint32(0)
355+
sampleTooFarAhead.SetUint32(0)
356+
}()
357+
358+
// with a raw retention of 600s, this will result in a future tolerance of 60s
359+
futureToleranceRatio = 10
360+
aggMetricTolerate60 := NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 0, 1, nil, false, false, 0)
361+
362+
// will not tolerate future datapoints at all
363+
futureToleranceRatio = 0
364+
aggMetricTolerate0 := NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 0, 1, nil, false, false, 0)
365+
366+
// add datapoint which is 30 seconds in the future to both aggmetrics, they should both accept it
367+
// because enforcement of future tolerance is disabled, but the one with tolerance 0 should increase
368+
// the counter of data points that would have been rejected
369+
enforceFutureTolerance = false
370+
aggMetricTolerate60.Add(uint32(time.Now().Unix()+30), 10)
371+
if len(aggMetricTolerate60.chunks) != 1 {
372+
t.Fatalf("expected to have 1 chunk in aggmetric, but there were %d", len(aggMetricTolerate60.chunks))
373+
}
374+
if sampleTooFarAhead.Peek() != 0 {
375+
t.Fatalf("expected the sampleTooFarAhead count to be 0, but it was %d", sampleTooFarAhead.Peek())
376+
}
377+
if discardedSampleTooFarAhead.Peek() != 0 {
378+
t.Fatalf("expected the discardedSampleTooFarAhead count to be 0, but it was %d", discardedSampleTooFarAhead.Peek())
379+
}
380+
381+
aggMetricTolerate0.Add(uint32(time.Now().Unix()+30), 10)
382+
if len(aggMetricTolerate0.chunks) != 1 {
383+
t.Fatalf("expected to have 1 chunk in aggmetric, but there were %d", len(aggMetricTolerate0.chunks))
384+
}
385+
if sampleTooFarAhead.Peek() != 1 {
386+
t.Fatalf("expected the sampleTooFarAhead count to be 1, but it was %d", sampleTooFarAhead.Peek())
387+
}
388+
if discardedSampleTooFarAhead.Peek() != 0 {
389+
t.Fatalf("expected the discardedSampleTooFarAhead count to be 0, but it was %d", discardedSampleTooFarAhead.Peek())
390+
}
391+
392+
// enable the enforcement of the future tolerance limit and re-initialize the two agg metrics
393+
// then add a data point with time stamp 30 sec in the future to both aggmetrics again.
394+
// this time only the one that tolerates up to 60 secs should accept the datapoint.
395+
discardedSampleTooFarAhead.SetUint32(0)
396+
sampleTooFarAhead.SetUint32(0)
397+
enforceFutureTolerance = true
398+
futureToleranceRatio = 10
399+
aggMetricTolerate60 = NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 0, 1, nil, false, false, 0)
400+
futureToleranceRatio = 0
401+
aggMetricTolerate0 = NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 0, 1, nil, false, false, 0)
402+
403+
aggMetricTolerate60.Add(uint32(time.Now().Unix()+30), 10)
404+
if len(aggMetricTolerate60.chunks) != 1 {
405+
t.Fatalf("expected to have 1 chunk in aggmetric, but there were %d", len(aggMetricTolerate60.chunks))
406+
}
407+
if sampleTooFarAhead.Peek() != 0 {
408+
t.Fatalf("expected the sampleTooFarAhead count to be 0, but it was %d", sampleTooFarAhead.Peek())
409+
}
410+
if discardedSampleTooFarAhead.Peek() != 0 {
411+
t.Fatalf("expected the discardedSampleTooFarAhead count to be 0, but it was %d", discardedSampleTooFarAhead.Peek())
412+
}
413+
414+
aggMetricTolerate0.Add(uint32(time.Now().Unix()+30), 10)
415+
if len(aggMetricTolerate0.chunks) != 0 {
416+
t.Fatalf("expected to have 0 chunks in aggmetric, but there were %d", len(aggMetricTolerate0.chunks))
417+
}
418+
if sampleTooFarAhead.Peek() != 1 {
419+
t.Fatalf("expected the sampleTooFarAhead count to be 1, but it was %d", sampleTooFarAhead.Peek())
420+
}
421+
if discardedSampleTooFarAhead.Peek() != 1 {
422+
t.Fatalf("expected the discardedSampleTooFarAhead count to be 1, but it was %d", discardedSampleTooFarAhead.Peek())
423+
}
424+
425+
// add another datapoint with timestamp of now() to the aggmetric tolerating 0, should be accepted
426+
discardedSampleTooFarAhead.SetUint32(0)
427+
sampleTooFarAhead.SetUint32(0)
428+
aggMetricTolerate0.Add(uint32(time.Now().Unix()), 10)
429+
if len(aggMetricTolerate0.chunks) != 1 {
430+
t.Fatalf("expected to have 1 chunk in aggmetric, but there were %d", len(aggMetricTolerate0.chunks))
431+
}
432+
if sampleTooFarAhead.Peek() != 0 {
433+
t.Fatalf("expected the sampleTooFarAhead count to be 0, but it was %d", sampleTooFarAhead.Peek())
434+
}
435+
if discardedSampleTooFarAhead.Peek() != 0 {
436+
t.Fatalf("expected the discardedSampleTooFarAhead count to be 0, but it was %d", discardedSampleTooFarAhead.Peek())
437+
}
438+
}
439+
337440
func itersToPoints(iters []tsz.Iter) []schema.Point {
338441
var points []schema.Point
339442
for _, it := range iters {
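
To exercise just this test locally, something along the lines of the following should work from the repository root:

go test ./mdata -run TestAggMetricFutureTolerance -v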

mdata/aggmetrics_test.go (new file, +107)

@@ -0,0 +1,107 @@
+package mdata
+
+import (
+	"testing"
+	"time"
+
+	"github.com/grafana/metrictank/conf"
+	"github.com/grafana/metrictank/mdata/cache"
+	"github.com/grafana/metrictank/mdata/chunk"
+	"github.com/grafana/metrictank/schema"
+)
+
+type mockCachePusher struct{}
+
+func (m *mockCachePusher) AddIfHot(_ schema.AMKey, _ uint32, _ chunk.IterGen) {}
+
+func NewMockCachePusher() cache.CachePusher {
+	return &mockCachePusher{}
+}
+
+func TestAggMetricsGetOrCreate(t *testing.T) {
+	mockStore := NewMockStore()
+	mockCachePusher := NewMockCachePusher()
+	ingestFrom := make(map[uint32]int64)
+	chunkMaxStale := uint32(60)
+	metricMaxStale := uint32(120)
+	gcInterval := time.Hour
+
+	_futureToleranceRatio := futureToleranceRatio
+	_aggregations := Aggregations
+	_schemas := Schemas
+	defer func() {
+		futureToleranceRatio = _futureToleranceRatio
+		Aggregations = _aggregations
+		Schemas = _schemas
+	}()
+
+	futureToleranceRatio = 50
+	Aggregations = conf.NewAggregations()
+	Schemas = conf.NewSchemas([]conf.Schema{{
+		Name: "schema1",
+		Retentions: conf.Retentions{
+			Rets: []conf.Retention{
+				{
+					SecondsPerPoint: 10,
+					NumberOfPoints:  360 * 24,
+					ChunkSpan:       600,
+					NumChunks:       2,
+					Ready:           0,
+				}, {
+					SecondsPerPoint: 3600,
+					NumberOfPoints:  24 * 365,
+					ChunkSpan:       24 * 3600,
+					NumChunks:       2,
+					Ready:           0,
+				}},
+		},
+	}})
+
+	aggMetrics := NewAggMetrics(mockStore, mockCachePusher, false, ingestFrom, chunkMaxStale, metricMaxStale, gcInterval)
+
+	testKey1, _ := schema.AMKeyFromString("1.12345678901234567890123456789012")
+	metric := aggMetrics.GetOrCreate(testKey1.MKey, 1, 0, 10).(*AggMetric)
+
+	if metric.store != mockStore {
+		t.Fatalf("Expected metric to have mock store, but it did not")
+	}
+
+	if metric.cachePusher != mockCachePusher {
+		t.Fatalf("Expected metric to have mock cache pusher, but it did not")
+	}
+
+	if metric.key.MKey != testKey1.MKey {
+		t.Fatalf("Expected metric to have test metric key, but it did not")
+	}
+
+	if metric.chunkSpan != 24*3600 {
+		t.Fatalf("Expected metric chunk span to be %d, but it was %d", 24*3600, metric.chunkSpan)
+	}
+
+	if metric.numChunks != 2 {
+		t.Fatalf("Expected metric num chunks to be 2, but it was %d", metric.numChunks)
+	}
+
+	if metric.ttl != 3600*24*365 {
+		t.Fatalf("Expected metric ttl to be %d, but it was %d", 3600*24*365, metric.ttl)
+	}
+
+	// storage schema's maxTTL is 1 year, future tolerance ratio is 50, so our future tolerance should be 1/2 year
+	expectedFutureTolerance := uint32(3600 * 24 * 365 * futureToleranceRatio / 100)
+	if metric.futureTolerance != expectedFutureTolerance {
+		t.Fatalf("Expected future tolerance to be %d, was %d", expectedFutureTolerance, metric.futureTolerance)
+	}
+
+	// verify that two calls to GetOrCreate with the same parameters return the same struct
+	metric2 := aggMetrics.GetOrCreate(testKey1.MKey, 1, 0, 10).(*AggMetric)
+	if metric != metric2 {
+		t.Fatalf("Expected GetOrCreate to return the same metric twice for the same key")
+	}
+
+	futureToleranceRatio = 0
+	testKey2, _ := schema.AMKeyFromString("1.12345678901234567890123456789013")
+	metric3 := aggMetrics.GetOrCreate(testKey2.MKey, 1, 0, 10).(*AggMetric)
+	if metric3.futureTolerance != 0 {
+		t.Fatalf("Future tolerance was expected to be 0, but it was %d", metric3.futureTolerance)
+	}
+}

mdata/init.go (+18 -2)

@@ -20,6 +20,7 @@ const (
 	sampleOutOfOrder     = "sample-out-of-order"
 	receivedTooLate      = "received-too-late"
 	newValueForTimestamp = "new-value-for-timestamp"
+	tooFarAhead          = "too-far-in-future"
 )
 
 var (
@@ -39,6 +40,17 @@ var (
 	// these points will end up being dropped and lost.
 	discardedSampleOutOfOrder = stats.NewCounterRate32("tank.discarded.sample-out-of-order")
 
+	// metric tank.discarded.sample-too-far-ahead is count of points which got discarded because their timestamp
+	// is too far in the future, beyond the limitation of the future tolerance window defined via the
+	// retention.future-tolerance-ratio parameter.
+	discardedSampleTooFarAhead = stats.NewCounterRate32("tank.discarded.sample-too-far-ahead")
+
+	// metric tank.sample-too-far-ahead is count of points with a timestamp which is too far in the future,
+	// beyond the limitation of the future tolerance window defined via the retention.future-tolerance-ratio
+	// parameter. it also gets increased if the enforcement of the future tolerance is disabled; this is
+	// useful for predicting whether data points would get rejected once enforcement gets turned on.
+	sampleTooFarAhead = stats.NewCounterRate32("tank.sample-too-far-ahead")
+
 	// metric tank.discarded.received-too-late is points received for the most recent chunk
 	// when that chunk is already being "closed", ie the end-of-stream marker has been written to the chunk.
 	// this indicates that your GC is actively sealing chunks and saving them before you have the chance to send
@@ -84,8 +96,10 @@ var (
 	Aggregations conf.Aggregations
 	Schemas      conf.Schemas
 
-	schemasFile = "/etc/metrictank/storage-schemas.conf"
-	aggFile     = "/etc/metrictank/storage-aggregation.conf"
+	schemasFile            = "/etc/metrictank/storage-schemas.conf"
+	aggFile                = "/etc/metrictank/storage-aggregation.conf"
+	futureToleranceRatio   = uint(10)
+	enforceFutureTolerance = true
 
 	promActiveMetrics = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: "metrictank",
@@ -104,6 +118,8 @@ func ConfigSetup() {
 	retentionConf := flag.NewFlagSet("retention", flag.ExitOnError)
 	retentionConf.StringVar(&schemasFile, "schemas-file", "/etc/metrictank/storage-schemas.conf", "path to storage-schemas.conf file")
 	retentionConf.StringVar(&aggFile, "aggregations-file", "/etc/metrictank/storage-aggregation.conf", "path to storage-aggregation.conf file")
+	retentionConf.UintVar(&futureToleranceRatio, "future-tolerance-ratio", 10, "defines how far into the future we accept datapoints, as a percentage of the raw TTL of the matching retention storage schema")
+	retentionConf.BoolVar(&enforceFutureTolerance, "enforce-future-tolerance", true, "enables/disables the enforcement of the future tolerance limitation")
 	globalconf.Register("retention", retentionConf, flag.ExitOnError)
 }