new service for tracking cost attribution
ying-jeanne committed Sep 25, 2024
1 parent 4ee5004 commit 3c422a8
Showing 21 changed files with 340 additions and 81 deletions.
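For context: this commit threads a new util.CostAttributionCleanupService through the distributor and the ingester's active-series tracker. The service's own source is not visible in this view, but the call sites below imply its shape: UpdateAttributionTimestamp(user, attribution, now) refreshes a last-seen timestamp and returns the value to use as a metric label, -max-cost-attribution-per-user (default 200) bounds per-user cardinality, and a background loop evicts values idle longer than -cost-attribution-eviction-interval (default 10m), invoking cleanup hooks such as Distributor.RemoveAttributionMetricsForUser. A minimal sketch under those assumptions — every internal here is inferred, not taken from the commit:

package util

import (
	"sync"
	"time"
)

// CostAttributionCleanupService tracks, per user, which cost attribution
// values have been seen and when each was last updated. Sketch only: the
// real service is also wired into Mimir's service manager as a background task.
type CostAttributionCleanupService struct {
	mu         sync.Mutex
	lastSeen   map[string]map[string]time.Time // user -> attribution value -> last seen
	maxPerUser int                             // -max-cost-attribution-per-user
	cleanupFns []func(user, attribution string)
}

// UpdateAttributionTimestamp records that attribution was seen at now and
// returns the label value callers should use. Clamping overflow to a
// catch-all bucket is an assumption about how the limit is enforced.
func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user, attribution string, now time.Time) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	values := s.lastSeen[user]
	if values == nil {
		values = map[string]time.Time{}
		s.lastSeen[user] = values
	}
	if _, ok := values[attribution]; !ok && len(values) >= s.maxPerUser {
		attribution = "__unaccounted__" // hypothetical overflow bucket
	}
	values[attribution] = now
	return attribution
}

// evictInactive drops attributions not seen since deadline and notifies the
// registered cleanup callbacks, presumably driven by a ticker running at
// -cost-attribution-eviction-interval.
func (s *CostAttributionCleanupService) evictInactive(deadline time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for user, values := range s.lastSeen {
		for attribution, seen := range values {
			if seen.Before(deadline) {
				delete(values, attribution)
				for _, fn := range s.cleanupFns {
					fn(user, attribution)
				}
			}
		}
	}
}

Funneling every attribution value through one service gives a single place to enforce the limit and a single timestamp map to drive eviction, mirroring the existing ActiveGroupsCleanupService pattern that New already accepts.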
33 changes: 22 additions & 11 deletions cmd/mimir/config-descriptor.json
@@ -57,6 +57,28 @@
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_cost_attribution_per_user",
"required": false,
"desc": "Maximum number of cost attributions allowed per user.",
"fieldValue": null,
"fieldDefaultValue": 200,
"fieldFlag": "max-cost-attribution-per-user",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cost_attribution_eviction_interval",
"required": false,
"desc": "Interval at which to evict inactive cost attributions.",
"fieldValue": null,
"fieldDefaultValue": 600000000000,
"fieldFlag": "cost-attribution-eviction-interval",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "enable_go_runtime_metrics",
@@ -4061,17 +4083,6 @@
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_cost_attribution_per_user",
"required": false,
"desc": "Maximum number of cost attributions per user. 0 to disable the limit.",
"fieldValue": null,
"fieldDefaultValue": 200,
"fieldFlag": "validation.max-cost-attribution-per-user",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_fetched_chunks_per_query",
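Aside: fieldDefaultValue for duration fields in config-descriptor.json is raw nanoseconds, so 600000000000 above is the same 10-minute default that help-all.txt.tmpl below renders as 10m0s. A quick check in Go:

package main

import (
	"fmt"
	"time"
)

func main() {
	// 600000000000 ns = 600 s = 10 min, matching the "(default 10m0s)" help text.
	fmt.Println(time.Duration(600000000000)) // prints 10m0s
}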
6 changes: 4 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
@@ -1133,6 +1133,8 @@ Usage of ./cmd/mimir/mimir:
Expands ${var} or $var in config according to the values of the environment variables.
-config.file value
Configuration file to load.
-cost-attribution-eviction-interval duration
[experimental] Interval at which to evict inactive cost attributions. (default 10m0s)
-debug.block-profile-rate int
Fraction of goroutine blocking events that are reported in the blocking profile. 1 to include every blocking event in the profile, 0 to disable.
-debug.mutex-profile-fraction int
@@ -1681,6 +1683,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Burst size, i.e., maximum number of messages that can be logged at once, temporarily exceeding the configured maximum logs per second. (default 1000)
-log.rate-limit-logs-per-second float
[experimental] Maximum number of messages per second to be logged. (default 10000)
-max-cost-attribution-per-user int
[experimental] Maximum number of cost attributions allowed per user. (default 200)
-max-separate-metrics-groups-per-user int
[experimental] Maximum number of groups allowed per user by which specified distributor and ingester metrics can be further separated. (default 1000)
-mem-ballast-size-bytes int
@@ -3073,8 +3077,6 @@ Usage of ./cmd/mimir/mimir:
Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + creation_grace_period)'. This configuration is enforced in the distributor and ingester. (default 10m)
-validation.enforce-metadata-metric-name
Enforce every metadata has a metric name. (default true)
-validation.max-cost-attribution-per-user int
[experimental] Maximum number of cost attributions per user. 0 to disable the limit. (default 200)
-validation.max-label-names-per-series int
Maximum number of label names per series. (default 30)
-validation.max-length-label-name int
18 changes: 14 additions & 4 deletions pkg/distributor/distributor.go
@@ -105,7 +105,7 @@ type Distributor struct {
distributorsLifecycler *ring.BasicLifecycler
distributorsRing *ring.Ring
healthyInstancesCount *atomic.Uint32

costAttributionsvr *util.CostAttributionCleanupService
// For handling HA replicas.
HATracker *haTracker

@@ -307,7 +307,10 @@ func (m *PushMetrics) deleteUserMetrics(user string) {
}

// New constructs a new Distributor
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides,
activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionCleanupService *util.CostAttributionCleanupService,
ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing,
canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
clientMetrics := ingester_client.NewMetrics(reg)
if cfg.IngesterClientFactory == nil {
cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) {
@@ -342,6 +345,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
healthyInstancesCount: atomic.NewUint32(0),
limits: limits,
HATracker: haTracker,
costAttributionsvr: costAttributionCleanupService,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),

queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
@@ -357,6 +361,7 @@
Name: "cortex_distributor_received_samples_total",
Help: "The total number of received samples, excluding rejected and deduped samples.",
}, []string{"user", "attrib"}),

receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_received_exemplars_total",
Help: "The total number of received exemplars, excluding rejected and deduped exemplars.",
@@ -640,7 +645,6 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
d.HATracker.cleanupHATrackerMetricsForUser(userID)

d.receivedRequests.DeleteLabelValues(userID)
d.receivedSamples.DeletePartialMatch(prometheus.Labels{"user": userID})
d.receivedExemplars.DeleteLabelValues(userID)
d.receivedMetadata.DeleteLabelValues(userID)
d.incomingRequests.DeleteLabelValues(userID)
@@ -656,6 +660,7 @@ func (d *Distributor) cleanupInactiveUser(userID string) {

filter := prometheus.Labels{"user": userID}
d.dedupedSamples.DeletePartialMatch(filter)
d.receivedSamples.DeletePartialMatch(filter)
d.discardedSamplesTooManyHaClusters.DeletePartialMatch(filter)
d.discardedSamplesRateLimited.DeletePartialMatch(filter)
d.discardedRequestsRateLimited.DeleteLabelValues(userID)
@@ -674,6 +679,11 @@ func (d *Distributor) RemoveGroupMetricsForUser(userID, group string) {
d.sampleValidationMetrics.deleteUserMetricsForGroup(userID, group)
}

func (d *Distributor) RemoveAttributionMetricsForUser(userID, attribution string) {
d.receivedSamples.DeleteLabelValues(userID, attribution)
// TODO @ying: remove the remaining attribution metrics
}

// Called after distributor is asked to stop via StopAsync.
func (d *Distributor) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
@@ -1652,7 +1662,7 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
if costAttributionLabel != "" {
attribution := mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel)
attribution := d.costAttributionsvr.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), mtime.Now())
costAttribution[attribution]++
}
}
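Note on the updateReceivedMetrics hunk above: each series' attribution value now passes through the service before being counted, so its last-seen timestamp is refreshed on every push and the returned (possibly limit-clamped) value is what lands in the attrib label of cortex_distributor_received_samples_total. The increment itself sits outside the visible lines; a sketch of the assumed aggregation step:

// Assumed, not shown in the hunks above: flush the per-request tallies
// collected in costAttribution into the received-samples counter.
for attribution, count := range costAttribution {
	d.receivedSamples.WithLabelValues(userID, attribution).Add(float64(count))
}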
4 changes: 2 additions & 2 deletions pkg/distributor/distributor_test.go
@@ -2026,7 +2026,7 @@ func BenchmarkDistributor_Push(b *testing.B) {
require.NoError(b, err)

// Start the distributor.
distributor, err := New(distributorCfg, clientConfig, overrides, nil, ingestersRing, nil, true, nil, log.NewNopLogger())
distributor, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, nil, true, nil, log.NewNopLogger())
require.NoError(b, err)
require.NoError(b, services.StartAndAwaitRunning(context.Background(), distributor))

@@ -5314,7 +5314,7 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
d, err := New(distributorCfg, clientConfig, overrides, nil, ingestersRing, partitionsRing, true, reg, log.NewNopLogger())
d, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, partitionsRing, true, reg, log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, d))
t.Cleanup(func() {
1 change: 0 additions & 1 deletion pkg/distributor/validate.go
@@ -370,7 +370,6 @@ func removeNonASCIIChars(in string) (out string) {
// The returned error may retain the provided series labels.
func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userID, group string, ls []mimirpb.LabelAdapter, skipLabelNameValidation bool) error {
unsafeMetricName, err := extract.UnsafeMetricNameFromLabelAdapters(ls)

if err != nil {
m.missingMetricName.WithLabelValues(userID, group).Inc()
return errors.New(noMetricNameMsgFormat)
2 changes: 1 addition & 1 deletion pkg/ingester/activeseries/active_labels_test.go
@@ -39,7 +39,7 @@ func TestIsLabelValueActive(t *testing.T) {
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)

memPostings := index.NewMemPostings()
for i, l := range series {
10 changes: 5 additions & 5 deletions pkg/ingester/activeseries/active_native_histogram_postings_test.go
@@ -24,7 +24,7 @@ func TestNativeHistogramPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -60,7 +60,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -104,7 +104,7 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -144,7 +144,7 @@ func TestNativeHistogramPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -181,7 +181,7 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
6 changes: 3 additions & 3 deletions pkg/ingester/activeseries/active_postings_test.go
@@ -24,7 +24,7 @@ func TestPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -56,7 +56,7 @@ func TestPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -88,7 +88,7 @@ func TestPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
24 changes: 15 additions & 9 deletions pkg/ingester/activeseries/active_series.go
@@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/grafana/mimir/pkg/util"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
@@ -48,6 +49,7 @@ type ActiveSeries struct {
lastMatchersUpdate time.Time

CostAttributionLabel string
costAttributionsvr *util.CostAttributionCleanupService

// The duration after which series become inactive.
// Also used to determine if enough time has passed since configuration reload for valid results.
@@ -64,8 +66,8 @@ type seriesStripe struct {
// Unix nanoseconds. Only used by purge. Zero = unknown.
// Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated
// without holding the lock -- hence the atomic).
oldestEntryTs atomic.Int64

oldestEntryTs atomic.Int64
costAttributionsvr *util.CostAttributionCleanupService
mu sync.RWMutex
refs map[storage.SeriesRef]seriesEntry
active uint32 // Number of active entries in this stripe. Only decreased during purge or clear.
@@ -96,15 +98,17 @@ func NewActiveSeries(
timeout time.Duration,
userID string,
costAttributionLabel string,
costAttributionsvr *util.CostAttributionCleanupService,
) *ActiveSeries {
c := &ActiveSeries{
matchers: asm, timeout: timeout, userID: userID,
CostAttributionLabel: costAttributionLabel,
costAttributionsvr: costAttributionsvr,
}

// Stripes are pre-allocated so that we only read on them and no lock is required.
for i := 0; i < numStripes; i++ {
c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel)
c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionsvr)
}

return c
@@ -121,7 +125,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) {
defer c.matchersMutex.Unlock()

for i := 0; i < numStripes; i++ {
c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel)
c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel, c.costAttributionsvr)
}
c.matchers = asm
c.lastMatchersUpdate = now
@@ -230,6 +234,8 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot
func (c *ActiveSeries) ActiveByAttributionValue() map[string]uint32 {
total := map[string]uint32{}
for s := 0; s < numStripes; s++ {
c.stripes[s].mu.RLock()
for k, v := range c.stripes[s].costAttributionValues {
total[k] += v
}
c.stripes[s].mu.RUnlock()
@@ -423,11 +429,9 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
// If a cost attribution label is configured, split the series count by that label's value.
// The entry also stores the attribution value so the counter can be decremented when the series is removed.
if s.costAttributionLabel != "" {
attributionValue := series.Get(s.costAttributionLabel)
if attributionValue != "" {
s.costAttributionValues[attributionValue]++
e.attributionValue = attributionValue
}
attributionValue := s.costAttributionsvr.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Now())
s.costAttributionValues[attributionValue]++
e.attributionValue = attributionValue
}

s.refs[ref] = e
@@ -458,6 +462,7 @@ func (s *seriesStripe) reinitialize(
deleted *deletedSeries,
userID string,
costAttributionLabel string,
costAttributionsvr *util.CostAttributionCleanupService,
) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -474,6 +479,7 @@
s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms)
s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets)
s.costAttributionLabel = costAttributionLabel
s.costAttributionsvr = costAttributionsvr
}

func (s *seriesStripe) purge(keepUntil time.Time) {
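The attributionValue stored on each entry closes the loop on the comment in findAndUpdateOrCreateEntryForSeries: when purge (truncated above) or clear drops a series, the stripe can decrement the matching count so ActiveByAttributionValue stays accurate. That removal path is outside the visible hunks; a sketch of the assumed bookkeeping:

// Hypothetical removal-side counterpart to the increment in
// findAndUpdateOrCreateEntryForSeries; field names mirror those shown above.
func (s *seriesStripe) decrementAttribution(e seriesEntry) {
	if e.attributionValue == "" {
		return // series was never attributed
	}
	s.costAttributionValues[e.attributionValue]--
	if s.costAttributionValues[e.attributionValue] == 0 {
		delete(s.costAttributionValues, e.attributionValue) // keep the map bounded
	}
}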