Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 3f3a468

Browse files
committed
AggMetrics.ingestAfter: simplify and generalise by using a map orgId -> timestamp
1 parent 61a9d74 commit 3f3a468

File tree

5 files changed

+14
-24
lines changed

5 files changed

+14
-24
lines changed

api/ccache_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func newSrv(delSeries, delArchives int) (*Server, *cache.MockCache) {
3333
mockCache := cache.NewMockCache()
3434
mockCache.DelMetricSeries = delSeries
3535
mockCache.DelMetricArchives = delArchives
36-
metrics := mdata.NewAggMetrics(store, mockCache, false, 0, 0, 0, 0, 0)
36+
metrics := mdata.NewAggMetrics(store, mockCache, false, nil, 0, 0, 0)
3737
srv.BindMemoryStore(metrics)
3838
srv.BindCache(mockCache)
3939

api/dataprocessor_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ func TestGetSeriesFixed(t *testing.T) {
346346
mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
347347
mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, 0))
348348

349-
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 0, 0, 0)
349+
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, nil, 0, 0, 0)
350350
srv, _ := NewServer()
351351
srv.BindBackendStore(store)
352352
srv.BindMemoryStore(metrics)
@@ -501,7 +501,7 @@ func TestGetSeriesCachedStore(t *testing.T) {
501501
store := mdata.NewMockStore()
502502
srv.BindBackendStore(store)
503503

504-
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 0, 0, 0)
504+
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, nil, 0, 0, 0)
505505
srv.BindMemoryStore(metrics)
506506
metric := test.GetAMKey(1)
507507

@@ -687,7 +687,7 @@ func TestGetSeriesAggMetrics(t *testing.T) {
687687
cluster.Init("default", "test", time.Now(), "http", 6060)
688688
store := mdata.NewMockStore()
689689

690-
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 0, 0, 0)
690+
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, nil, 0, 0, 0)
691691
srv, _ := NewServer()
692692
srv.BindBackendStore(store)
693693
srv.BindMemoryStore(metrics)

cmd/metrictank/metrictank.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,10 @@ func main() {
302302
if ingestAfterTimestamp > 0 {
303303
log.Infof("Will only ingest data points for org id %d belonging to chunks starting after %s", ingestAfterOrgID, time.Unix(ingestAfterTimestamp, 0))
304304
}
305+
ingestAfter := make(map[uint32]int64)
306+
ingestAfter[ingestAfterOrgID] = ingestAfterTimestamp
305307
if inputEnabled {
306-
metrics = mdata.NewAggMetrics(store, ccache, *dropFirstChunk, ingestAfterOrgID, ingestAfterTimestamp, chunkMaxStale, metricMaxStale, gcInterval)
308+
metrics = mdata.NewAggMetrics(store, ccache, *dropFirstChunk, ingestAfter, chunkMaxStale, metricMaxStale, gcInterval)
307309
}
308310

309311
/***********************************

input/input_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ func getDefaultHandler(t *testing.T) (DefaultHandler, idx.MetricIndex, func()) {
195195
}
196196

197197
mdata.Schemas = conf.NewSchemas(nil)
198-
metrics := mdata.NewAggMetrics(nil, nil, false, 0, 0, 3600, 7200, 3600)
198+
metrics := mdata.NewAggMetrics(nil, nil, false, nil, 3600, 7200, 3600)
199199
return NewDefaultHandler(metrics, index, "test"), index, reset
200200
}
201201

@@ -207,7 +207,7 @@ func BenchmarkProcessMetricDataUniqueMetrics(b *testing.B) {
207207
mdata.SetSingleSchema(conf.NewRetentionMT(10, 10000, 600, 10, 0))
208208
mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
209209

210-
aggmetrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 800, 8000, 0)
210+
aggmetrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, nil, 800, 8000, 0)
211211
metricIndex := memory.New()
212212
metricIndex.Init()
213213
defer metricIndex.Stop()
@@ -247,7 +247,7 @@ func BenchmarkProcessMetricDataSameMetric(b *testing.B) {
247247
mdata.SetSingleSchema(conf.NewRetentionMT(10, 10000, 600, 10, 0))
248248
mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
249249

250-
aggmetrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 800, 8000, 0)
250+
aggmetrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, nil, 800, 8000, 0)
251251
metricIndex := memory.New()
252252
metricIndex.Init()
253253
defer metricIndex.Stop()

mdata/aggmetrics.go

+4-16
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@ type AggMetrics struct {
1818
store Store
1919
cachePusher cache.CachePusher
2020
dropFirstChunk bool
21-
ingestAfter struct {
22-
orgID uint32
23-
timestamp int64
24-
}
21+
ingestAfter map[uint32]int64
2522
chunkMaxStale uint32
2623
metricMaxStale uint32
2724
gcInterval time.Duration
@@ -30,18 +27,12 @@ type AggMetrics struct {
3027
Metrics map[uint32]map[schema.Key]*AggMetric
3128
}
3229

33-
func NewAggMetrics(store Store, cachePusher cache.CachePusher, dropFirstChunk bool, ingestAfterOrgID uint32, ingestAfterTimestamp int64, chunkMaxStale, metricMaxStale uint32, gcInterval time.Duration) *AggMetrics {
30+
func NewAggMetrics(store Store, cachePusher cache.CachePusher, dropFirstChunk bool, ingestAfter map[uint32]int64, chunkMaxStale, metricMaxStale uint32, gcInterval time.Duration) *AggMetrics {
3431
ms := AggMetrics{
3532
store: store,
3633
cachePusher: cachePusher,
3734
dropFirstChunk: dropFirstChunk,
38-
ingestAfter: struct {
39-
orgID uint32
40-
timestamp int64
41-
}{
42-
orgID: ingestAfterOrgID,
43-
timestamp: ingestAfterTimestamp,
44-
},
35+
ingestAfter: ingestAfter,
4536
Metrics: make(map[uint32]map[schema.Key]*AggMetric),
4637
chunkMaxStale: chunkMaxStale,
4738
metricMaxStale: metricMaxStale,
@@ -173,10 +164,7 @@ func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16, inter
173164
ms.Unlock()
174165
return m
175166
}
176-
ingestAfter := int64(0)
177-
if key.Org == ms.ingestAfter.orgID {
178-
ingestAfter = ms.ingestAfter.timestamp
179-
}
167+
ingestAfter := ms.ingestAfter[key.Org]
180168
m = NewAggMetric(ms.store, ms.cachePusher, k, confSchema.Retentions, confSchema.ReorderWindow, interval, &agg, ms.dropFirstChunk, ingestAfter)
181169
ms.Metrics[key.Org][key.Key] = m
182170
active := len(ms.Metrics[key.Org])

0 commit comments

Comments
 (0)