diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json
index e24f860dea..e4431147a9 100644
--- a/cmd/mimir/config-descriptor.json
+++ b/cmd/mimir/config-descriptor.json
@@ -4076,7 +4076,7 @@
         "kind": "field",
         "name": "cost_attribution_label",
         "required": false,
-        "desc": "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total",
+        "desc": "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated by adding an 'attrib' label with the cost attribution label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total.",
         "fieldValue": null,
         "fieldDefaultValue": "",
         "fieldFlag": "validation.cost-attribution-label",
diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl
index 3520826340..677e1728b6 100644
--- a/cmd/mimir/help-all.txt.tmpl
+++ b/cmd/mimir/help-all.txt.tmpl
@@ -3072,7 +3072,7 @@ Usage of ./cmd/mimir/mimir:
   -usage-stats.installation-mode string
     	Installation mode. Supported values: custom, helm, jsonnet. (default "custom")
   -validation.cost-attribution-label string
-    	[experimental] Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total
+    	[experimental] Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated by adding an 'attrib' label with the cost attribution label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total.
   -validation.create-grace-period duration
     	Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + creation_grace_period)'. This configuration is enforced in the distributor and ingester. (default 10m)
   -validation.enforce-metadata-metric-name
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 8eb8ceb77a..0c40edb6eb 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -106,7 +106,7 @@ type Distributor struct {
 	distributorsLifecycler *ring.BasicLifecycler
 	distributorsRing       *ring.Ring
 	healthyInstancesCount  *atomic.Uint32
-	costAttributionSvr     *util.CostAttributionCleanupService
+	costAttributionSvc     *util.CostAttributionCleanupService
 
 	// For handling HA replicas.
 	HATracker *haTracker
@@ -343,7 +343,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 		healthyInstancesCount: atomic.NewUint32(0),
 		limits:                limits,
 		HATracker:             haTracker,
-		costAttributionSvr:    costAttributionClenaupService,
+		costAttributionSvc:    costAttributionClenaupService,
 		ingestionRate:         util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
 
 		maxCostAttributionPerUser: maxCostAttributionPerUser,
@@ -1423,7 +1423,9 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
 		return err
 	}
 
-	d.updateReceivedMetrics(req, userID, d.limits.CostAttributionLabel(userID))
+	now := mtime.Now()
+
+	d.updateReceivedMetrics(req, userID, d.limits.CostAttributionLabel(userID), now)
 
 	if len(req.Timeseries) == 0 && len(req.Metadata) == 0 {
 		return nil
@@ -1654,14 +1656,18 @@ func tokenForMetadata(userID string, metricName string) uint32 {
 	return mimirpb.ShardByMetricName(userID, metricName)
 }
 
-func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string) {
+func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string, now time.Time) {
 	var receivedSamples, receivedExemplars, receivedMetadata int
-	costAttribution := make(map[string]int, d.maxCostAttributionPerUser)
+	costAttributionSize := 0
+	if costAttributionLabel != "" {
+		costAttributionSize = d.maxCostAttributionPerUser
+	}
+	costAttribution := make(map[string]int, costAttributionSize)
 	for _, ts := range req.Timeseries {
 		receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
 		receivedExemplars += len(ts.TimeSeries.Exemplars)
 		if costAttributionLabel != "" {
-			attribution := d.costAttributionSvr.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), mtime.Now())
+			attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), now)
			costAttribution[attribution]++
 		}
 	}
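Note on the updateReceivedMetrics change above: it hoists mtime.Now() out of the per-series loop and pre-sizes the attribution map only when the tenant actually has a cost attribution label configured, so tenants without one no longer pay for a maxCostAttributionPerUser-sized allocation on every request. A minimal standalone sketch of that sizing pattern (countAttributions and its arguments are hypothetical stand-ins, not Mimir code):

package main

import "fmt"

// countAttributions pre-sizes the result map only when attribution is enabled,
// mirroring the conditional make(map[string]int, costAttributionSize) above.
func countAttributions(attributionValues []string, costAttributionLabel string, maxCostAttributionPerUser int) map[string]int {
	costAttributionSize := 0
	if costAttributionLabel != "" {
		costAttributionSize = maxCostAttributionPerUser
	}
	costAttribution := make(map[string]int, costAttributionSize)
	if costAttributionLabel == "" {
		return costAttribution // attribution disabled: nothing to count
	}
	for _, v := range attributionValues {
		costAttribution[v]++
	}
	return costAttribution
}

func main() {
	fmt.Println(countAttributions([]string{"team-a", "team-b", "team-a"}, "team", 100))
	// map[team-a:2 team-b:1]
}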
diff --git a/pkg/ingester/activeseries/active_labels_test.go b/pkg/ingester/activeseries/active_labels_test.go
index a2ade30168..e6f12911cc 100644
--- a/pkg/ingester/activeseries/active_labels_test.go
+++ b/pkg/ingester/activeseries/active_labels_test.go
@@ -39,7 +39,7 @@ func TestIsLabelValueActive(t *testing.T) {
 		labels.FromStrings("a", "5"),
 	}
 	allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
-	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
+	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
 	memPostings := index.NewMemPostings()
 
 	for i, l := range series {
diff --git a/pkg/ingester/activeseries/active_native_histogram_postings_test.go b/pkg/ingester/activeseries/active_native_histogram_postings_test.go
index 919dbd86a7..765ac19963 100644
--- a/pkg/ingester/activeseries/active_native_histogram_postings_test.go
+++ b/pkg/ingester/activeseries/active_native_histogram_postings_test.go
@@ -24,7 +24,7 @@ func TestNativeHistogramPostings_Expand(t *testing.T) {
 	}
 	allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
 	storagePostings := index.NewListPostings(allStorageRefs)
-	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
+	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
 
 	// Update each series at a different time according to its index.
 	for i := range allStorageRefs {
@@ -60,7 +60,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) {
 	}
 	allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
 	storagePostings := index.NewListPostings(allStorageRefs)
-	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
+	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
 
 	// Update each series at a different time according to its index.
 	for i := range allStorageRefs {
@@ -104,7 +104,7 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) {
 	}
 	allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
 	storagePostings := index.NewListPostings(allStorageRefs)
-	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
+	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
 
 	// Update each series at a different time according to its index.
 	for i := range allStorageRefs {
@@ -144,7 +144,7 @@ func TestNativeHistogramPostings_Seek(t *testing.T) {
 	}
 	allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
 	storagePostings := index.NewListPostings(allStorageRefs)
-	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
+	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
 
 	// Update each series at a different time according to its index.
 	for i := range allStorageRefs {
@@ -181,7 +181,7 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) {
 	}
 	allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
 	storagePostings := index.NewListPostings(allStorageRefs)
-	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
+	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
 
 	// Update each series at a different time according to its index.
 	for i := range allStorageRefs {
diff --git a/pkg/ingester/activeseries/active_postings_test.go b/pkg/ingester/activeseries/active_postings_test.go
index 9dbe313bd8..b5ea3a7bb5 100644
--- a/pkg/ingester/activeseries/active_postings_test.go
+++ b/pkg/ingester/activeseries/active_postings_test.go
@@ -24,7 +24,7 @@ func TestPostings_Expand(t *testing.T) {
 	}
 	allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
 	storagePostings := index.NewListPostings(allStorageRefs)
-	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
+	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
 
 	// Update each series at a different time according to its index.
 	for i := range allStorageRefs {
@@ -56,7 +56,7 @@ func TestPostings_Seek(t *testing.T) {
 	}
 	allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
 	storagePostings := index.NewListPostings(allStorageRefs)
-	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
+	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
 
 	// Update each series at a different time according to its index.
 	for i := range allStorageRefs {
@@ -88,7 +88,7 @@ func TestPostings_SeekToEnd(t *testing.T) {
 	}
 	allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
 	storagePostings := index.NewListPostings(allStorageRefs)
-	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
+	activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
 
 	// Update each series at a different time according to its index.
 	for i := range allStorageRefs {
diff --git a/pkg/ingester/activeseries/active_series.go b/pkg/ingester/activeseries/active_series.go
index ef065e0bf1..796789ade7 100644
--- a/pkg/ingester/activeseries/active_series.go
+++ b/pkg/ingester/activeseries/active_series.go
@@ -48,8 +48,9 @@ type ActiveSeries struct {
 	matchers           *Matchers
 	lastMatchersUpdate time.Time
 
-	CostAttributionLabel string
-	costAttributionSvr   *util.CostAttributionCleanupService
+	costAttributionLabel      string
+	costAttributionSvc        *util.CostAttributionCleanupService
+	maxCostAttributionPerUser int
 
 	// The duration after which series become inactive.
 	// Also used to determine if enough time has passed since configuration reload for valid results.
@@ -67,7 +68,8 @@ type seriesStripe struct {
 	// Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated
 	// without holding the lock -- hence the atomic).
 	oldestEntryTs atomic.Int64
-	costAttributionSvr *util.CostAttributionCleanupService
+	costAttributionSvc        *util.CostAttributionCleanupService
+	maxCostAttributionPerUser int
 	mu     sync.RWMutex
 	refs   map[storage.SeriesRef]seriesEntry
 	active uint32 // Number of active entries in this stripe. Only decreased during purge or clear.
@@ -98,17 +100,19 @@ func NewActiveSeries(
 	timeout time.Duration,
 	userID string,
 	costAttributionLabel string,
-	costAttributionSvr *util.CostAttributionCleanupService,
+	costAttributionSvc *util.CostAttributionCleanupService,
+	maxCostAttributionPerUser int,
 ) *ActiveSeries {
 	c := &ActiveSeries{
 		matchers: asm,
 		timeout:  timeout,
 		userID:   userID,
-		CostAttributionLabel: costAttributionLabel,
-		costAttributionSvr:   costAttributionSvr,
+		costAttributionLabel:      costAttributionLabel,
+		costAttributionSvc:        costAttributionSvc,
+		maxCostAttributionPerUser: maxCostAttributionPerUser,
 	}
 
 	// Stripes are pre-allocated so that we only read on them and no lock is required.
 	for i := 0; i < numStripes; i++ {
-		c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvr)
+		c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvc, maxCostAttributionPerUser)
 	}
 
 	return c
@@ -125,7 +129,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) {
 	defer c.matchersMutex.Unlock()
 
 	for i := 0; i < numStripes; i++ {
-		c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel, c.costAttributionSvr)
+		c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionLabel, c.costAttributionSvc, c.maxCostAttributionPerUser)
 	}
 	c.matchers = asm
 	c.lastMatchersUpdate = now
@@ -232,7 +236,7 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot
 }
 
 func (c *ActiveSeries) ActiveByAttributionValue() map[string]uint32 {
-	total := make(map[string]uint32)
+	total := make(map[string]uint32, c.maxCostAttributionPerUser)
 	for s := 0; s < numStripes; s++ {
 		c.stripes[s].mu.RLock()
 		for k, v := range c.stripes[s].costAttributionValues {
@@ -429,7 +433,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
 	// If we have a cost attribution label, we can split the series count based on the value of that label.
 	// We also store the attribution value in the entry, so that on removal we can decrease the counter accordingly.
 	if s.costAttributionLabel != "" {
-		attributionValue := s.costAttributionSvr.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Now())
+		attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Unix(0, nowNanos))
 		s.costAttributionValues[attributionValue]++
 		e.attributionValue = attributionValue
 	}
@@ -462,7 +466,8 @@ func (s *seriesStripe) reinitialize(
 	deleted *deletedSeries,
 	userID string,
 	costAttributionLabel string,
-	costAttributionSvr *util.CostAttributionCleanupService,
+	costAttributionSvc *util.CostAttributionCleanupService,
+	maxCostAttributionPerUser int,
 ) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -473,13 +478,14 @@ func (s *seriesStripe) reinitialize(
 	s.costAttributionValues = map[string]uint32{}
 	s.activeNativeHistograms = 0
 	s.activeNativeHistogramBuckets = 0
+	s.maxCostAttributionPerUser = maxCostAttributionPerUser
 	s.matchers = asm
 	s.userID = userID
 	s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching)
 	s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms)
 	s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets)
 	s.costAttributionLabel = costAttributionLabel
-	s.costAttributionSvr = costAttributionSvr
+	s.costAttributionSvc = costAttributionSvc
 }
 
 func (s *seriesStripe) purge(keepUntil time.Time) {
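NewActiveSeries now takes the per-tenant cardinality cap as a trailing parameter and threads it into every stripe via reinitialize; the updated tests simply pass 0, which disables pre-sizing. A hedged construction sketch follows; the import paths and the exported Matchers type are assumed from the diff, and newTrackedActiveSeries itself is a hypothetical helper:

package example

import (
	"time"

	"github.com/grafana/mimir/pkg/ingester/activeseries"
	"github.com/grafana/mimir/pkg/util"
)

// newTrackedActiveSeries shows the extended constructor signature.
func newTrackedActiveSeries(svc *util.CostAttributionCleanupService, maxCostAttributionPerUser int) *activeseries.ActiveSeries {
	return activeseries.NewActiveSeries(
		&activeseries.Matchers{},  // no custom tracker matchers
		10*time.Minute,            // idle timeout after which a series counts as inactive
		"user-1",                  // tenant ID
		"team",                    // cost attribution label; "" disables attribution
		svc,                       // cleanup service tracking attribution timestamps
		maxCostAttributionPerUser, // new: pre-sizes per-attribution maps, 0 to disable
	)
}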
diff --git a/pkg/ingester/activeseries/active_series_test.go b/pkg/ingester/activeseries/active_series_test.go
index 53297e6907..bb0b8b140b 100644
--- a/pkg/ingester/activeseries/active_series_test.go
+++ b/pkg/ingester/activeseries/active_series_test.go
@@ -29,7 +29,7 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) {
 	ref4, ls4 := storage.SeriesRef(4), labels.FromStrings("a", "4")
 	ref5 := storage.SeriesRef(5) // will be used for ls1 again.
 
-	c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil)
+	c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0)
 	valid := c.Purge(time.Now())
 	assert.True(t, valid)
 	allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers()
@@ -194,7 +194,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) {
 	for ttl := 1; ttl <= len(series); ttl++ {
 		t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) {
 			mockedTime := time.Unix(int64(ttl), 0)
-			c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil)
+			c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0)
 
 			// Update each series with a different timestamp according to each index
 			for i := 0; i < len(series); i++ {
@@ -221,7 +221,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) {
 func TestActiveSeries_UpdateSeries_WithMatchers(t *testing.T) {
 	asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`}))
 
-	c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil)
+	c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
 	testUpdateSeries(t, c)
 }
 
@@ -438,7 +438,7 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) {
 func TestActiveSeries_UpdateSeries_Clear(t *testing.T) {
 	asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`}))
 
-	c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil)
+	c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
 	testUpdateSeries(t, c)
 	c.Clear()
 
@@ -479,7 +479,7 @@ func TestActiveSeries_ShouldCorrectlyHandleHashCollisions(t *testing.T) {
 	ls1, ls2 := labelsWithHashCollision()
 	ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2)
 
-	c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil)
+	c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0)
 	c.UpdateSeries(ls1, ref1, time.Now(), -1)
 	c.UpdateSeries(ls2, ref2, time.Now(), -1)
 
@@ -507,7 +507,7 @@ func TestActiveSeries_Purge_NoMatchers(t *testing.T) {
 	for ttl := 1; ttl <= len(series); ttl++ {
 		t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) {
 			mockedTime := time.Unix(int64(ttl), 0)
-			c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil)
+			c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0)
 
 			for i := 0; i < len(series); i++ {
 				c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1)
@@ -553,7 +553,7 @@ func TestActiveSeries_Purge_WithMatchers(t *testing.T) {
 		t.Run(fmt.Sprintf("ttl=%d", ttl), func(t *testing.T) {
 			mockedTime := time.Unix(int64(ttl), 0)
 
-			c := NewActiveSeries(asm, 5*time.Minute, "foo", "", nil)
+			c := NewActiveSeries(asm, 5*time.Minute, "foo", "", nil, 0)
 
 			exp := len(series) - ttl
 			expMatchingSeries := 0
@@ -585,7 +585,7 @@ func TestActiveSeries_PurgeOpt(t *testing.T) {
 	ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2)
 
 	currentTime := time.Now()
-	c := NewActiveSeries(&Matchers{}, 59*time.Second, "foo", "", nil)
+	c := NewActiveSeries(&Matchers{}, 59*time.Second, "foo", "", nil, 0)
 
 	c.UpdateSeries(ls1, ref1, currentTime.Add(-2*time.Minute), -1)
 	c.UpdateSeries(ls2, ref2, currentTime, -1)
 
@@ -621,7 +621,7 @@ func TestActiveSeries_ReloadSeriesMatchers(t *testing.T) {
 	asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~.*}`}))
 
 	currentTime := time.Now()
-	c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil)
+	c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
 
 	valid := c.Purge(currentTime)
 	assert.True(t, valid)
@@ -687,7 +687,7 @@ func TestActiveSeries_ReloadSeriesMatchers_LessMatchers(t *testing.T) {
 	}))
 
 	currentTime := time.Now()
-	c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil)
+	c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
 	valid := c.Purge(currentTime)
 	assert.True(t, valid)
 	allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers()
@@ -726,7 +726,7 @@ func TestActiveSeries_ReloadSeriesMatchers_SameSizeNewLabels(t *testing.T) {
 
 	currentTime := time.Now()
 
-	c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil)
+	c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
 	valid := c.Purge(currentTime)
 	assert.True(t, valid)
 	allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers()
@@ -779,7 +779,7 @@ func benchmarkActiveSeriesUpdateSeriesConcurrency(b *testing.B, numSeries, numGo
 	var (
 		// Run the active series tracker with an active timeout = 0 so that the Purge() will always
 		// purge the series.
-		c           = NewActiveSeries(&Matchers{}, 0, "foo", "", nil)
+		c           = NewActiveSeries(&Matchers{}, 0, "foo", "", nil, 0)
 		updateGroup = &sync.WaitGroup{}
 		purgeGroup  = &sync.WaitGroup{}
 		start       = make(chan struct{})
@@ -917,7 +917,7 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) {
 			b.ResetTimer()
 
 			for i := 0; i < b.N; i++ {
-				c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil)
+				c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
 				for round := 0; round <= tt.nRounds; round++ {
 					for ix := 0; ix < tt.nSeries; ix++ {
 						c.UpdateSeries(series[ix], refs[ix], time.Unix(0, now), -1)
@@ -942,7 +942,7 @@ func benchmarkPurge(b *testing.B, twice bool) {
 	const numExpiresSeries = numSeries / 25
 
 	currentTime := time.Now()
-	c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil)
+	c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0)
 
 	series := [numSeries]labels.Labels{}
 	refs := [numSeries]storage.SeriesRef{}
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index c122c8f468..6c2db32416 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -313,7 +313,7 @@ type Ingester struct {
 
 	activeGroups *util.ActiveGroupsCleanupService
 
-	costAttribution *util.CostAttributionCleanupService
+	costAttributionSvc *util.CostAttributionCleanupService
 
 	tsdbMetrics *tsdbMetrics
 
@@ -391,7 +391,7 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing,
 	i.ingestionRate = util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval)
 	i.metrics = newIngesterMetrics(registerer, cfg.ActiveSeriesMetrics.Enabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests, &i.inflightPushRequestsBytes)
 	i.activeGroups = activeGroupsCleanupService
-	i.costAttribution = costAttributionCleanupService
+	i.costAttributionSvc = costAttributionCleanupService
 	i.maxCostAttributionPerUser = maxCostAttributionPerUser
 	// We create a circuit breaker, which will be activated on a successful completion of starting.
 	i.circuitBreaker = newIngesterCircuitBreaker(i.cfg.PushCircuitBreaker, i.cfg.ReadCircuitBreaker, logger, registerer)
@@ -1301,12 +1301,19 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
 	stats *pushStats, updateFirstPartial func(sampler *util_log.Sampler, errFn softErrorFunction),
 	activeSeries *activeseries.ActiveSeries, outOfOrderWindow time.Duration, minAppendTimeAvailable bool, minAppendTime int64) error {

 	// Return true if handled as soft error, and we can ingest more series.
+	// Get the cost attribution label for the tenant once, outside the closure.
+	costLabel := i.limits.CostAttributionLabel(userID)
 	handleAppendError := func(err error, timestamp int64, labels []mimirpb.LabelAdapter) bool {
-
-		// get the cost attribution value for the series
-		costLabel := i.limits.CostAttributionLabel(userID)
 		if costLabel != "" {
-			costAttrib := i.costAttribution.UpdateAttributionTimestamp(userID, validation.AttributionValue(costLabel, userID, labels), time.Now())
+			costAttrib := ""
+			for _, label := range labels {
+				if label.Name == costLabel {
+					costAttrib = label.Value
+				}
+			}
+			// Get the label value and update its timestamp; if the cardinality limit has been
+			// reached, or we are currently in a cooldown period, the call returns __unaccounted__.
+			costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, costAttrib, startAppend)
 			stats.failedSamplesAttribution[costAttrib]++
 		}
@@ -1414,12 +1421,15 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
 	var builder labels.ScratchBuilder
 	var nonCopiedLabels labels.Labels
 	for _, ts := range timeseries {
-		// The cost attribution value for the series
-		costLabel := i.limits.CostAttributionLabel(userID)
 		var costAttrib string
 		// when cost attribution label is set
 		if costLabel != "" {
-			costAttrib = i.costAttribution.UpdateAttributionTimestamp(userID, validation.AttributionValue(costLabel, userID, ts.Labels), time.Now())
+			for _, label := range ts.Labels {
+				if label.Name == costLabel {
+					costAttrib = label.Value
+				}
+			}
+			costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, costAttrib, startAppend)
 		}
 
 		// The labels must be sorted (in our case, it's guaranteed a write request
@@ -2677,7 +2687,8 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD
 			i.cfg.ActiveSeriesMetrics.IdleTimeout,
 			userID,
 			i.limits.CostAttributionLabel(userID),
-			i.costAttribution,
+			i.costAttributionSvc,
+			i.maxCostAttributionPerUser,
 		),
 		seriesInMetric:     newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()),
 		ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
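Both ingester hunks above replace the validation.AttributionValue helper with an inlined scan of the series labels. A self-contained sketch of that loop (labelAdapter is a local stand-in for mimirpb.LabelAdapter):

package main

import "fmt"

// labelAdapter mirrors the shape of mimirpb.LabelAdapter for this sketch.
type labelAdapter struct {
	Name, Value string
}

// costAttributionValue reproduces the inlined lookup: scan the labels and keep
// the value of the cost attribution label. Unlike the removed helper, which
// returned on the first match, this loop keeps scanning and retains the last
// match; since labels in a write request are sorted and deduplicated, the two
// behaviors coincide in practice, but an early break would save a few iterations.
func costAttributionValue(labels []labelAdapter, costLabel string) string {
	costAttrib := ""
	for _, label := range labels {
		if label.Name == costLabel {
			costAttrib = label.Value
		}
	}
	return costAttrib
}

func main() {
	series := []labelAdapter{{"__name__", "up"}, {"team", "platform"}}
	fmt.Println(costAttributionValue(series, "team")) // platform
	fmt.Println(costAttributionValue(series, "org"))  // empty: UpdateAttributionTimestamp decides how to attribute it
}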
diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go
index 49f9ba6eb4..ebf41b19bf 100644
--- a/pkg/util/validation/limits.go
+++ b/pkg/util/validation/limits.go
@@ -284,7 +284,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&l.OutOfOrderBlocksExternalLabelEnabled, "ingester.out-of-order-blocks-external-label-enabled", false, "Whether the shipper should label out-of-order blocks with an external label before uploading them. Setting this label will compact out-of-order blocks separately from non-out-of-order blocks")
 	f.StringVar(&l.SeparateMetricsGroupLabel, "validation.separate-metrics-group-label", "", "Label used to define the group label for metrics separation. For each write request, the group is obtained from the first non-empty group label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'group' label with group label's value. Currently applies to the following metrics: cortex_discarded_samples_total")
-	f.StringVar(&l.CostAttributionLabel, "validation.cost-attribution-label", "", "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total")
+	f.StringVar(&l.CostAttributionLabel, "validation.cost-attribution-label", "", "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated by adding an 'attrib' label with the cost attribution label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total.")
 	f.IntVar(&l.MaxChunksPerQuery, MaxChunksPerQueryFlag, 2e6, "Maximum number of chunks that can be fetched in a single query from ingesters and store-gateways. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
 	f.Float64Var(&l.MaxEstimatedChunksPerQueryMultiplier, MaxEstimatedChunksPerQueryMultiplierFlag, 0, "Maximum number of chunks estimated to be fetched in a single query from ingesters and store-gateways, as a multiple of -"+MaxChunksPerQueryFlag+". This limit is enforced in the querier. Must be greater than or equal to 1, or 0 to disable.")
 	f.IntVar(&l.MaxFetchedSeriesPerQuery, MaxSeriesPerQueryFlag, 0, "The maximum number of unique series for which a query can fetch samples from ingesters and store-gateways. This limit is enforced in the querier, ruler and store-gateway. 0 to disable")
diff --git a/pkg/util/validation/separate_metrics.go b/pkg/util/validation/separate_metrics.go
index 0f2384984b..cc6194a42e 100644
--- a/pkg/util/validation/separate_metrics.go
+++ b/pkg/util/validation/separate_metrics.go
@@ -30,17 +30,3 @@ func GroupLabel(o *Overrides, userID string, timeseries []mimirpb.PreallocTimese
 
 	return ""
 }
-
-// AttributionLabel obtains the value of cost attribution label for tenant
-func AttributionValue(attribLab string, userID string, lbs []mimirpb.LabelAdapter) string {
-	if len(lbs) == 0 {
-		return ""
-	}
-
-	for _, label := range lbs {
-		if label.Name == attribLab {
-			return label.Value
-		}
-	}
-	return ""
-}
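Closing note: with AttributionValue removed, the remaining attribution aggregation is ActiveByAttributionValue in active_series.go, which now pre-sizes the merged map with maxCostAttributionPerUser. A minimal sketch of that merge, assuming names mirror the diff (illustrative, not the actual Mimir API):

package main

import "fmt"

// mergeAttributionCounts sums per-stripe attribution counters into one map,
// pre-sized to the per-tenant cap so the merge does not rehash midway.
func mergeAttributionCounts(stripes []map[string]uint32, maxCostAttributionPerUser int) map[string]uint32 {
	total := make(map[string]uint32, maxCostAttributionPerUser)
	for _, stripe := range stripes {
		for attrib, n := range stripe {
			total[attrib] += n
		}
	}
	return total
}

func main() {
	stripes := []map[string]uint32{
		{"team-a": 2, "team-b": 1},
		{"team-a": 3},
	}
	fmt.Println(mergeAttributionCounts(stripes, 16)) // map[team-a:5 team-b:1]
}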