@@ -132,7 +132,7 @@ func testMetricPersistOptionalPrimary(t *testing.T, primary bool) {
132
132
133
133
chunkAddCount , chunkSpan := uint32 (10 ), uint32 (300 )
134
134
rets := conf .MustParseRetentions ("1s:1s:5min:5:true" )
135
- agg := NewAggMetric (mockstore , & mockCache , test .GetAMKey (42 ), rets , 0 , chunkSpan , nil , false , false , 0 , 0 )
135
+ agg := NewAggMetric (mockstore , & mockCache , test .GetAMKey (42 ), rets , 0 , chunkSpan , nil , false , false , 0 )
136
136
137
137
for ts := chunkSpan ; ts <= chunkSpan * chunkAddCount ; ts += chunkSpan {
138
138
agg .Add (ts , 1 )
@@ -168,7 +168,7 @@ func TestAggMetric(t *testing.T) {
168
168
cluster .Init ("default" , "test" , time .Now (), "http" , 6060 )
169
169
170
170
ret := conf .MustParseRetentions ("1s:1s:2min:5:true" )
171
- c := NewChecker (t , NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , nil , false , false , 0 , 0 ))
171
+ c := NewChecker (t , NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , nil , false , false , 0 ))
172
172
173
173
// chunk t0's: 120, 240, 360, 480, 600, 720, 840, 960
174
174
@@ -246,7 +246,7 @@ func TestAggMetricWithReorderBuffer(t *testing.T) {
246
246
AggregationMethod : []conf.Method {conf .Avg },
247
247
}
248
248
ret := conf .MustParseRetentions ("1s:1s:2min:5:true" )
249
- c := NewChecker (t , NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 10 , 1 , & agg , false , false , 0 , 0 ))
249
+ c := NewChecker (t , NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 10 , 1 , & agg , false , false , 0 ))
250
250
251
251
// basic adds and verifies with test data
252
252
c .Add (121 , 121 )
@@ -284,7 +284,7 @@ func TestAggMetricDropFirstChunk(t *testing.T) {
284
284
cluster .Manager .SetPrimary (true )
285
285
mockstore .Reset ()
286
286
rets := conf .MustParseRetentions ("1s:1s:10s:5:true" )
287
- m := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), rets , 0 , 1 , nil , false , true , 0 , 0 )
287
+ m := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), rets , 0 , 1 , nil , false , true , 0 )
288
288
m .Add (10 , 10 )
289
289
m .Add (11 , 11 )
290
290
m .Add (12 , 12 )
@@ -312,7 +312,7 @@ func TestAggMetricIngestFrom(t *testing.T) {
312
312
mockstore .Reset ()
313
313
ingestFrom := int64 (25 )
314
314
ret := conf .MustParseRetentions ("1s:1s:10s:5:true" )
315
- m := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , nil , false , false , ingestFrom , 0 )
315
+ m := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , nil , false , false , ingestFrom )
316
316
m .Add (10 , 10 )
317
317
m .Add (11 , 11 )
318
318
m .Add (12 , 12 )
@@ -342,27 +342,78 @@ func TestAggMetricFutureTolerance(t *testing.T) {
342
342
cluster .Init ("default" , "test" , time .Now (), "http" , 6060 )
343
343
cluster .Manager .SetPrimary (true )
344
344
mockstore .Reset ()
345
- ret := conf .MustParseRetentions ("1s:1s:10s:5:true" )
346
- aggMetricLimited := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , nil , false , false , 0 , 60 )
347
- aggMetricUnlimited := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , nil , false , false , 0 , 0 )
345
+ ret := conf .MustParseRetentions ("1s:10m:6h:5:true" )
346
+
347
+ _futureToleranceRatio := futureToleranceRatio
348
+ _enforceFutureTolerance := enforceFutureTolerance
349
+ defer func () {
350
+ futureToleranceRatio = _futureToleranceRatio
351
+ enforceFutureTolerance = _enforceFutureTolerance
352
+ discardedSampleTooFarAhead .SetUint32 (0 )
353
+ }()
354
+
355
+ // with a raw retention of 600s, this will result in a future tolerance of 60s
356
+ futureToleranceRatio = 10
357
+ aggMetricTolerate60 := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , nil , false , false , 0 )
358
+
359
+ // will not tolerate future datapoints at all
360
+ futureToleranceRatio = 0
361
+ aggMetricTolerate0 := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , nil , false , false , 0 )
362
+
363
+ // add datapoint which is 30 seconds in the future to both aggmetrics, they should both accept it
364
+ // because enforcement of future tolerance is disabled, but the one with tolerance 0 should increase
365
+ // the counter of data points that would have been rejected
366
+ discardedSampleTooFarAhead .SetUint32 (0 )
367
+ enforceFutureTolerance = false
368
+ aggMetricTolerate60 .Add (uint32 (time .Now ().Unix ()+ 30 ), 10 )
369
+ if len (aggMetricTolerate60 .chunks ) != 1 {
370
+ t .Fatalf ("expected to have 1 chunk in aggmetric, but there were %d" , len (aggMetricTolerate60 .chunks ))
371
+ }
372
+ if discardedSampleTooFarAhead .Peek () != 0 {
373
+ t .Fatalf ("expected the discardedSampleTooFarAhead count to be 0, but it was %d" , discardedSampleTooFarAhead .Peek ())
374
+ }
348
375
349
- // add datapoint which is 90 seconds in the future
350
- // the limited aggmetric should not accept it because it is more than 60 seconds in the future
351
- // the unlimited aggmetric should accept it because there is no limit
352
- aggMetricLimited .Add (uint32 (time .Now ().Unix ()+ 90 ), 10 )
353
- aggMetricUnlimited .Add (uint32 (time .Now ().Unix ()+ 90 ), 10 )
354
- if len (aggMetricLimited .chunks ) != 0 {
355
- t .Fatalf ("expected to have no chunks in limited aggmetric, but there were %d" , len (aggMetricLimited .chunks ))
376
+ aggMetricTolerate0 .Add (uint32 (time .Now ().Unix ()+ 30 ), 10 )
377
+ if len (aggMetricTolerate0 .chunks ) != 1 {
378
+ t .Fatalf ("expected to have 1 chunk in aggmetric, but there were %d" , len (aggMetricTolerate0 .chunks ))
356
379
}
357
- if len ( aggMetricUnlimited . chunks ) != 1 {
358
- t .Fatalf ("expected to have 1 chunk in the unlimited aggmetric , but there were %d" , len ( aggMetricUnlimited . chunks ))
380
+ if discardedSampleTooFarAhead . Peek ( ) != 1 {
381
+ t .Fatalf ("expected the discardedSampleTooFarAhead count to be 1 , but it was %d" , discardedSampleTooFarAhead . Peek ( ))
359
382
}
360
383
361
- // add datapoint where the timestamp is now
362
- // the limited aggmetric should accept this one, because it only rejects datapoints from at least 60 seconds in the future
363
- aggMetricLimited .Add (uint32 (time .Now ().Unix ()), 10 )
364
- if len (aggMetricLimited .chunks ) != 1 {
365
- t .Fatalf ("expected to have 1 chunk in the limited aggmetric, but there were %d" , len (aggMetricLimited .chunks ))
384
+ // enable the enforcement of the future tolerance limit and re-initialize the two agg metrics
385
+ // then add a data point with time stamp 30 sec in the future to both aggmetrics again.
386
+ // this time only the one that tolerates up to 60 secs should accept the datapoint.
387
+ discardedSampleTooFarAhead .SetUint32 (0 )
388
+ enforceFutureTolerance = true
389
+ futureToleranceRatio = 10
390
+ aggMetricTolerate60 = NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , nil , false , false , 0 )
391
+ futureToleranceRatio = 0
392
+ aggMetricTolerate0 = NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , nil , false , false , 0 )
393
+
394
+ aggMetricTolerate60 .Add (uint32 (time .Now ().Unix ()+ 30 ), 10 )
395
+ if len (aggMetricTolerate60 .chunks ) != 1 {
396
+ t .Fatalf ("expected to have 1 chunk in aggmetric, but there were %d" , len (aggMetricTolerate60 .chunks ))
397
+ }
398
+ if discardedSampleTooFarAhead .Peek () != 0 {
399
+ t .Fatalf ("expected the discardedSampleTooFarAhead count to be 0, but it was %d" , discardedSampleTooFarAhead .Peek ())
400
+ }
401
+
402
+ aggMetricTolerate0 .Add (uint32 (time .Now ().Unix ()+ 30 ), 10 )
403
+ if len (aggMetricTolerate0 .chunks ) != 0 {
404
+ t .Fatalf ("expected to have 0 chunks in aggmetric, but there were %d" , len (aggMetricTolerate0 .chunks ))
405
+ }
406
+ if discardedSampleTooFarAhead .Peek () != 1 {
407
+ t .Fatalf ("expected the discardedSampleTooFarAhead count to be 1, but it was %d" , discardedSampleTooFarAhead .Peek ())
408
+ }
409
+
410
+ // add another datapoint with timestamp of now() to the aggmetric tolerating 0, should be accepted
411
+ aggMetricTolerate0 .Add (uint32 (time .Now ().Unix ()), 10 )
412
+ if len (aggMetricTolerate0 .chunks ) != 1 {
413
+ t .Fatalf ("expected to have 1 chunk in aggmetric, but there were %d" , len (aggMetricTolerate0 .chunks ))
414
+ }
415
+ if discardedSampleTooFarAhead .Peek () != 1 {
416
+ t .Fatalf ("expected the discardedSampleTooFarAhead count to be 1, but it was %d" , discardedSampleTooFarAhead .Peek ())
366
417
}
367
418
}
368
419
@@ -405,7 +456,7 @@ func TestGetAggregated(t *testing.T) {
405
456
AggregationMethod : []conf.Method {conf .Sum },
406
457
}
407
458
408
- m := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , & agg , false , false , 0 , 0 )
459
+ m := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , & agg , false , false , 0 )
409
460
m .Add (10 , 10 )
410
461
m .Add (11 , 11 )
411
462
m .Add (12 , 12 )
@@ -451,7 +502,7 @@ func TestGetAggregatedIngestFrom(t *testing.T) {
451
502
AggregationMethod : []conf.Method {conf .Sum },
452
503
}
453
504
454
- m := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , & agg , false , false , ingestFrom , 0 )
505
+ m := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (42 ), ret , 0 , 1 , & agg , false , false , ingestFrom )
455
506
m .Add (10 , 10 )
456
507
m .Add (11 , 11 )
457
508
m .Add (12 , 12 )
@@ -491,7 +542,7 @@ func BenchmarkAggMetricAdd(b *testing.B) {
491
542
492
543
// each chunk contains 180 points
493
544
rets := conf .MustParseRetentions ("10s:1000000000s,30min:1" )
494
- metric := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (0 ), rets , 0 , 10 , nil , false , false , 0 , 0 )
545
+ metric := NewAggMetric (mockstore , & cache.MockCache {}, test .GetAMKey (0 ), rets , 0 , 10 , nil , false , false , 0 )
495
546
496
547
max := uint32 (b .N * 10 + 1 )
497
548
for t := uint32 (1 ); t < max ; t += 10 {
0 commit comments