
Commit: address comments
ying-jeanne committed Sep 26, 2024
1 parent 3c422a8 commit 7e628c3
Showing 15 changed files with 124 additions and 120 deletions.
24 changes: 12 additions & 12 deletions cmd/mimir/config-descriptor.json
@@ -57,17 +57,6 @@
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_cost_attribution_per_user",
"required": false,
"desc": "Maximum number of cost attributions allowed per user.",
"fieldValue": null,
"fieldDefaultValue": 200,
"fieldFlag": "max-cost-attribution-per-user",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cost_attribution_eviction_interval",
@@ -4076,13 +4065,24 @@
"kind": "field",
"name": "cost_attribution_label",
"required": false,
"desc": "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total",
"desc": "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'attrib' cost attribution's label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "validation.cost-attribution-label",
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_cost_attribution_per_user",
"required": false,
"desc": "The maximum number of cost attribution labels per user, across the cluster. 0 to disable cost attribution.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "validation.max-cost-attribution-per-user",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_fetched_chunks_per_query",
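Read together, the two limits above mean cost attribution is active for a tenant only when validation.cost-attribution-label is set and validation.max-cost-attribution-per-user is greater than zero (the default of 0 disables it). The Go sketch below illustrates that gating and capping logic only; the attributionTracker type, the overflowBucket name, and the collapse-into-one-bucket behaviour are assumptions made for illustration, not Mimir's actual implementation.

    package main

    import "fmt"

    // overflowBucket is a hypothetical catch-all value used once a tenant
    // exceeds its cap on distinct cost attribution values.
    const overflowBucket = "__overflow__"

    type attributionTracker struct {
    	maxPerUser int
    	seen       map[string]struct{}
    }

    // bucketFor returns the attribution value a series should be counted under.
    // An empty label value or a cap of 0 means cost attribution is disabled for
    // this series; values beyond the cap collapse into the overflow bucket.
    func (t *attributionTracker) bucketFor(labelValue string) (string, bool) {
    	if t.maxPerUser <= 0 || labelValue == "" {
    		return "", false
    	}
    	if _, ok := t.seen[labelValue]; ok {
    		return labelValue, true
    	}
    	if len(t.seen) >= t.maxPerUser {
    		return overflowBucket, true
    	}
    	t.seen[labelValue] = struct{}{}
    	return labelValue, true
    }

    func main() {
    	t := &attributionTracker{maxPerUser: 2, seen: map[string]struct{}{}}
    	for _, v := range []string{"team-a", "team-b", "team-c", ""} {
    		bucket, ok := t.bucketFor(v)
    		fmt.Printf("%q -> %q (counted: %v)\n", v, bucket, ok)
    	}
    }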
6 changes: 3 additions & 3 deletions cmd/mimir/help-all.txt.tmpl
@@ -1683,8 +1683,6 @@ Usage of ./cmd/mimir/mimir:
[experimental] Burst size, i.e., maximum number of messages that can be logged at once, temporarily exceeding the configured maximum logs per second. (default 1000)
-log.rate-limit-logs-per-second float
[experimental] Maximum number of messages per second to be logged. (default 10000)
-max-cost-attribution-per-user int
[experimental] Maximum number of cost attributions allowed per user. (default 200)
-max-separate-metrics-groups-per-user int
[experimental] Maximum number of groups allowed per user by which specified distributor and ingester metrics can be further separated. (default 1000)
-mem-ballast-size-bytes int
@@ -3072,11 +3070,13 @@ Usage of ./cmd/mimir/mimir:
-usage-stats.installation-mode string
Installation mode. Supported values: custom, helm, jsonnet. (default "custom")
-validation.cost-attribution-label string
[experimental] Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total
[experimental] Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated by adding an 'attrib' label with the cost attribution label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series, and cortex_discarded_samples_attribution_total.
-validation.create-grace-period duration
Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + creation_grace_period)'. This configuration is enforced in the distributor and ingester. (default 10m)
-validation.enforce-metadata-metric-name
Enforce every metadata has a metric name. (default true)
-validation.max-cost-attribution-per-user int
[experimental] The maximum number of cost attribution labels per user, across the cluster. 0 to disable cost attribution.
-validation.max-label-names-per-series int
Maximum number of label names per series. (default 30)
-validation.max-length-label-name int
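For illustration only, the two experimental flags documented above could be combined on the command line as shown here; the label name 'team' and the cap of 100 are placeholders, not values from this change:

    ./cmd/mimir/mimir -validation.cost-attribution-label=team -validation.max-cost-attribution-per-user=100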
23 changes: 13 additions & 10 deletions pkg/distributor/distributor.go
@@ -105,7 +105,7 @@ type Distributor struct
distributorsLifecycler *ring.BasicLifecycler
distributorsRing *ring.Ring
healthyInstancesCount *atomic.Uint32
costAttributionsvr *util.CostAttributionCleanupService
costAttributionSvc *util.CostAttributionCleanupService
// For handling HA replicas.
HATracker *haTracker

@@ -307,10 +307,7 @@ func (m *PushMetrics) deleteUserMetrics(user string) {
}

// New constructs a new Distributor
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides,
activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *util.CostAttributionCleanupService,
ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing,
canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *util.CostAttributionCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
clientMetrics := ingester_client.NewMetrics(reg)
if cfg.IngesterClientFactory == nil {
cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) {
@@ -345,7 +342,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
healthyInstancesCount: atomic.NewUint32(0),
limits: limits,
HATracker: haTracker,
costAttributionsvr: costAttributionClenaupService,
costAttributionSvc: costAttributionClenaupService,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),

queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
@@ -1424,7 +1421,9 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
return err
}

d.updateReceivedMetrics(req, userID, d.limits.CostAttributionLabel(userID))
now := mtime.Now()

d.updateReceivedMetrics(req, userID, d.limits.CostAttributionLabel(userID), now)

if len(req.Timeseries) == 0 && len(req.Metadata) == 0 {
return nil
@@ -1655,14 +1654,18 @@ func tokenForMetadata(userID string, metricName string) uint32 {
return mimirpb.ShardByMetricName(userID, metricName)
}

func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string) {
func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string, now time.Time) {
var receivedSamples, receivedExemplars, receivedMetadata int
costAttribution := make(map[string]int)
costAttributionSize := 0
if costAttributionLabel != "" {
costAttributionSize = d.limits.MaxCostAttributionPerUser(userID)
}
costAttribution := make(map[string]int, costAttributionSize)
for _, ts := range req.Timeseries {
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
if costAttributionLabel != "" {
attribution := d.costAttributionsvr.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), mtime.Now())
attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), now, costAttributionSize)
costAttribution[attribution]++
}
}
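The distributor change above does two things: it reads the clock once per push request (now := mtime.Now()) instead of once per timeseries, and it pre-sizes the attribution map from the per-user limit rather than growing it from empty. A minimal, self-contained sketch of that counting pattern follows; the series type and label handling are simplified stand-ins, not Mimir's real types.

    package main

    import (
    	"fmt"
    	"time"
    )

    type series struct {
    	labels  map[string]string
    	samples int
    }

    // countByAttribution mirrors the idea in updateReceivedMetrics: a single
    // timestamp captured for the whole request, and a result map pre-sized to
    // the tenant's cost attribution cap to avoid repeated growth.
    func countByAttribution(ts []series, attributionLabel string, maxPerUser int, now time.Time) map[string]int {
    	counts := make(map[string]int, maxPerUser)
    	for _, s := range ts {
    		// In the real code the label value and `now` are forwarded to the
    		// cost attribution cleanup service; here we only count samples.
    		counts[s.labels[attributionLabel]] += s.samples
    	}
    	_ = now
    	return counts
    }

    func main() {
    	req := []series{
    		{labels: map[string]string{"team": "a"}, samples: 3},
    		{labels: map[string]string{"team": "b"}, samples: 1},
    		{labels: map[string]string{"team": "a"}, samples: 2},
    	}
    	fmt.Println(countByAttribution(req, "team", 10, time.Now()))
    }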
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
@@ -7947,7 +7947,7 @@ func TestCheckStartedMiddleware(t *testing.T) {
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

distributor, err := New(distributorConfig, clientConfig, overrides, nil, ingestersRing, nil, true, nil, log.NewNopLogger())
distributor, err := New(distributorConfig, clientConfig, overrides, nil, nil, ingestersRing, nil, true, nil, log.NewNopLogger())
require.NoError(t, err)

ctx := user.InjectOrgID(context.Background(), "user")
2 changes: 1 addition & 1 deletion pkg/ingester/activeseries/active_labels_test.go
@@ -39,7 +39,7 @@ func TestIsLabelValueActive(t *testing.T) {
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

memPostings := index.NewMemPostings()
for i, l := range series {
@@ -24,7 +24,7 @@ func TestNativeHistogramPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -60,7 +60,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -104,7 +104,7 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -144,7 +144,7 @@ func TestNativeHistogramPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -181,7 +181,7 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
6 changes: 3 additions & 3 deletions pkg/ingester/activeseries/active_postings_test.go
@@ -24,7 +24,7 @@ func TestPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -56,7 +56,7 @@ func TestPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -88,7 +88,7 @@ func TestPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
32 changes: 19 additions & 13 deletions pkg/ingester/activeseries/active_series.go
@@ -48,8 +48,9 @@ type ActiveSeries struct {
matchers *Matchers
lastMatchersUpdate time.Time

CostAttributionLabel string
costAttributionsvr *util.CostAttributionCleanupService
costAttributionLabel string
costAttributionSvc *util.CostAttributionCleanupService
maxCostAttributionPerUser int

// The duration after which series become inactive.
// Also used to determine if enough time has passed since configuration reload for valid results.
@@ -67,7 +68,8 @@ type seriesStripe struct {
// Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated
// without holding the lock -- hence the atomic).
oldestEntryTs atomic.Int64
costAttributionsvr *util.CostAttributionCleanupService
costAttributionSvc *util.CostAttributionCleanupService
maxCostAttributionPerUser int
mu sync.RWMutex
refs map[storage.SeriesRef]seriesEntry
active uint32 // Number of active entries in this stripe. Only decreased during purge or clear.
@@ -98,17 +100,19 @@ func NewActiveSeries(
timeout time.Duration,
userID string,
costAttributionLabel string,
costAttributionsvr *util.CostAttributionCleanupService,
costAttributionSvc *util.CostAttributionCleanupService,
maxCostAttributionPerUser int,
) *ActiveSeries {
c := &ActiveSeries{
matchers: asm, timeout: timeout, userID: userID,
CostAttributionLabel: costAttributionLabel,
costAttributionsvr: costAttributionsvr,
costAttributionLabel: costAttributionLabel,
costAttributionSvc: costAttributionSvc,
maxCostAttributionPerUser: maxCostAttributionPerUser,
}

// Stripes are pre-allocated so that we only read on them and no lock is required.
for i := 0; i < numStripes; i++ {
c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionsvr)
c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvc, maxCostAttributionPerUser)
}

return c
@@ -125,7 +129,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) {
defer c.matchersMutex.Unlock()

for i := 0; i < numStripes; i++ {
c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel, c.costAttributionsvr)
c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionLabel, c.costAttributionSvc, c.maxCostAttributionPerUser)
}
c.matchers = asm
c.lastMatchersUpdate = now
@@ -232,13 +236,13 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot
}

func (c *ActiveSeries) ActiveByAttributionValue() map[string]uint32 {
total := map[string]uint32{}
total := make(map[string]uint32, c.maxCostAttributionPerUser)
for s := 0; s < numStripes; s++ {
c.stripes[s].mu.RLock()
defer c.stripes[s].mu.RUnlock()
for k, v := range c.stripes[s].costAttributionValues {
total[k] += v
}
c.stripes[s].mu.RUnlock()
}
return total
}
@@ -429,7 +433,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
// here, if we have a cost attribution label, we can split the series count based on the value of the label
// we also set the reference to the label value in the entry, so that on removal we can decrease the counter accordingly
if s.costAttributionLabel != "" {
attributionValue := s.costAttributionsvr.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Now())
attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Unix(0, nowNanos), s.maxCostAttributionPerUser)
s.costAttributionValues[attributionValue]++
e.attributionValue = attributionValue
}
@@ -462,7 +466,8 @@ func (s *seriesStripe) reinitialize(
deleted *deletedSeries,
userID string,
costAttributionLabel string,
costAttributionsvr *util.CostAttributionCleanupService,
costAttributionSvc *util.CostAttributionCleanupService,
maxCostAttributionPerUser int,
) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -473,13 +478,14 @@
s.costAttributionValues = map[string]uint32{}
s.activeNativeHistograms = 0
s.activeNativeHistogramBuckets = 0
s.maxCostAttributionPerUser = maxCostAttributionPerUser
s.matchers = asm
s.userID = userID
s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching)
s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms)
s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets)
s.costAttributionLabel = costAttributionLabel
s.costAttributionsvr = costAttributionsvr
s.costAttributionSvc = costAttributionSvc
}

func (s *seriesStripe) purge(keepUntil time.Time) {
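The ActiveByAttributionValue change above is a lock-scope fix: deferring RUnlock inside the loop would keep every stripe's read lock held until the function returned, while the new code releases each stripe as soon as its counts are copied. A small self-contained sketch of the resulting pattern, with toy stripe data rather than the real seriesStripe type:

    package main

    import (
    	"fmt"
    	"sync"
    )

    type stripe struct {
    	mu     sync.RWMutex
    	counts map[string]uint32
    }

    // aggregate copies each stripe's counts under its read lock and releases the
    // lock immediately, instead of deferring all unlocks to function exit.
    func aggregate(stripes []*stripe) map[string]uint32 {
    	total := map[string]uint32{}
    	for _, s := range stripes {
    		s.mu.RLock()
    		for k, v := range s.counts {
    			total[k] += v
    		}
    		s.mu.RUnlock()
    	}
    	return total
    }

    func main() {
    	stripes := []*stripe{
    		{counts: map[string]uint32{"team-a": 2, "team-b": 1}},
    		{counts: map[string]uint32{"team-a": 3}},
    	}
    	fmt.Println(aggregate(stripes))
    }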
