From 3c422a8f5771fc9cbf01949c2602c983b25947aa Mon Sep 17 00:00:00 2001 From: Ying WANG Date: Tue, 24 Sep 2024 00:33:07 +0200 Subject: [PATCH] new service for tracking cost attribution --- cmd/mimir/config-descriptor.json | 33 ++-- cmd/mimir/help-all.txt.tmpl | 6 +- pkg/distributor/distributor.go | 18 +- pkg/distributor/distributor_test.go | 4 +- pkg/distributor/validate.go | 1 - .../activeseries/active_labels_test.go | 2 +- .../active_native_histogram_postings_test.go | 10 +- .../activeseries/active_postings_test.go | 6 +- pkg/ingester/activeseries/active_series.go | 24 ++- .../activeseries/active_series_test.go | 28 +-- pkg/ingester/ingester.go | 37 +++- pkg/ingester/ingester_ingest_storage_test.go | 2 +- pkg/ingester/ingester_test.go | 6 +- pkg/ingester/metrics.go | 12 ++ pkg/mimir/mimir.go | 5 + pkg/mimir/modules.go | 25 ++- .../benchmarks/comparison_test.go | 2 +- pkg/streamingpromql/benchmarks/ingester.go | 2 +- pkg/util/cost_attribution.go | 185 ++++++++++++++++++ pkg/util/validation/limits.go | 4 - pkg/util/validation/separate_metrics.go | 9 +- 21 files changed, 340 insertions(+), 81 deletions(-) create mode 100644 pkg/util/cost_attribution.go diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index fe06d305ddb..e24f860dea2 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -57,6 +57,28 @@ "fieldType": "int", "fieldCategory": "experimental" }, + { + "kind": "field", + "name": "max_cost_attribution_per_user", + "required": false, + "desc": "Maximum number of cost attributions allowed per user.", + "fieldValue": null, + "fieldDefaultValue": 200, + "fieldFlag": "max-cost-attribution-per-user", + "fieldType": "int", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "cost_attribution_eviction_interval", + "required": false, + "desc": "Interval at which to evict inactive cost attributions.", + "fieldValue": null, + "fieldDefaultValue": 600000000000, + "fieldFlag": "cost-attribution-eviction-interval", + "fieldType": "duration", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "enable_go_runtime_metrics", @@ -4061,17 +4083,6 @@ "fieldType": "string", "fieldCategory": "experimental" }, - { - "kind": "field", - "name": "max_cost_attribution_per_user", - "required": false, - "desc": "Maximum number of cost attributions per user. 0 to disable the limit.", - "fieldValue": null, - "fieldDefaultValue": 200, - "fieldFlag": "validation.max-cost-attribution-per-user", - "fieldType": "int", - "fieldCategory": "experimental" - }, { "kind": "field", "name": "max_fetched_chunks_per_query", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 9ef27984f7f..35208263407 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1133,6 +1133,8 @@ Usage of ./cmd/mimir/mimir: Expands ${var} or $var in config according to the values of the environment variables. -config.file value Configuration file to load. + -cost-attribution-eviction-interval duration + [experimental] Interval at which to evict inactive cost attributions. (default 10m0s) -debug.block-profile-rate int Fraction of goroutine blocking events that are reported in the blocking profile. 1 to include every blocking event in the profile, 0 to disable. -debug.mutex-profile-fraction int @@ -1681,6 +1683,8 @@ Usage of ./cmd/mimir/mimir: [experimental] Burst size, i.e., maximum number of messages that can be logged at once, temporarily exceeding the configured maximum logs per second. 
(default 1000) -log.rate-limit-logs-per-second float [experimental] Maximum number of messages per second to be logged. (default 10000) + -max-cost-attribution-per-user int + [experimental] Maximum number of cost attributions allowed per user. (default 200) -max-separate-metrics-groups-per-user int [experimental] Maximum number of groups allowed per user by which specified distributor and ingester metrics can be further separated. (default 1000) -mem-ballast-size-bytes int @@ -3073,8 +3077,6 @@ Usage of ./cmd/mimir/mimir: Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + creation_grace_period)'. This configuration is enforced in the distributor and ingester. (default 10m) -validation.enforce-metadata-metric-name Enforce every metadata has a metric name. (default true) - -validation.max-cost-attribution-per-user int - [experimental] Maximum number of cost attributions per user. 0 to disable the limit. (default 200) -validation.max-label-names-per-series int Maximum number of label names per series. (default 30) -validation.max-length-label-name int diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index bdec3565912..2c51bce16a1 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -105,7 +105,7 @@ type Distributor struct { distributorsLifecycler *ring.BasicLifecycler distributorsRing *ring.Ring healthyInstancesCount *atomic.Uint32 - + costAttributionsvr *util.CostAttributionCleanupService // For handling HA replicas. HATracker *haTracker @@ -307,7 +307,10 @@ func (m *PushMetrics) deleteUserMetrics(user string) { } // New constructs a new Distributor -func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) { +func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, + activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *util.CostAttributionCleanupService, + ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, + canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) { clientMetrics := ingester_client.NewMetrics(reg) if cfg.IngesterClientFactory == nil { cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) { @@ -342,6 +345,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove healthyInstancesCount: atomic.NewUint32(0), limits: limits, HATracker: haTracker, + costAttributionsvr: costAttributionClenaupService, ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ @@ -357,6 +361,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove Name: "cortex_distributor_received_samples_total", Help: "The total number of received samples, excluding rejected and deduped samples.", }, []string{"user", "attrib"}), + receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_distributor_received_exemplars_total", Help: "The total number of received 
exemplars, excluding rejected and deduped exemplars.", @@ -640,7 +645,6 @@ func (d *Distributor) cleanupInactiveUser(userID string) { d.HATracker.cleanupHATrackerMetricsForUser(userID) d.receivedRequests.DeleteLabelValues(userID) - d.receivedSamples.DeletePartialMatch(prometheus.Labels{"user": userID}) d.receivedExemplars.DeleteLabelValues(userID) d.receivedMetadata.DeleteLabelValues(userID) d.incomingRequests.DeleteLabelValues(userID) @@ -656,6 +660,7 @@ func (d *Distributor) cleanupInactiveUser(userID string) { filter := prometheus.Labels{"user": userID} d.dedupedSamples.DeletePartialMatch(filter) + d.receivedSamples.DeletePartialMatch(filter) d.discardedSamplesTooManyHaClusters.DeletePartialMatch(filter) d.discardedSamplesRateLimited.DeletePartialMatch(filter) d.discardedRequestsRateLimited.DeleteLabelValues(userID) @@ -674,6 +679,11 @@ func (d *Distributor) RemoveGroupMetricsForUser(userID, group string) { d.sampleValidationMetrics.deleteUserMetricsForGroup(userID, group) } +func (d *Distributor) RemoveAttributionMetricsForUser(userID, attribution string) { + d.receivedSamples.DeleteLabelValues(userID, attribution) + //TODO @ying: Remove attribution metrics +} + // Called after distributor is asked to stop via StopAsync. func (d *Distributor) stopping(_ error) error { return services.StopManagerAndAwaitStopped(context.Background(), d.subservices) @@ -1652,7 +1662,7 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms) receivedExemplars += len(ts.TimeSeries.Exemplars) if costAttributionLabel != "" { - attribution := mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel) + attribution := d.costAttributionsvr.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), mtime.Now()) costAttribution[attribution]++ } } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 3d2013dd4c1..ec0a129fa63 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2026,7 +2026,7 @@ func BenchmarkDistributor_Push(b *testing.B) { require.NoError(b, err) // Start the distributor. - distributor, err := New(distributorCfg, clientConfig, overrides, nil, ingestersRing, nil, true, nil, log.NewNopLogger()) + distributor, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, nil, true, nil, log.NewNopLogger()) require.NoError(b, err) require.NoError(b, services.StartAndAwaitRunning(context.Background(), distributor)) @@ -5314,7 +5314,7 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []* require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - d, err := New(distributorCfg, clientConfig, overrides, nil, ingestersRing, partitionsRing, true, reg, log.NewNopLogger()) + d, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, partitionsRing, true, reg, log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, d)) t.Cleanup(func() { diff --git a/pkg/distributor/validate.go b/pkg/distributor/validate.go index 2e3dc472f3c..db15559c70d 100644 --- a/pkg/distributor/validate.go +++ b/pkg/distributor/validate.go @@ -370,7 +370,6 @@ func removeNonASCIIChars(in string) (out string) { // The returned error may retain the provided series labels. 
func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userID, group string, ls []mimirpb.LabelAdapter, skipLabelNameValidation bool) error { unsafeMetricName, err := extract.UnsafeMetricNameFromLabelAdapters(ls) - if err != nil { m.missingMetricName.WithLabelValues(userID, group).Inc() return errors.New(noMetricNameMsgFormat) diff --git a/pkg/ingester/activeseries/active_labels_test.go b/pkg/ingester/activeseries/active_labels_test.go index 82489dbe45a..a2ade301687 100644 --- a/pkg/ingester/activeseries/active_labels_test.go +++ b/pkg/ingester/activeseries/active_labels_test.go @@ -39,7 +39,7 @@ func TestIsLabelValueActive(t *testing.T) { labels.FromStrings("a", "5"), } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "") + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) memPostings := index.NewMemPostings() for i, l := range series { diff --git a/pkg/ingester/activeseries/active_native_histogram_postings_test.go b/pkg/ingester/activeseries/active_native_histogram_postings_test.go index 0f631dfa422..919dbd86a7f 100644 --- a/pkg/ingester/activeseries/active_native_histogram_postings_test.go +++ b/pkg/ingester/activeseries/active_native_histogram_postings_test.go @@ -24,7 +24,7 @@ func TestNativeHistogramPostings_Expand(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "") + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -60,7 +60,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "") + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -104,7 +104,7 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "") + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -144,7 +144,7 @@ func TestNativeHistogramPostings_Seek(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "") + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -181,7 +181,7 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "") + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) // Update each series at a different time according to its index. 
for i := range allStorageRefs { diff --git a/pkg/ingester/activeseries/active_postings_test.go b/pkg/ingester/activeseries/active_postings_test.go index e3ec24ad0e2..9dbe313bd81 100644 --- a/pkg/ingester/activeseries/active_postings_test.go +++ b/pkg/ingester/activeseries/active_postings_test.go @@ -24,7 +24,7 @@ func TestPostings_Expand(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "") + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -56,7 +56,7 @@ func TestPostings_Seek(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "") + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -88,7 +88,7 @@ func TestPostings_SeekToEnd(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "") + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { diff --git a/pkg/ingester/activeseries/active_series.go b/pkg/ingester/activeseries/active_series.go index 1b6c099b6e4..4882f02e69c 100644 --- a/pkg/ingester/activeseries/active_series.go +++ b/pkg/ingester/activeseries/active_series.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/grafana/mimir/pkg/util" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunks" @@ -48,6 +49,7 @@ type ActiveSeries struct { lastMatchersUpdate time.Time CostAttributionLabel string + costAttributionsvr *util.CostAttributionCleanupService // The duration after which series become inactive. // Also used to determine if enough time has passed since configuration reload for valid results. @@ -64,8 +66,8 @@ type seriesStripe struct { // Unix nanoseconds. Only used by purge. Zero = unknown. // Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated // without holding the lock -- hence the atomic). - oldestEntryTs atomic.Int64 - + oldestEntryTs atomic.Int64 + costAttributionsvr *util.CostAttributionCleanupService mu sync.RWMutex refs map[storage.SeriesRef]seriesEntry active uint32 // Number of active entries in this stripe. Only decreased during purge or clear. @@ -96,15 +98,17 @@ func NewActiveSeries( timeout time.Duration, userID string, costAttributionLabel string, + costAttributionsvr *util.CostAttributionCleanupService, ) *ActiveSeries { c := &ActiveSeries{ matchers: asm, timeout: timeout, userID: userID, CostAttributionLabel: costAttributionLabel, + costAttributionsvr: costAttributionsvr, } // Stripes are pre-allocated so that we only read on them and no lock is required. 
for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel) + c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionsvr) } return c @@ -121,7 +125,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) { defer c.matchersMutex.Unlock() for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel) + c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel, c.costAttributionsvr) } c.matchers = asm c.lastMatchersUpdate = now @@ -230,6 +234,8 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot func (c *ActiveSeries) ActiveByAttributionValue() map[string]uint32 { total := map[string]uint32{} for s := 0; s < numStripes; s++ { + c.stripes[s].mu.RLock() + defer c.stripes[s].mu.RUnlock() for k, v := range c.stripes[s].costAttributionValues { total[k] += v } @@ -423,11 +429,9 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef // here if we have a cost attribution label, we can split the serie count based on the value of the label // we also set the reference to the value of the label in the entry, so when remove, we can decrease the counter accordingly if s.costAttributionLabel != "" { - attributionValue := series.Get(s.costAttributionLabel) - if attributionValue != "" { - s.costAttributionValues[attributionValue]++ - e.attributionValue = attributionValue - } + attributionValue := s.costAttributionsvr.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Now()) + s.costAttributionValues[attributionValue]++ + e.attributionValue = attributionValue } s.refs[ref] = e @@ -458,6 +462,7 @@ func (s *seriesStripe) reinitialize( deleted *deletedSeries, userID string, costAttributionLabel string, + costAttributionsvr *util.CostAttributionCleanupService, ) { s.mu.Lock() defer s.mu.Unlock() @@ -474,6 +479,7 @@ func (s *seriesStripe) reinitialize( s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms) s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets) s.costAttributionLabel = costAttributionLabel + s.costAttributionsvr = costAttributionsvr } func (s *seriesStripe) purge(keepUntil time.Time) { diff --git a/pkg/ingester/activeseries/active_series_test.go b/pkg/ingester/activeseries/active_series_test.go index 0d926ea6d68..53297e69079 100644 --- a/pkg/ingester/activeseries/active_series_test.go +++ b/pkg/ingester/activeseries/active_series_test.go @@ -29,7 +29,7 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { ref4, ls4 := storage.SeriesRef(4), labels.FromStrings("a", "4") ref5 := storage.SeriesRef(5) // will be used for ls1 again. 
- c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "") + c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil) valid := c.Purge(time.Now()) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers() @@ -194,7 +194,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) { for ttl := 1; ttl <= len(series); ttl++ { t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "") + c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil) // Update each series with a different timestamp according to each index for i := 0; i < len(series); i++ { @@ -221,7 +221,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) { func TestActiveSeries_UpdateSeries_WithMatchers(t *testing.T) { asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`})) - c := NewActiveSeries(asm, DefaultTimeout, "foo", "") + c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil) testUpdateSeries(t, c) } @@ -438,7 +438,7 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { func TestActiveSeries_UpdateSeries_Clear(t *testing.T) { asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`})) - c := NewActiveSeries(asm, DefaultTimeout, "foo", "") + c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil) testUpdateSeries(t, c) c.Clear() @@ -479,7 +479,7 @@ func TestActiveSeries_ShouldCorrectlyHandleHashCollisions(t *testing.T) { ls1, ls2 := labelsWithHashCollision() ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2) - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "") + c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil) c.UpdateSeries(ls1, ref1, time.Now(), -1) c.UpdateSeries(ls2, ref2, time.Now(), -1) @@ -507,7 +507,7 @@ func TestActiveSeries_Purge_NoMatchers(t *testing.T) { for ttl := 1; ttl <= len(series); ttl++ { t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "") + c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil) for i := 0; i < len(series); i++ { c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1) @@ -553,7 +553,7 @@ func TestActiveSeries_Purge_WithMatchers(t *testing.T) { t.Run(fmt.Sprintf("ttl=%d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) - c := NewActiveSeries(asm, 5*time.Minute, "foo", "") + c := NewActiveSeries(asm, 5*time.Minute, "foo", "", nil) exp := len(series) - ttl expMatchingSeries := 0 @@ -585,7 +585,7 @@ func TestActiveSeries_PurgeOpt(t *testing.T) { ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2) currentTime := time.Now() - c := NewActiveSeries(&Matchers{}, 59*time.Second, "foo", "") + c := NewActiveSeries(&Matchers{}, 59*time.Second, "foo", "", nil) c.UpdateSeries(ls1, ref1, currentTime.Add(-2*time.Minute), -1) c.UpdateSeries(ls2, ref2, currentTime, -1) @@ -621,7 +621,7 @@ func TestActiveSeries_ReloadSeriesMatchers(t *testing.T) { asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~.*}`})) currentTime := time.Now() - c := NewActiveSeries(asm, DefaultTimeout, "foo", "") + c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil) valid := c.Purge(currentTime) assert.True(t, valid) @@ -687,7 +687,7 @@ func TestActiveSeries_ReloadSeriesMatchers_LessMatchers(t *testing.T) { })) 
currentTime := time.Now() - c := NewActiveSeries(asm, DefaultTimeout, "foo", "") + c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil) valid := c.Purge(currentTime) assert.True(t, valid) allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() @@ -726,7 +726,7 @@ func TestActiveSeries_ReloadSeriesMatchers_SameSizeNewLabels(t *testing.T) { currentTime := time.Now() - c := NewActiveSeries(asm, DefaultTimeout, "foo", "") + c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil) valid := c.Purge(currentTime) assert.True(t, valid) allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() @@ -779,7 +779,7 @@ func benchmarkActiveSeriesUpdateSeriesConcurrency(b *testing.B, numSeries, numGo var ( // Run the active series tracker with an active timeout = 0 so that the Purge() will always // purge the series. - c = NewActiveSeries(&Matchers{}, 0, "foo", "") + c = NewActiveSeries(&Matchers{}, 0, "foo", "", nil) updateGroup = &sync.WaitGroup{} purgeGroup = &sync.WaitGroup{} start = make(chan struct{}) @@ -917,7 +917,7 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - c := NewActiveSeries(asm, DefaultTimeout, "foo", "") + c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil) for round := 0; round <= tt.nRounds; round++ { for ix := 0; ix < tt.nSeries; ix++ { c.UpdateSeries(series[ix], refs[ix], time.Unix(0, now), -1) @@ -942,7 +942,7 @@ func benchmarkPurge(b *testing.B, twice bool) { const numExpiresSeries = numSeries / 25 currentTime := time.Now() - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "") + c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil) series := [numSeries]labels.Labels{} refs := [numSeries]storage.SeriesRef{} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 646f9a4b517..e780c449161 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -312,6 +312,8 @@ type Ingester struct { activeGroups *util.ActiveGroupsCleanupService + costAttribution *util.CostAttributionCleanupService + tsdbMetrics *tsdbMetrics forceCompactTrigger chan requestWithUsersAndCallback @@ -380,7 +382,12 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus } // New returns an Ingester that uses Mimir block storage. -func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, activeGroupsCleanupService *util.ActiveGroupsCleanupService, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { +func New( + cfg Config, limits *validation.Overrides, + ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, + activeGroupsCleanupService *util.ActiveGroupsCleanupService, + costAttributionCleanupService *util.CostAttributionCleanupService, + registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { i, err := newIngester(cfg, limits, registerer, logger) if err != nil { return nil, err @@ -388,7 +395,7 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, i.ingestionRate = util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval) i.metrics = newIngesterMetrics(registerer, cfg.ActiveSeriesMetrics.Enabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests, &i.inflightPushRequestsBytes) i.activeGroups = activeGroupsCleanupService - + i.costAttribution = costAttributionCleanupService // We create a circuit breaker, which will be activated on a successful completion of starting. 
i.circuitBreaker = newIngesterCircuitBreaker(i.cfg.PushCircuitBreaker, i.cfg.ReadCircuitBreaker, logger, registerer) @@ -1283,6 +1290,11 @@ func (i *Ingester) updateMetricsFromPushStats(userID string, group string, stats db.ingestedAPISamples.Add(int64(stats.succeededSamplesCount)) } } + if stats.failedSamplesAttribution != nil && len(stats.failedSamplesAttribution) > 0 { + for label, count := range stats.failedSamplesAttribution { + discarded.samplesPerAttribution.WithLabelValues(userID, label).Add(float64(count)) + } + } } // pushSamplesToAppender appends samples and exemplars to the appender. Most errors are handled via updateFirstPartial function, @@ -1295,8 +1307,9 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre handleAppendError := func(err error, timestamp int64, labels []mimirpb.LabelAdapter) bool { // get the cost attribution value for the series - costAttrib := validation.AttributionValue(i.limits, userID, labels) - if costAttrib != "" { + costLabel := i.limits.CostAttributionLabel(userID) + if costLabel != "" { + costAttrib := i.costAttribution.UpdateAttributionTimestamp(userID, validation.AttributionValue(costLabel, userID, labels), time.Now()) stats.failedSamplesAttribution[costAttrib]++ } @@ -1405,7 +1418,12 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre var nonCopiedLabels labels.Labels for _, ts := range timeseries { // The cost attribution value for the series - costAttrib := validation.AttributionValue(i.limits, userID, ts.Labels) + costLabel := i.limits.CostAttributionLabel(userID) + var costAttrib string + // when cost attribution label is set + if costLabel != "" { + costAttrib = i.costAttribution.UpdateAttributionTimestamp(userID, validation.AttributionValue(costLabel, userID, ts.Labels), time.Now()) + } // The labels must be sorted (in our case, it's guaranteed a write request // has sorted labels once hit the ingester). @@ -1422,7 +1440,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre stats.failedSamplesCount += len(ts.Samples) + len(ts.Histograms) stats.sampleOutOfBoundsCount += len(ts.Samples) + len(ts.Histograms) - if costAttrib != "" { + if costLabel != "" { stats.failedSamplesAttribution[costAttrib] += len(ts.Samples) + len(ts.Histograms) } var firstTimestamp int64 @@ -1445,7 +1463,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre stats.failedSamplesCount += len(ts.Samples) stats.sampleOutOfBoundsCount += len(ts.Samples) - if costAttrib != "" { + if costLabel != "" { stats.failedSamplesAttribution[costAttrib] += len(ts.Samples) } firstTimestamp := ts.Samples[0].TimestampMs @@ -2662,6 +2680,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD i.cfg.ActiveSeriesMetrics.IdleTimeout, userID, i.limits.CostAttributionLabel(userID), + i.costAttribution, ), seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), @@ -3421,6 +3440,10 @@ func (i *Ingester) RemoveGroupMetricsForUser(userID, group string) { i.metrics.deletePerGroupMetricsForUser(userID, group) } +func (i *Ingester) RemoveAttributionMetricsForUser(userID, attribution string) { + i.metrics.deletePerAttributionMetricsForUser(userID, attribution) +} + // TransferOut implements ring.FlushTransferer. 
func (i *Ingester) TransferOut(_ context.Context) error { return ring.ErrTransferDisabled diff --git a/pkg/ingester/ingester_ingest_storage_test.go b/pkg/ingester/ingester_ingest_storage_test.go index 66a76c52ba0..0b566b03c0b 100644 --- a/pkg/ingester/ingester_ingest_storage_test.go +++ b/pkg/ingester/ingester_ingest_storage_test.go @@ -650,7 +650,7 @@ func createTestIngesterWithIngestStorage(t testing.TB, ingesterCfg *Config, over require.NoError(t, services.StopAndAwaitTerminated(ctx, prw)) }) - ingester, err := New(*ingesterCfg, overrides, nil, prw, nil, reg, util_test.NewTestingLogger(t)) + ingester, err := New(*ingesterCfg, overrides, nil, prw, nil, nil, reg, util_test.NewTestingLogger(t)) require.NoError(t, err) return ingester, kafkaCluster, prw diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index cff3fadf29d..9aab9119adf 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -6008,7 +6008,7 @@ func prepareIngesterWithBlockStorageAndOverridesAndPartitionRing(t testing.TB, i ingestersRing = createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()) } - ingester, err := New(ingesterCfg, overrides, ingestersRing, partitionsRing, nil, registerer, noDebugNoopLogger{}) // LOGGING: log.NewLogfmtLogger(os.Stderr) + ingester, err := New(ingesterCfg, overrides, ingestersRing, partitionsRing, nil, nil, registerer, noDebugNoopLogger{}) // LOGGING: log.NewLogfmtLogger(os.Stderr) if err != nil { return nil, err } @@ -6214,7 +6214,7 @@ func TestIngester_OpenExistingTSDBOnStartup(t *testing.T) { // setup the tsdbs dir testData.setup(t, tempDir) - ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, log.NewNopLogger()) + ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, nil, log.NewNopLogger()) require.NoError(t, err) startErr := services.StartAndAwaitRunning(context.Background(), ingester) @@ -7374,7 +7374,7 @@ func TestHeadCompactionOnStartup(t *testing.T) { ingesterCfg.BlocksStorageConfig.Bucket.S3.Endpoint = "localhost" ingesterCfg.BlocksStorageConfig.TSDB.Retention = 2 * 24 * time.Hour // Make sure that no newly created blocks are deleted. 
- ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, log.NewNopLogger()) + ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, nil, log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester)) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 984e3aa70c3..54adc381c70 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -407,9 +407,15 @@ func (m *ingesterMetrics) deletePerGroupMetricsForUser(userID, group string) { m.discarded.DeleteLabelValues(userID, group) } +func (m *ingesterMetrics) deletePerAttributionMetricsForUser(userID, attribution string) { + m.activeSeriesPerUser.DeleteLabelValues(userID, attribution) + m.discarded.samplesPerAttribution.DeleteLabelValues(userID, attribution) +} + func (m *ingesterMetrics) deletePerUserCustomTrackerMetrics(userID string, customTrackerMetrics []string) { m.activeSeriesLoading.DeleteLabelValues(userID) m.activeSeriesPerUser.DeletePartialMatch(prometheus.Labels{"user": userID}) + m.activeSeriesPerUserNativeHistograms.DeleteLabelValues(userID) m.activeNativeHistogramBucketsPerUser.DeleteLabelValues(userID) for _, name := range customTrackerMetrics { @@ -428,6 +434,7 @@ type discardedMetrics struct { perUserSeriesLimit *prometheus.CounterVec perMetricSeriesLimit *prometheus.CounterVec invalidNativeHistogram *prometheus.CounterVec + samplesPerAttribution *prometheus.CounterVec } func newDiscardedMetrics(r prometheus.Registerer) *discardedMetrics { @@ -440,6 +447,10 @@ func newDiscardedMetrics(r prometheus.Registerer) *discardedMetrics { perUserSeriesLimit: validation.DiscardedSamplesCounter(r, reasonPerUserSeriesLimit), perMetricSeriesLimit: validation.DiscardedSamplesCounter(r, reasonPerMetricSeriesLimit), invalidNativeHistogram: validation.DiscardedSamplesCounter(r, reasonInvalidNativeHistogram), + samplesPerAttribution: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_discarded_samples_attribution_total", + Help: "The total number of samples that were discarded per attribution.", + }, []string{"user", "attrib"}), } } @@ -452,6 +463,7 @@ func (m *discardedMetrics) DeletePartialMatch(filter prometheus.Labels) { m.perUserSeriesLimit.DeletePartialMatch(filter) m.perMetricSeriesLimit.DeletePartialMatch(filter) m.invalidNativeHistogram.DeletePartialMatch(filter) + m.samplesPerAttribution.DeletePartialMatch(filter) } func (m *discardedMetrics) DeleteLabelValues(userID string, group string) { diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index b41ac7179d8..20d01381959 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -108,6 +108,8 @@ type Config struct { NoAuthTenant string `yaml:"no_auth_tenant" category:"advanced"` ShutdownDelay time.Duration `yaml:"shutdown_delay" category:"advanced"` MaxSeparateMetricsGroupsPerUser int `yaml:"max_separate_metrics_groups_per_user" category:"experimental"` + MaxCostAttributionPerUser int `yaml:"max_cost_attribution_per_user" category:"experimental"` + CostAttributionEvictionInterval time.Duration `yaml:"cost_attribution_eviction_interval" category:"experimental"` EnableGoRuntimeMetrics bool `yaml:"enable_go_runtime_metrics" category:"advanced"` PrintConfig bool `yaml:"-"` ApplicationName string `yaml:"-"` @@ -166,7 +168,9 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.StringVar(&c.NoAuthTenant, 
"auth.no-auth-tenant", "anonymous", "Tenant ID to use when multitenancy is disabled.") f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.") f.DurationVar(&c.ShutdownDelay, "shutdown-delay", 0, "How long to wait between SIGTERM and shutdown. After receiving SIGTERM, Mimir will report not-ready status via /ready endpoint.") + f.DurationVar(&c.CostAttributionEvictionInterval, "cost-attribution-eviction-interval", 10*time.Minute, "Interval at which to evict inactive cost attributions.") f.IntVar(&c.MaxSeparateMetricsGroupsPerUser, "max-separate-metrics-groups-per-user", 1000, "Maximum number of groups allowed per user by which specified distributor and ingester metrics can be further separated.") + f.IntVar(&c.MaxCostAttributionPerUser, "max-cost-attribution-per-user", 200, "Maximum number of cost attributions allowed per user.") f.BoolVar(&c.EnableGoRuntimeMetrics, "enable-go-runtime-metrics", false, "Set to true to enable all Go runtime metrics, such as go_sched_* and go_memstats_*.") f.BoolVar(&c.TimeseriesUnmarshalCachingOptimizationEnabled, "timeseries-unmarshal-caching-optimization-enabled", true, "Enables optimized marshaling of timeseries.") @@ -707,6 +711,7 @@ type Mimir struct { TenantLimits validation.TenantLimits Overrides *validation.Overrides ActiveGroupsCleanup *util.ActiveGroupsCleanupService + CostAttributionCleanup *util.CostAttributionCleanupService Distributor *distributor.Distributor Ingester *ingester.Ingester diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 052e4338f6b..cdd568669ed 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -78,7 +78,7 @@ const ( OverridesExporter string = "overrides-exporter" Server string = "server" ActiveGroupsCleanupService string = "active-groups-cleanup-service" - CostAttributionCleanupService string = "cost-attribution-cleanup-service" + CostAttributionService string = "cost-attribution-service" Distributor string = "distributor" DistributorService string = "distributor-service" Ingester string = "ingester" @@ -465,7 +465,9 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) { t.Cfg.Distributor.PreferAvailabilityZone = t.Cfg.Querier.PreferAvailabilityZone t.Cfg.Distributor.IngestStorageConfig = t.Cfg.IngestStorage - t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.ActiveGroupsCleanup, t.IngesterRing, t.IngesterPartitionInstanceRing, canJoinDistributorsRing, t.Registerer, util_log.Logger) + t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, + t.ActiveGroupsCleanup, t.CostAttributionCleanup, t.IngesterRing, t.IngesterPartitionInstanceRing, + canJoinDistributorsRing, t.Registerer, util_log.Logger) if err != nil { return } @@ -474,6 +476,10 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) { t.ActiveGroupsCleanup.Register(t.Distributor) } + if t.CostAttributionCleanup != nil { + t.CostAttributionCleanup.Register(t.Distributor) + } + return t.Distributor, nil } @@ -647,6 +653,11 @@ func (t *Mimir) initActiveGroupsCleanupService() (services.Service, error) { return t.ActiveGroupsCleanup, nil } +func (t *Mimir) initCostAttributionService() (services.Service, error) { + t.CostAttributionCleanup = util.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, t.Cfg.MaxCostAttributionPerUser, util_log.Logger) + return t.CostAttributionCleanup, nil +} + func (t *Mimir) tsdbIngesterConfig() { t.Cfg.Ingester.BlocksStorageConfig = 
t.Cfg.BlocksStorage } @@ -658,7 +669,7 @@ func (t *Mimir) initIngesterService() (serv services.Service, err error) { t.Cfg.Ingester.IngestStorageConfig = t.Cfg.IngestStorage t.tsdbIngesterConfig() - t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, t.IngesterRing, t.IngesterPartitionRingWatcher, t.ActiveGroupsCleanup, t.Registerer, util_log.Logger) + t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, t.IngesterRing, t.IngesterPartitionRingWatcher, t.ActiveGroupsCleanup, t.CostAttributionCleanup, t.Registerer, util_log.Logger) if err != nil { return } @@ -667,6 +678,9 @@ func (t *Mimir) initIngesterService() (serv services.Service, err error) { t.ActiveGroupsCleanup.Register(t.Ingester) } + if t.CostAttributionCleanup != nil { + t.CostAttributionCleanup.Register(t.Ingester) + } return t.Ingester, nil } @@ -1120,6 +1134,7 @@ func (t *Mimir) setupModuleManager() error { mm.RegisterModule(Overrides, t.initOverrides, modules.UserInvisibleModule) mm.RegisterModule(OverridesExporter, t.initOverridesExporter) mm.RegisterModule(ActiveGroupsCleanupService, t.initActiveGroupsCleanupService, modules.UserInvisibleModule) + mm.RegisterModule(CostAttributionService, t.initCostAttributionService, modules.UserInvisibleModule) mm.RegisterModule(Distributor, t.initDistributor) mm.RegisterModule(DistributorService, t.initDistributorService, modules.UserInvisibleModule) mm.RegisterModule(Ingester, t.initIngester) @@ -1157,9 +1172,9 @@ func (t *Mimir) setupModuleManager() error { IngesterPartitionRing: {MemberlistKV, IngesterRing, API}, Overrides: {RuntimeConfig}, OverridesExporter: {Overrides, MemberlistKV, Vault}, - Distributor: {DistributorService, API, ActiveGroupsCleanupService, Vault}, + Distributor: {DistributorService, API, ActiveGroupsCleanupService, CostAttributionService, Vault}, DistributorService: {IngesterRing, IngesterPartitionRing, Overrides, Vault}, - Ingester: {IngesterService, API, ActiveGroupsCleanupService, Vault}, + Ingester: {IngesterService, API, ActiveGroupsCleanupService, CostAttributionService, Vault}, IngesterService: {IngesterRing, IngesterPartitionRing, Overrides, RuntimeConfig, MemberlistKV}, Flusher: {Overrides, API}, Queryable: {Overrides, DistributorService, IngesterRing, IngesterPartitionRing, API, StoreQueryable, MemberlistKV}, diff --git a/pkg/streamingpromql/benchmarks/comparison_test.go b/pkg/streamingpromql/benchmarks/comparison_test.go index 2678f0d5c46..1ec18d108d3 100644 --- a/pkg/streamingpromql/benchmarks/comparison_test.go +++ b/pkg/streamingpromql/benchmarks/comparison_test.go @@ -237,7 +237,7 @@ func createIngesterQueryable(t testing.TB, address string) storage.Queryable { overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) - d, err := distributor.New(distributorCfg, clientCfg, overrides, nil, ingestersRing, nil, false, nil, logger) + d, err := distributor.New(distributorCfg, clientCfg, overrides, nil, nil, ingestersRing, nil, false, nil, logger) require.NoError(t, err) queryMetrics := stats.NewQueryMetrics(nil) diff --git a/pkg/streamingpromql/benchmarks/ingester.go b/pkg/streamingpromql/benchmarks/ingester.go index 6f3b5f04a9a..9107b66f64f 100644 --- a/pkg/streamingpromql/benchmarks/ingester.go +++ b/pkg/streamingpromql/benchmarks/ingester.go @@ -96,7 +96,7 @@ func startBenchmarkIngester(rootDataDir string) (*ingester.Ingester, string, fun return services.StopAndAwaitTerminated(context.Background(), ingestersRing) }) - ing, err := ingester.New(ingesterCfg, overrides, ingestersRing, nil, nil, nil, log.NewNopLogger()) 
+ ing, err := ingester.New(ingesterCfg, overrides, ingestersRing, nil, nil, nil, nil, log.NewNopLogger()) if err != nil { cleanup() return nil, "", nil, fmt.Errorf("could not create ingester: %w", err) diff --git a/pkg/util/cost_attribution.go b/pkg/util/cost_attribution.go new file mode 100644 index 00000000000..04d2a2ec7b6 --- /dev/null +++ b/pkg/util/cost_attribution.go @@ -0,0 +1,185 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package util + +import ( + "context" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/services" + "go.uber.org/atomic" +) + +type CostAttribution struct { + mu sync.RWMutex + timestampsPerUser map[string]map[string]*atomic.Int64 // map[user][group] -> timestamp + coolDownDeadline map[string]*atomic.Int64 + maxAttributionPerUser int +} + +func NewCostAttribution(maxAttributionPerUser int) *CostAttribution { + return &CostAttribution{ + timestampsPerUser: map[string]map[string]*atomic.Int64{}, + coolDownDeadline: map[string]*atomic.Int64{}, + maxAttributionPerUser: maxAttributionPerUser, + } +} + +// UpdateAttributionTimestampForUser function is only guaranteed to update to the +// timestamp provided even if it is smaller than the existing value +func (ag *CostAttribution) UpdateAttributionTimestampForUser(userID, attribution string, now time.Time) { + ts := now.UnixNano() + ag.mu.RLock() + if groupTs := ag.timestampsPerUser[userID][attribution]; groupTs != nil { + ag.mu.RUnlock() + groupTs.Store(ts) + return + } + ag.mu.RUnlock() + + ag.mu.Lock() + defer ag.mu.Unlock() + + if ag.timestampsPerUser[userID] == nil { + ag.timestampsPerUser[userID] = map[string]*atomic.Int64{attribution: atomic.NewInt64(ts)} + return + } + + if groupTs := ag.timestampsPerUser[userID][attribution]; groupTs != nil { + groupTs.Store(ts) + return + } + + ag.timestampsPerUser[userID][attribution] = atomic.NewInt64(ts) +} + +func (ag *CostAttribution) purgeInactiveAttributionsForUser(userID string, deadline int64) []string { + ag.mu.RLock() + var inactiveAttributions []string + attributionTimestamps := ag.timestampsPerUser[userID] + + for attr, ts := range attributionTimestamps { + if ts.Load() <= deadline { + inactiveAttributions = append(inactiveAttributions, attr) + } + } + ag.mu.RUnlock() + + if len(inactiveAttributions) == 0 { + return nil + } + + // Cleanup inactive groups + ag.mu.Lock() + defer ag.mu.Unlock() + + for i := 0; i < len(inactiveAttributions); { + inactiveAttribution := inactiveAttributions[i] + groupTs := ag.timestampsPerUser[userID][inactiveAttribution] + if groupTs != nil && groupTs.Load() <= deadline { + delete(ag.timestampsPerUser[userID], inactiveAttribution) + i++ + } else { + inactiveAttributions[i] = inactiveAttributions[len(inactiveAttributions)-1] + inactiveAttributions = inactiveAttributions[:len(inactiveAttributions)-1] + } + } + + return inactiveAttributions +} + +func (ca *CostAttribution) purgeInactiveAttributions(inactiveTimeout time.Duration, cleanupFuncs ...func(string, string)) { + ca.mu.RLock() + userIDs := make([]string, 0, len(ca.timestampsPerUser)) + for userID := range ca.timestampsPerUser { + userIDs = append(userIDs, userID) + } + ca.mu.RUnlock() + + currentTime := time.Now() + for _, userID := range userIDs { + inactiveAttributions := ca.purgeInactiveAttributionsForUser(userID, currentTime.Add(-inactiveTimeout).UnixNano()) + for _, attribution := range inactiveAttributions { + for _, cleanupFn := range cleanupFuncs { + cleanupFn(userID, attribution) + } + } + } +} + +func (ca 
*CostAttribution) attributionLimitExceeded(userID, attribution string, now time.Time) bool { + // if we are still at the cooldown period, we will consider the limit reached + ca.mu.RLock() + defer ca.mu.RUnlock() + + if v, exists := ca.coolDownDeadline[userID]; exists && v.Load() > now.UnixNano() { + return true + } + + // if the user attribution is already exist and we are not in the cooldown period, we don't need to check the limit + _, exists := ca.timestampsPerUser[userID][attribution] + if exists { + return false + } + + // if the user has reached the limit, we will set the cooldown period which is 20 minutes + maxReached := len(ca.timestampsPerUser[userID]) >= ca.maxAttributionPerUser + if maxReached { + ca.coolDownDeadline[userID].Store(time.Now().Add(20 * time.Minute).UnixNano()) + return true + } + + return maxReached +} + +type CostAttributionCleanupService struct { + services.Service + logger log.Logger + costAttribution *CostAttribution + cleanupFuncs []func(userID, attribution string) + inactiveTimeout time.Duration +} + +type CostAttributionMetricsCleaner interface { + RemoveAttributionMetricsForUser(userID, attribution string) +} + +func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Duration, maxAttributionPerUser int, logger log.Logger, cleanupFns ...func(string, string)) *CostAttributionCleanupService { + s := &CostAttributionCleanupService{ + costAttribution: NewCostAttribution(maxAttributionPerUser), + cleanupFuncs: cleanupFns, + inactiveTimeout: inactiveTimeout, + logger: logger, + } + + s.Service = services.NewTimerService(cleanupInterval, nil, s.iteration, nil).WithName("cost attribution cleanup") + return s +} + +func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user, attribution string, now time.Time) string { + // empty label is not normal, if user set attribution label, the metrics send has to include the label + if attribution == "" { + attribution = "other" + level.Error(s.logger).Log("msg", "set attribution label to \"other\" since missing cost attribution label in metrics") + } else if s.costAttribution.attributionLimitExceeded(user, attribution, now) { + attribution = "other" + level.Error(s.logger).Log("msg", "set attribution label to \"other\" since user has reached the limit of cost attribution labels") + } + + s.costAttribution.UpdateAttributionTimestampForUser(user, attribution, now) + return attribution +} + +func (s *CostAttributionCleanupService) iteration(_ context.Context) error { + s.costAttribution.purgeInactiveAttributions(s.inactiveTimeout, s.cleanupFuncs...) + return nil +} + +// Register registers the cleanup function from metricsCleaner to be called during each cleanup iteration. 
+// This function is NOT thread safe +func (s *CostAttributionCleanupService) Register(metricsCleaner CostAttributionMetricsCleaner) { + s.cleanupFuncs = append(s.cleanupFuncs, metricsCleaner.RemoveAttributionMetricsForUser) +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 3788db090dd..49f9ba6eb4b 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -146,9 +146,6 @@ type Limits struct { // User defined label to give the cost distribution by values of the label CostAttributionLabel string `yaml:"cost_attribution_label" json:"cost_attribution_label" category:"experimental"` - // The limit of the number of attributions per user - MaxCostAttributionPerUser int `yaml:"max_cost_attribution_per_user" json:"max_cost_attribution_per_user" category:"experimental"` - // Querier enforced limits. MaxChunksPerQuery int `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"` MaxEstimatedChunksPerQueryMultiplier float64 `yaml:"max_estimated_fetched_chunks_per_query_multiplier" json:"max_estimated_fetched_chunks_per_query_multiplier" category:"experimental"` @@ -288,7 +285,6 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.StringVar(&l.SeparateMetricsGroupLabel, "validation.separate-metrics-group-label", "", "Label used to define the group label for metrics separation. For each write request, the group is obtained from the first non-empty group label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'group' label with group label's value. Currently applies to the following metrics: cortex_discarded_samples_total") f.StringVar(&l.CostAttributionLabel, "validation.cost-attribution-label", "", "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total") - f.IntVar(&l.MaxCostAttributionPerUser, "validation.max-cost-attribution-per-user", 200, "Maximum number of cost attributions per user. 0 to disable the limit.") f.IntVar(&l.MaxChunksPerQuery, MaxChunksPerQueryFlag, 2e6, "Maximum number of chunks that can be fetched in a single query from ingesters and store-gateways. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.") f.Float64Var(&l.MaxEstimatedChunksPerQueryMultiplier, MaxEstimatedChunksPerQueryMultiplierFlag, 0, "Maximum number of chunks estimated to be fetched in a single query from ingesters and store-gateways, as a multiple of -"+MaxChunksPerQueryFlag+". This limit is enforced in the querier. Must be greater than or equal to 1, or 0 to disable.") f.IntVar(&l.MaxFetchedSeriesPerQuery, MaxSeriesPerQueryFlag, 0, "The maximum number of unique series for which a query can fetch samples from ingesters and store-gateways. This limit is enforced in the querier, ruler and store-gateway. 
0 to disable") diff --git a/pkg/util/validation/separate_metrics.go b/pkg/util/validation/separate_metrics.go index 48cefb65051..0f2384984b0 100644 --- a/pkg/util/validation/separate_metrics.go +++ b/pkg/util/validation/separate_metrics.go @@ -32,18 +32,13 @@ func GroupLabel(o *Overrides, userID string, timeseries []mimirpb.PreallocTimese } // AttributionLabel obtains the value of cost attribution label for tenant -func AttributionValue(o *Overrides, userID string, lbs []mimirpb.LabelAdapter) string { +func AttributionValue(attribLab string, userID string, lbs []mimirpb.LabelAdapter) string { if len(lbs) == 0 { return "" } - attributionLabel := o.CostAttributionLabel(userID) - if attributionLabel == "" { - // If not set, no cost attribution is required - return attributionLabel - } for _, label := range lbs { - if label.Name == attributionLabel { + if label.Name == attribLab { return label.Value } }