From 7e628c3508fec14ca9a2867dca2e8dedcb54526f Mon Sep 17 00:00:00 2001 From: Ying WANG Date: Wed, 25 Sep 2024 14:43:24 +0200 Subject: [PATCH] address comments --- cmd/mimir/config-descriptor.json | 24 +++++----- cmd/mimir/help-all.txt.tmpl | 6 +-- pkg/distributor/distributor.go | 23 ++++++---- pkg/distributor/distributor_test.go | 2 +- .../activeseries/active_labels_test.go | 2 +- .../active_native_histogram_postings_test.go | 10 ++-- .../activeseries/active_postings_test.go | 6 +-- pkg/ingester/activeseries/active_series.go | 32 +++++++------ .../activeseries/active_series_test.go | 28 +++++------ pkg/ingester/ingester.go | 46 ++++++++++--------- pkg/mimir/mimir.go | 2 - pkg/mimir/modules.go | 2 +- pkg/util/cost_attribution.go | 37 +++++++-------- pkg/util/validation/limits.go | 10 +++- pkg/util/validation/separate_metrics.go | 14 ------ 15 files changed, 124 insertions(+), 120 deletions(-) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index e24f860dea2..2f6d204c850 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -57,17 +57,6 @@ "fieldType": "int", "fieldCategory": "experimental" }, - { - "kind": "field", - "name": "max_cost_attribution_per_user", - "required": false, - "desc": "Maximum number of cost attributions allowed per user.", - "fieldValue": null, - "fieldDefaultValue": 200, - "fieldFlag": "max-cost-attribution-per-user", - "fieldType": "int", - "fieldCategory": "experimental" - }, { "kind": "field", "name": "cost_attribution_eviction_interval", @@ -4076,13 +4065,24 @@ "kind": "field", "name": "cost_attribution_label", "required": false, - "desc": "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total", + "desc": "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated by adding an 'attrib' label with the cost attribution label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total.", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "validation.cost-attribution-label", "fieldType": "string", "fieldCategory": "experimental" }, + { + "kind": "field", + "name": "max_cost_attribution_per_user", + "required": false, + "desc": "The maximum number of cost attribution labels per user, across the cluster. 0 to disable cost attribution.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "validation.max-cost-attribution-per-user", + "fieldType": "int", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "max_fetched_chunks_per_query", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 35208263407..6e53767bef9 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1683,8 +1683,6 @@ Usage of ./cmd/mimir/mimir: [experimental] Burst size, i.e., maximum number of messages that can be logged at once, temporarily exceeding the configured maximum logs per second.
(default 1000) -log.rate-limit-logs-per-second float [experimental] Maximum number of messages per second to be logged. (default 10000) - -max-cost-attribution-per-user int - [experimental] Maximum number of cost attributions allowed per user. (default 200) -max-separate-metrics-groups-per-user int [experimental] Maximum number of groups allowed per user by which specified distributor and ingester metrics can be further separated. (default 1000) -mem-ballast-size-bytes int @@ -3072,11 +3070,13 @@ Usage of ./cmd/mimir/mimir: -usage-stats.installation-mode string Installation mode. Supported values: custom, helm, jsonnet. (default "custom") -validation.cost-attribution-label string - [experimental] Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total + [experimental] Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated by adding an 'attrib' label with the cost attribution label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total. -validation.create-grace-period duration Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + creation_grace_period)'. This configuration is enforced in the distributor and ingester. (default 10m) -validation.enforce-metadata-metric-name Enforce every metadata has a metric name. (default true) + -validation.max-cost-attribution-per-user int + [experimental] The maximum number of cost attribution labels per user, across the cluster. 0 to disable cost attribution. -validation.max-label-names-per-series int Maximum number of label names per series. (default 30) -validation.max-length-label-name int diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 2c51bce16a1..b1369945178 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -105,7 +105,7 @@ type Distributor struct { distributorsLifecycler *ring.BasicLifecycler distributorsRing *ring.Ring healthyInstancesCount *atomic.Uint32 - costAttributionsvr *util.CostAttributionCleanupService + costAttributionSvc *util.CostAttributionCleanupService // For handling HA replicas.
HATracker *haTracker @@ -307,10 +307,7 @@ func (m *PushMetrics) deleteUserMetrics(user string) { } // New constructs a new Distributor -func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, - activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *util.CostAttributionCleanupService, - ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, - canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) { +func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *util.CostAttributionCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) { clientMetrics := ingester_client.NewMetrics(reg) if cfg.IngesterClientFactory == nil { cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) { @@ -345,7 +342,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove healthyInstancesCount: atomic.NewUint32(0), limits: limits, HATracker: haTracker, - costAttributionsvr: costAttributionClenaupService, + costAttributionSvc: costAttributionClenaupService, ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ @@ -1424,7 +1421,9 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error { return err } - d.updateReceivedMetrics(req, userID, d.limits.CostAttributionLabel(userID)) + now := mtime.Now() + + d.updateReceivedMetrics(req, userID, d.limits.CostAttributionLabel(userID), now) if len(req.Timeseries) == 0 && len(req.Metadata) == 0 { return nil @@ -1655,14 +1654,18 @@ func tokenForMetadata(userID string, metricName string) uint32 { return mimirpb.ShardByMetricName(userID, metricName) } -func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string) { +func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string, now time.Time) { var receivedSamples, receivedExemplars, receivedMetadata int - costAttribution := make(map[string]int) + costAttributionSize := 0 + if costAttributionLabel != "" { + costAttributionSize = d.limits.MaxCostAttributionPerUser(userID) + } + costAttribution := make(map[string]int, costAttributionSize) for _, ts := range req.Timeseries { receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms) receivedExemplars += len(ts.TimeSeries.Exemplars) if costAttributionLabel != "" { - attribution := d.costAttributionsvr.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), mtime.Now()) + attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), now, costAttributionSize) costAttribution[attribution]++ } } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index ec0a129fa63..5d494afeeae 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -7947,7 +7947,7 @@ func TestCheckStartedMiddleware(t *testing.T) { overrides, err := validation.NewOverrides(limits, nil) 
require.NoError(t, err) - distributor, err := New(distributorConfig, clientConfig, overrides, nil, ingestersRing, nil, true, nil, log.NewNopLogger()) + distributor, err := New(distributorConfig, clientConfig, overrides, nil, nil, ingestersRing, nil, true, nil, log.NewNopLogger()) require.NoError(t, err) ctx := user.InjectOrgID(context.Background(), "user") diff --git a/pkg/ingester/activeseries/active_labels_test.go b/pkg/ingester/activeseries/active_labels_test.go index a2ade301687..e6f12911cc4 100644 --- a/pkg/ingester/activeseries/active_labels_test.go +++ b/pkg/ingester/activeseries/active_labels_test.go @@ -39,7 +39,7 @@ func TestIsLabelValueActive(t *testing.T) { labels.FromStrings("a", "5"), } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) memPostings := index.NewMemPostings() for i, l := range series { diff --git a/pkg/ingester/activeseries/active_native_histogram_postings_test.go b/pkg/ingester/activeseries/active_native_histogram_postings_test.go index 919dbd86a7f..765ac199633 100644 --- a/pkg/ingester/activeseries/active_native_histogram_postings_test.go +++ b/pkg/ingester/activeseries/active_native_histogram_postings_test.go @@ -24,7 +24,7 @@ func TestNativeHistogramPostings_Expand(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -60,7 +60,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -104,7 +104,7 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -144,7 +144,7 @@ func TestNativeHistogramPostings_Seek(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. 
for i := range allStorageRefs { @@ -181,7 +181,7 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. for i := range allStorageRefs { diff --git a/pkg/ingester/activeseries/active_postings_test.go b/pkg/ingester/activeseries/active_postings_test.go index 9dbe313bd81..b5ea3a7bb5d 100644 --- a/pkg/ingester/activeseries/active_postings_test.go +++ b/pkg/ingester/activeseries/active_postings_test.go @@ -24,7 +24,7 @@ func TestPostings_Expand(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -56,7 +56,7 @@ func TestPostings_Seek(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -88,7 +88,7 @@ func TestPostings_SeekToEnd(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) + activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. for i := range allStorageRefs { diff --git a/pkg/ingester/activeseries/active_series.go b/pkg/ingester/activeseries/active_series.go index 4882f02e69c..925cb191a95 100644 --- a/pkg/ingester/activeseries/active_series.go +++ b/pkg/ingester/activeseries/active_series.go @@ -48,8 +48,9 @@ type ActiveSeries struct { matchers *Matchers lastMatchersUpdate time.Time - CostAttributionLabel string - costAttributionsvr *util.CostAttributionCleanupService + costAttributionLabel string + costAttributionSvc *util.CostAttributionCleanupService + maxCostAttributionPerUser int // The duration after which series become inactive. // Also used to determine if enough time has passed since configuration reload for valid results. @@ -67,7 +68,8 @@ type seriesStripe struct { // Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated // without holding the lock -- hence the atomic). oldestEntryTs atomic.Int64 - costAttributionsvr *util.CostAttributionCleanupService + costAttributionSvc *util.CostAttributionCleanupService + maxCostAttributionPerUser int mu sync.RWMutex refs map[storage.SeriesRef]seriesEntry active uint32 // Number of active entries in this stripe. Only decreased during purge or clear. 
@@ -98,17 +100,19 @@ func NewActiveSeries( timeout time.Duration, userID string, costAttributionLabel string, - costAttributionsvr *util.CostAttributionCleanupService, + costAttributionSvc *util.CostAttributionCleanupService, + maxCostAttributionPerUser int, ) *ActiveSeries { c := &ActiveSeries{ matchers: asm, timeout: timeout, userID: userID, - CostAttributionLabel: costAttributionLabel, - costAttributionsvr: costAttributionsvr, + costAttributionLabel: costAttributionLabel, + costAttributionSvc: costAttributionSvc, + maxCostAttributionPerUser: maxCostAttributionPerUser, } // Stripes are pre-allocated so that we only read on them and no lock is required. for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionsvr) + c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvc, maxCostAttributionPerUser) } return c @@ -125,7 +129,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) { defer c.matchersMutex.Unlock() for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel, c.costAttributionsvr) + c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionLabel, c.costAttributionSvc, c.maxCostAttributionPerUser) } c.matchers = asm c.lastMatchersUpdate = now @@ -232,13 +236,13 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot } func (c *ActiveSeries) ActiveByAttributionValue() map[string]uint32 { - total := map[string]uint32{} + total := make(map[string]uint32, c.maxCostAttributionPerUser) for s := 0; s < numStripes; s++ { c.stripes[s].mu.RLock() - defer c.stripes[s].mu.RUnlock() for k, v := range c.stripes[s].costAttributionValues { total[k] += v } + c.stripes[s].mu.RUnlock() } return total } @@ -429,7 +433,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef // here if we have a cost attribution label, we can split the serie count based on the value of the label // we also set the reference to the value of the label in the entry, so when remove, we can decrease the counter accordingly if s.costAttributionLabel != "" { - attributionValue := s.costAttributionsvr.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Now()) + attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Unix(0, nowNanos), s.maxCostAttributionPerUser) s.costAttributionValues[attributionValue]++ e.attributionValue = attributionValue } @@ -462,7 +466,8 @@ func (s *seriesStripe) reinitialize( deleted *deletedSeries, userID string, costAttributionLabel string, - costAttributionsvr *util.CostAttributionCleanupService, + costAttributionSvc *util.CostAttributionCleanupService, + maxCostAttributionPerUser int, ) { s.mu.Lock() defer s.mu.Unlock() @@ -473,13 +478,14 @@ func (s *seriesStripe) reinitialize( s.costAttributionValues = map[string]uint32{} s.activeNativeHistograms = 0 s.activeNativeHistogramBuckets = 0 + s.maxCostAttributionPerUser = maxCostAttributionPerUser s.matchers = asm s.userID = userID s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching) s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms) s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets) s.costAttributionLabel = costAttributionLabel - s.costAttributionsvr = costAttributionsvr + 
s.costAttributionSvc = costAttributionSvc } func (s *seriesStripe) purge(keepUntil time.Time) { diff --git a/pkg/ingester/activeseries/active_series_test.go b/pkg/ingester/activeseries/active_series_test.go index 53297e69079..bb0b8b140b5 100644 --- a/pkg/ingester/activeseries/active_series_test.go +++ b/pkg/ingester/activeseries/active_series_test.go @@ -29,7 +29,7 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { ref4, ls4 := storage.SeriesRef(4), labels.FromStrings("a", "4") ref5 := storage.SeriesRef(5) // will be used for ls1 again. - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil) + c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0) valid := c.Purge(time.Now()) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers() @@ -194,7 +194,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) { for ttl := 1; ttl <= len(series); ttl++ { t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil) + c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0) // Update each series with a different timestamp according to each index for i := 0; i < len(series); i++ { @@ -221,7 +221,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) { func TestActiveSeries_UpdateSeries_WithMatchers(t *testing.T) { asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`})) - c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil) + c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) testUpdateSeries(t, c) } @@ -438,7 +438,7 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { func TestActiveSeries_UpdateSeries_Clear(t *testing.T) { asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`})) - c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil) + c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) testUpdateSeries(t, c) c.Clear() @@ -479,7 +479,7 @@ func TestActiveSeries_ShouldCorrectlyHandleHashCollisions(t *testing.T) { ls1, ls2 := labelsWithHashCollision() ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2) - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil) + c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0) c.UpdateSeries(ls1, ref1, time.Now(), -1) c.UpdateSeries(ls2, ref2, time.Now(), -1) @@ -507,7 +507,7 @@ func TestActiveSeries_Purge_NoMatchers(t *testing.T) { for ttl := 1; ttl <= len(series); ttl++ { t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil) + c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0) for i := 0; i < len(series); i++ { c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1) @@ -553,7 +553,7 @@ func TestActiveSeries_Purge_WithMatchers(t *testing.T) { t.Run(fmt.Sprintf("ttl=%d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) - c := NewActiveSeries(asm, 5*time.Minute, "foo", "", nil) + c := NewActiveSeries(asm, 5*time.Minute, "foo", "", nil, 0) exp := len(series) - ttl expMatchingSeries := 0 @@ -585,7 +585,7 @@ func TestActiveSeries_PurgeOpt(t *testing.T) { ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2) currentTime := time.Now() - c := NewActiveSeries(&Matchers{}, 59*time.Second, "foo", "", nil) + c := 
NewActiveSeries(&Matchers{}, 59*time.Second, "foo", "", nil, 0) c.UpdateSeries(ls1, ref1, currentTime.Add(-2*time.Minute), -1) c.UpdateSeries(ls2, ref2, currentTime, -1) @@ -621,7 +621,7 @@ func TestActiveSeries_ReloadSeriesMatchers(t *testing.T) { asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~.*}`})) currentTime := time.Now() - c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil) + c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) valid := c.Purge(currentTime) assert.True(t, valid) @@ -687,7 +687,7 @@ func TestActiveSeries_ReloadSeriesMatchers_LessMatchers(t *testing.T) { })) currentTime := time.Now() - c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil) + c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) valid := c.Purge(currentTime) assert.True(t, valid) allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() @@ -726,7 +726,7 @@ func TestActiveSeries_ReloadSeriesMatchers_SameSizeNewLabels(t *testing.T) { currentTime := time.Now() - c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil) + c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) valid := c.Purge(currentTime) assert.True(t, valid) allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() @@ -779,7 +779,7 @@ func benchmarkActiveSeriesUpdateSeriesConcurrency(b *testing.B, numSeries, numGo var ( // Run the active series tracker with an active timeout = 0 so that the Purge() will always // purge the series. - c = NewActiveSeries(&Matchers{}, 0, "foo", "", nil) + c = NewActiveSeries(&Matchers{}, 0, "foo", "", nil, 0) updateGroup = &sync.WaitGroup{} purgeGroup = &sync.WaitGroup{} start = make(chan struct{}) @@ -917,7 +917,7 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil) + c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) for round := 0; round <= tt.nRounds; round++ { for ix := 0; ix < tt.nSeries; ix++ { c.UpdateSeries(series[ix], refs[ix], time.Unix(0, now), -1) @@ -942,7 +942,7 @@ func benchmarkPurge(b *testing.B, twice bool) { const numExpiresSeries = numSeries / 25 currentTime := time.Now() - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil) + c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0) series := [numSeries]labels.Labels{} refs := [numSeries]storage.SeriesRef{} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index e780c449161..4c674b5dd8e 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -312,7 +312,7 @@ type Ingester struct { activeGroups *util.ActiveGroupsCleanupService - costAttribution *util.CostAttributionCleanupService + costAttributionSvc *util.CostAttributionCleanupService tsdbMetrics *tsdbMetrics @@ -382,12 +382,7 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus } // New returns an Ingester that uses Mimir block storage. 
-func New( - cfg Config, limits *validation.Overrides, - ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, - activeGroupsCleanupService *util.ActiveGroupsCleanupService, - costAttributionCleanupService *util.CostAttributionCleanupService, - registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { +func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionCleanupService *util.CostAttributionCleanupService, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { i, err := newIngester(cfg, limits, registerer, logger) if err != nil { return nil, err @@ -395,7 +390,7 @@ func New( i.ingestionRate = util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval) i.metrics = newIngesterMetrics(registerer, cfg.ActiveSeriesMetrics.Enabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests, &i.inflightPushRequestsBytes) i.activeGroups = activeGroupsCleanupService - i.costAttribution = costAttributionCleanupService + i.costAttributionSvc = costAttributionCleanupService // We create a circuit breaker, which will be activated on a successful completion of starting. i.circuitBreaker = newIngesterCircuitBreaker(i.cfg.PushCircuitBreaker, i.cfg.ReadCircuitBreaker, logger, registerer) @@ -1172,7 +1167,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques // successfully committed stats = pushStats{ - failedSamplesAttribution: make(map[string]int), + failedSamplesAttribution: make(map[string]int, i.limits.MaxCostAttributionPerUser(userID)), } firstPartialErr error @@ -1290,10 +1285,8 @@ func (i *Ingester) updateMetricsFromPushStats(userID string, group string, stats db.ingestedAPISamples.Add(int64(stats.succeededSamplesCount)) } } - if stats.failedSamplesAttribution != nil && len(stats.failedSamplesAttribution) > 0 { - for label, count := range stats.failedSamplesAttribution { - discarded.samplesPerAttribution.WithLabelValues(userID, label).Add(float64(count)) - } + for label, count := range stats.failedSamplesAttribution { + discarded.samplesPerAttribution.WithLabelValues(userID, label).Add(float64(count)) } } @@ -1304,12 +1297,19 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre stats *pushStats, updateFirstPartial func(sampler *util_log.Sampler, errFn softErrorFunction), activeSeries *activeseries.ActiveSeries, outOfOrderWindow time.Duration, minAppendTimeAvailable bool, minAppendTime int64) error { // Return true if handled as soft error, and we can ingest more series. 
+ // get the cost attribution value for the series + costLabel := i.limits.CostAttributionLabel(userID) handleAppendError := func(err error, timestamp int64, labels []mimirpb.LabelAdapter) bool { - - // get the cost attribution value for the series - costLabel := i.limits.CostAttributionLabel(userID) if costLabel != "" { - costAttrib := i.costAttribution.UpdateAttributionTimestamp(userID, validation.AttributionValue(costLabel, userID, labels), time.Now()) + costAttrib := "" + for _, label := range labels { + if label.Name == costLabel { + costAttrib = label.Value + } + } + // get the label value and update the timestamp; + // if the cardinality limit is reached or we are currently in the cooldown period, the function returns __unaccounted__ + costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, costAttrib, startAppend, i.limits.MaxCostAttributionPerUser(userID)) stats.failedSamplesAttribution[costAttrib]++ } @@ -1417,12 +1417,15 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre var builder labels.ScratchBuilder var nonCopiedLabels labels.Labels for _, ts := range timeseries { - // The cost attribution value for the series - costLabel := i.limits.CostAttributionLabel(userID) var costAttrib string // when cost attribution label is set if costLabel != "" { - costAttrib = i.costAttribution.UpdateAttributionTimestamp(userID, validation.AttributionValue(costLabel, userID, ts.Labels), time.Now()) + for _, label := range ts.Labels { + if label.Name == costLabel { + costAttrib = label.Value + } + } + costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, costAttrib, startAppend, i.limits.MaxCostAttributionPerUser(userID)) } // The labels must be sorted (in our case, it's guaranteed a write request @@ -2680,7 +2683,8 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD i.cfg.ActiveSeriesMetrics.IdleTimeout, userID, i.limits.CostAttributionLabel(userID), - i.costAttribution, + i.costAttributionSvc, + i.limits.MaxCostAttributionPerUser(userID), ), seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 20d01381959..46b79e7f4b3 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -108,7 +108,6 @@ type Config struct { NoAuthTenant string `yaml:"no_auth_tenant" category:"advanced"` ShutdownDelay time.Duration `yaml:"shutdown_delay" category:"advanced"` MaxSeparateMetricsGroupsPerUser int `yaml:"max_separate_metrics_groups_per_user" category:"experimental"` - MaxCostAttributionPerUser int `yaml:"max_cost_attribution_per_user" category:"experimental"` CostAttributionEvictionInterval time.Duration `yaml:"cost_attribution_eviction_interval" category:"experimental"` EnableGoRuntimeMetrics bool `yaml:"enable_go_runtime_metrics" category:"advanced"` PrintConfig bool `yaml:"-"` @@ -170,7 +169,6 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.DurationVar(&c.ShutdownDelay, "shutdown-delay", 0, "How long to wait between SIGTERM and shutdown.
After receiving SIGTERM, Mimir will report not-ready status via /ready endpoint.") f.DurationVar(&c.CostAttributionEvictionInterval, "cost-attribution-eviction-interval", 10*time.Minute, "Interval at which to evict inactive cost attributions.") f.IntVar(&c.MaxSeparateMetricsGroupsPerUser, "max-separate-metrics-groups-per-user", 1000, "Maximum number of groups allowed per user by which specified distributor and ingester metrics can be further separated.") - f.IntVar(&c.MaxCostAttributionPerUser, "max-cost-attribution-per-user", 200, "Maximum number of cost attributions allowed per user.") f.BoolVar(&c.EnableGoRuntimeMetrics, "enable-go-runtime-metrics", false, "Set to true to enable all Go runtime metrics, such as go_sched_* and go_memstats_*.") f.BoolVar(&c.TimeseriesUnmarshalCachingOptimizationEnabled, "timeseries-unmarshal-caching-optimization-enabled", true, "Enables optimized marshaling of timeseries.") diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index cdd568669ed..725d43cdbdc 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -654,7 +654,7 @@ func (t *Mimir) initActiveGroupsCleanupService() (services.Service, error) { } func (t *Mimir) initCostAttributionService() (services.Service, error) { - t.CostAttributionCleanup = util.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, t.Cfg.MaxCostAttributionPerUser, util_log.Logger) + t.CostAttributionCleanup = util.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger) return t.CostAttributionCleanup, nil } diff --git a/pkg/util/cost_attribution.go b/pkg/util/cost_attribution.go index 04d2a2ec7b6..d8ab19c74de 100644 --- a/pkg/util/cost_attribution.go +++ b/pkg/util/cost_attribution.go @@ -4,6 +4,7 @@ package util import ( "context" + "fmt" "sync" "time" @@ -14,17 +15,15 @@ import ( ) type CostAttribution struct { - mu sync.RWMutex - timestampsPerUser map[string]map[string]*atomic.Int64 // map[user][group] -> timestamp - coolDownDeadline map[string]*atomic.Int64 - maxAttributionPerUser int + mu sync.RWMutex + timestampsPerUser map[string]map[string]*atomic.Int64 // map[user][group] -> timestamp + coolDownDeadline map[string]*atomic.Int64 } -func NewCostAttribution(maxAttributionPerUser int) *CostAttribution { +func NewCostAttribution() *CostAttribution { return &CostAttribution{ - timestampsPerUser: map[string]map[string]*atomic.Int64{}, - coolDownDeadline: map[string]*atomic.Int64{}, - maxAttributionPerUser: maxAttributionPerUser, + timestampsPerUser: map[string]map[string]*atomic.Int64{}, + coolDownDeadline: map[string]*atomic.Int64{}, } } @@ -110,7 +109,7 @@ func (ca *CostAttribution) purgeInactiveAttributions(inactiveTimeout time.Durati } } -func (ca *CostAttribution) attributionLimitExceeded(userID, attribution string, now time.Time) bool { +func (ca *CostAttribution) attributionLimitExceeded(userID, attribution string, now time.Time, limit int) bool { // if we are still at the cooldown period, we will consider the limit reached ca.mu.RLock() defer ca.mu.RUnlock() @@ -126,7 +125,7 @@ func (ca *CostAttribution) attributionLimitExceeded(userID, attribution string, } // if the user has reached the limit, we will set the cooldown period which is 20 minutes - maxReached := len(ca.timestampsPerUser[userID]) >= ca.maxAttributionPerUser + maxReached := len(ca.timestampsPerUser[userID]) >= limit if maxReached { ca.coolDownDeadline[userID].Store(time.Now().Add(20 * time.Minute).UnixNano()) return true @@ -141,32 +140,34 @@ type 
CostAttributionCleanupService struct { costAttribution *CostAttribution cleanupFuncs []func(userID, attribution string) inactiveTimeout time.Duration + invalidValue string } type CostAttributionMetricsCleaner interface { RemoveAttributionMetricsForUser(userID, attribution string) } -func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Duration, maxAttributionPerUser int, logger log.Logger, cleanupFns ...func(string, string)) *CostAttributionCleanupService { +func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, cleanupFns ...func(string, string)) *CostAttributionCleanupService { s := &CostAttributionCleanupService{ - costAttribution: NewCostAttribution(maxAttributionPerUser), + costAttribution: NewCostAttribution(), cleanupFuncs: cleanupFns, inactiveTimeout: inactiveTimeout, logger: logger, + invalidValue: "__unaccounted__", } s.Service = services.NewTimerService(cleanupInterval, nil, s.iteration, nil).WithName("cost attribution cleanup") return s } -func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user, attribution string, now time.Time) string { +func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user, attribution string, now time.Time, limit int) string { // empty label is not normal, if user set attribution label, the metrics send has to include the label if attribution == "" { - attribution = "other" - level.Error(s.logger).Log("msg", "set attribution label to \"other\" since missing cost attribution label in metrics") - } else if s.costAttribution.attributionLimitExceeded(user, attribution, now) { - attribution = "other" - level.Error(s.logger).Log("msg", "set attribution label to \"other\" since user has reached the limit of cost attribution labels") + attribution = s.invalidValue + level.Error(s.logger).Log("msg", fmt.Sprintf("set attribution label to \"%s\" since missing cost attribution label in metrics", s.invalidValue)) + } else if s.costAttribution.attributionLimitExceeded(user, attribution, now, limit) { + attribution = s.invalidValue + level.Error(s.logger).Log("msg", fmt.Sprintf("set attribution label to \"%s\" since user has reached the limit of cost attribution labels", s.invalidValue)) } s.costAttribution.UpdateAttributionTimestampForUser(user, attribution, now) diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 49f9ba6eb4b..e781b40bd67 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -144,7 +144,8 @@ type Limits struct { SeparateMetricsGroupLabel string `yaml:"separate_metrics_group_label" json:"separate_metrics_group_label" category:"experimental"` // User defined label to give the cost distribution by values of the label - CostAttributionLabel string `yaml:"cost_attribution_label" json:"cost_attribution_label" category:"experimental"` + CostAttributionLabel string `yaml:"cost_attribution_label" json:"cost_attribution_label" category:"experimental"` + MaxCostAttributionPerUser int `yaml:"max_cost_attribution_per_user" json:"max_cost_attribution_per_user" category:"experimental"` // Querier enforced limits. MaxChunksPerQuery int `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"` @@ -284,7 +285,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.OutOfOrderBlocksExternalLabelEnabled, "ingester.out-of-order-blocks-external-label-enabled", false, "Whether the shipper should label out-of-order blocks with an external label before uploading them.
Setting this label will compact out-of-order blocks separately from non-out-of-order blocks") f.StringVar(&l.SeparateMetricsGroupLabel, "validation.separate-metrics-group-label", "", "Label used to define the group label for metrics separation. For each write request, the group is obtained from the first non-empty group label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'group' label with group label's value. Currently applies to the following metrics: cortex_discarded_samples_total") - f.StringVar(&l.CostAttributionLabel, "validation.cost-attribution-label", "", "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total") + f.StringVar(&l.CostAttributionLabel, "validation.cost-attribution-label", "", "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated by adding an 'attrib' label with the cost attribution label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total.") + f.IntVar(&l.MaxCostAttributionPerUser, "validation.max-cost-attribution-per-user", 0, "The maximum number of cost attribution labels per user, across the cluster. 0 to disable cost attribution.") f.IntVar(&l.MaxChunksPerQuery, MaxChunksPerQueryFlag, 2e6, "Maximum number of chunks that can be fetched in a single query from ingesters and store-gateways. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.") f.Float64Var(&l.MaxEstimatedChunksPerQueryMultiplier, MaxEstimatedChunksPerQueryMultiplierFlag, 0, "Maximum number of chunks estimated to be fetched in a single query from ingesters and store-gateways, as a multiple of -"+MaxChunksPerQueryFlag+". This limit is enforced in the querier. Must be greater than or equal to 1, or 0 to disable.") f.IntVar(&l.MaxFetchedSeriesPerQuery, MaxSeriesPerQueryFlag, 0, "The maximum number of unique series for which a query can fetch samples from ingesters and store-gateways. This limit is enforced in the querier, ruler and store-gateway. 0 to disable") @@ -774,6 +776,10 @@ func (o *Overrides) CostAttributionLabel(userID string) string { return o.getOverridesForUser(userID).CostAttributionLabel } +func (o *Overrides) MaxCostAttributionPerUser(userID string) int { + return o.getOverridesForUser(userID).MaxCostAttributionPerUser +} + // IngestionTenantShardSize returns the ingesters shard size for a given user.
func (o *Overrides) IngestionTenantShardSize(userID string) int { return o.getOverridesForUser(userID).IngestionTenantShardSize diff --git a/pkg/util/validation/separate_metrics.go b/pkg/util/validation/separate_metrics.go index 0f2384984b0..cc6194a42e4 100644 --- a/pkg/util/validation/separate_metrics.go +++ b/pkg/util/validation/separate_metrics.go @@ -30,17 +30,3 @@ func GroupLabel(o *Overrides, userID string, timeseries []mimirpb.PreallocTimese return "" } - -// AttributionLabel obtains the value of cost attribution label for tenant -func AttributionValue(attribLab string, userID string, lbs []mimirpb.LabelAdapter) string { - if len(lbs) == 0 { - return "" - } - - for _, label := range lbs { - if label.Name == attribLab { - return label.Value - } - } - return "" -}
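For reference, a minimal sketch of how the two per-tenant settings added in pkg/util/validation/limits.go would be enabled together, assuming the standard Mimir runtime-overrides file layout; the tenant ID and label value are illustrative assumptions, only the cost_attribution_label and max_cost_attribution_per_user keys come from this patch:

overrides:
  tenant-a:                             # hypothetical tenant ID
    cost_attribution_label: "team"      # series label whose values attribute the cost
    max_cost_attribution_per_user: 100  # 0 (the default) disables cost attribution

With such an override, the distributor and ingester split cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total by the values of the team label; once the number of distinct values reaches the limit, or during the 20-minute cooldown that follows, UpdateAttributionTimestamp attributes further series to the __unaccounted__ value instead.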