renaming of service
ying-jeanne committed Sep 26, 2024
1 parent 8ae4571 commit 9320256
Showing 11 changed files with 77 additions and 68 deletions.
2 changes: 1 addition & 1 deletion cmd/mimir/config-descriptor.json
@@ -4076,7 +4076,7 @@
"kind": "field",
"name": "cost_attribution_label",
"required": false,
"desc": "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total",
"desc": "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'attrib' cost attribution's label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "validation.cost-attribution-label",
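To make the description above concrete, here is a minimal, illustrative Go sketch of a counter that carries an extra `attrib` label holding the cost attribution label's value. The metric name, the `user`/`attrib` label names, and the values are assumptions taken from the description text, not Mimir's actual metric definitions.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative only: a counter split by tenant and by the value of the
// configured cost attribution label.
var receivedSamples = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cortex_distributor_received_samples_total",
		Help: "Received samples, split by tenant and cost attribution value.",
	},
	[]string{"user", "attrib"},
)

func main() {
	prometheus.MustRegister(receivedSamples)

	// A timeseries whose configured cost attribution label (say "team") has
	// the value "platform" would be counted under attrib="platform".
	receivedSamples.WithLabelValues("tenant-1", "platform").Add(3)
	fmt.Println("recorded 3 samples for tenant-1 / platform")
}
```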
2 changes: 1 addition & 1 deletion cmd/mimir/help-all.txt.tmpl
@@ -3072,7 +3072,7 @@ Usage of ./cmd/mimir/mimir:
-usage-stats.installation-mode string
Installation mode. Supported values: custom, helm, jsonnet. (default "custom")
-validation.cost-attribution-label string
[experimental] Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total
[experimental] Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'attrib' cost attribution's label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total.
-validation.create-grace-period duration
Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + creation_grace_period)'. This configuration is enforced in the distributor and ingester. (default 10m)
-validation.enforce-metadata-metric-name
18 changes: 12 additions & 6 deletions pkg/distributor/distributor.go
@@ -106,7 +106,7 @@ type Distributor struct {
distributorsLifecycler *ring.BasicLifecycler
distributorsRing *ring.Ring
healthyInstancesCount *atomic.Uint32
costAttributionSvr *util.CostAttributionCleanupService
costAttributionSvc *util.CostAttributionCleanupService
// For handling HA replicas.
HATracker *haTracker

@@ -343,7 +343,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
healthyInstancesCount: atomic.NewUint32(0),
limits: limits,
HATracker: haTracker,
costAttributionSvr: costAttributionClenaupService,
costAttributionSvc: costAttributionClenaupService,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
maxCostAttributionPerUser: maxCostAttributionPerUser,

@@ -1423,7 +1423,9 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
return err
}

d.updateReceivedMetrics(req, userID, d.limits.CostAttributionLabel(userID))
now := mtime.Now()

d.updateReceivedMetrics(req, userID, d.limits.CostAttributionLabel(userID), now)

if len(req.Timeseries) == 0 && len(req.Metadata) == 0 {
return nil
@@ -1654,14 +1656,18 @@ func tokenForMetadata(userID string, metricName string) uint32 {
return mimirpb.ShardByMetricName(userID, metricName)
}

func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string) {
func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string, now time.Time) {
var receivedSamples, receivedExemplars, receivedMetadata int
costAttribution := make(map[string]int, d.maxCostAttributionPerUser)
costAttributionSize := 0
if costAttributionLabel != "" {
costAttributionSize = d.maxCostAttributionPerUser
}
costAttribution := make(map[string]int, costAttributionSize)
for _, ts := range req.Timeseries {
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
if costAttributionLabel != "" {
attribution := d.costAttributionSvr.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), mtime.Now())
attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), now)
costAttribution[attribution]++
}
}
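The hunk above changes `updateReceivedMetrics` to take a single `now` from the caller and to size the attribution map only when a cost attribution label is configured. Below is a self-contained sketch of that counting pattern; the `series` type and `lookupAttribution` helper are placeholders standing in for Mimir's `mimirpb` types and `costAttributionSvc.UpdateAttributionTimestamp`, not the real API.

```go
package main

import (
	"fmt"
	"time"
)

type series struct {
	labels  map[string]string
	samples int
}

// Placeholder: in Mimir, this is where UpdateAttributionTimestamp would record
// `now` for the attribution value; here we just read the label.
func lookupAttribution(s series, label string, now time.Time) string {
	return s.labels[label]
}

func countReceived(ts []series, costAttributionLabel string, maxAttributions int, now time.Time) (int, map[string]int) {
	// Size the map only when cost attribution is enabled, mirroring the diff.
	attributionSize := 0
	if costAttributionLabel != "" {
		attributionSize = maxAttributions
	}
	costAttribution := make(map[string]int, attributionSize)

	receivedSamples := 0
	for _, s := range ts {
		receivedSamples += s.samples
		if costAttributionLabel != "" {
			costAttribution[lookupAttribution(s, costAttributionLabel, now)]++
		}
	}
	return receivedSamples, costAttribution
}

func main() {
	// The caller captures `now` once and passes it down, as push() now does.
	now := time.Now()
	ts := []series{
		{labels: map[string]string{"team": "platform"}, samples: 2},
		{labels: map[string]string{"team": "search"}, samples: 1},
	}
	samples, attrib := countReceived(ts, "team", 100, now)
	fmt.Println(samples, attrib) // 3 map[platform:1 search:1]
}
```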
2 changes: 1 addition & 1 deletion pkg/ingester/activeseries/active_labels_test.go
@@ -39,7 +39,7 @@ func TestIsLabelValueActive(t *testing.T) {
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

memPostings := index.NewMemPostings()
for i, l := range series {
@@ -24,7 +24,7 @@ func TestNativeHistogramPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -60,7 +60,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -104,7 +104,7 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -144,7 +144,7 @@ func TestNativeHistogramPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -181,7 +181,7 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
6 changes: 3 additions & 3 deletions pkg/ingester/activeseries/active_postings_test.go
@@ -24,7 +24,7 @@ func TestPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -56,7 +56,7 @@ func TestPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
@@ -88,7 +88,7 @@ func TestPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
30 changes: 18 additions & 12 deletions pkg/ingester/activeseries/active_series.go
@@ -48,8 +48,9 @@ type ActiveSeries struct {
matchers *Matchers
lastMatchersUpdate time.Time

CostAttributionLabel string
costAttributionSvr *util.CostAttributionCleanupService
costAttributionLabel string
costAttributionSvc *util.CostAttributionCleanupService
maxCostAttributionPerUser int

// The duration after which series become inactive.
// Also used to determine if enough time has passed since configuration reload for valid results.
@@ -67,7 +68,8 @@ type seriesStripe struct {
// Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated
// without holding the lock -- hence the atomic).
oldestEntryTs atomic.Int64
costAttributionSvr *util.CostAttributionCleanupService
costAttributionSvc *util.CostAttributionCleanupService
maxCostAttributionPerUser int
mu sync.RWMutex
refs map[storage.SeriesRef]seriesEntry
active uint32 // Number of active entries in this stripe. Only decreased during purge or clear.
@@ -98,17 +100,19 @@ func NewActiveSeries(
timeout time.Duration,
userID string,
costAttributionLabel string,
costAttributionSvr *util.CostAttributionCleanupService,
costAttributionSvc *util.CostAttributionCleanupService,
maxCostAttributionPerUser int,
) *ActiveSeries {
c := &ActiveSeries{
matchers: asm, timeout: timeout, userID: userID,
CostAttributionLabel: costAttributionLabel,
costAttributionSvr: costAttributionSvr,
costAttributionLabel: costAttributionLabel,
costAttributionSvc: costAttributionSvc,
maxCostAttributionPerUser: maxCostAttributionPerUser,
}

// Stripes are pre-allocated so that we only read on them and no lock is required.
for i := 0; i < numStripes; i++ {
c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvr)
c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvc, maxCostAttributionPerUser)
}

return c
@@ -125,7 +129,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) {
defer c.matchersMutex.Unlock()

for i := 0; i < numStripes; i++ {
c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.CostAttributionLabel, c.costAttributionSvr)
c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionLabel, c.costAttributionSvc, c.maxCostAttributionPerUser)
}
c.matchers = asm
c.lastMatchersUpdate = now
@@ -232,7 +236,7 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot
}

func (c *ActiveSeries) ActiveByAttributionValue() map[string]uint32 {
total := make(map[string]uint32)
total := make(map[string]uint32, c.maxCostAttributionPerUser)
for s := 0; s < numStripes; s++ {
c.stripes[s].mu.RLock()
for k, v := range c.stripes[s].costAttributionValues {
@@ -429,7 +433,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef
// here if we have a cost attribution label, we can split the serie count based on the value of the label
// we also set the reference to the value of the label in the entry, so when remove, we can decrease the counter accordingly
if s.costAttributionLabel != "" {
attributionValue := s.costAttributionSvr.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Now())
attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Unix(0, nowNanos))
s.costAttributionValues[attributionValue]++
e.attributionValue = attributionValue
}
@@ -462,7 +466,8 @@ func (s *seriesStripe) reinitialize(
deleted *deletedSeries,
userID string,
costAttributionLabel string,
costAttributionSvr *util.CostAttributionCleanupService,
costAttributionSvc *util.CostAttributionCleanupService,
maxCostAttributionPerUser int,
) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -473,13 +478,14 @@
s.costAttributionValues = map[string]uint32{}
s.activeNativeHistograms = 0
s.activeNativeHistogramBuckets = 0
s.maxCostAttributionPerUser = maxCostAttributionPerUser
s.matchers = asm
s.userID = userID
s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching)
s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms)
s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets)
s.costAttributionLabel = costAttributionLabel
s.costAttributionSvr = costAttributionSvr
s.costAttributionSvc = costAttributionSvc
}

func (s *seriesStripe) purge(keepUntil time.Time) {
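Two details in the hunks above are easy to miss: `ActiveByAttributionValue` now pre-sizes its result map with `maxCostAttributionPerUser`, and the stripe update reuses the caller's timestamp via `time.Unix(0, nowNanos)` instead of calling `time.Now()` again. The stand-alone sketch below, which uses placeholder types rather than the real `seriesStripe`, shows the pre-sized, read-locked merge across stripes.

```go
package main

import (
	"fmt"
	"sync"
)

const numStripes = 4

// Placeholder stripe type: only the fields needed for the merge are shown.
type stripe struct {
	mu                    sync.RWMutex
	costAttributionValues map[string]uint32
}

type activeSeries struct {
	stripes                   [numStripes]stripe
	maxCostAttributionPerUser int
}

func (c *activeSeries) activeByAttributionValue() map[string]uint32 {
	// Pre-size with the per-user cap so the common case avoids map growth.
	total := make(map[string]uint32, c.maxCostAttributionPerUser)
	for s := 0; s < numStripes; s++ {
		c.stripes[s].mu.RLock()
		for k, v := range c.stripes[s].costAttributionValues {
			total[k] += v
		}
		c.stripes[s].mu.RUnlock()
	}
	return total
}

func main() {
	c := &activeSeries{maxCostAttributionPerUser: 100}
	for i := range c.stripes {
		c.stripes[i].costAttributionValues = map[string]uint32{"platform": uint32(i + 1)}
	}
	fmt.Println(c.activeByAttributionValue()) // map[platform:10]
}
```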
28 changes: 14 additions & 14 deletions pkg/ingester/activeseries/active_series_test.go
@@ -29,7 +29,7 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) {
ref4, ls4 := storage.SeriesRef(4), labels.FromStrings("a", "4")
ref5 := storage.SeriesRef(5) // will be used for ls1 again.

c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil)
c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0)
valid := c.Purge(time.Now())
assert.True(t, valid)
allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers()
@@ -194,7 +194,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) {
for ttl := 1; ttl <= len(series); ttl++ {
t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) {
mockedTime := time.Unix(int64(ttl), 0)
c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil)
c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0)

// Update each series with a different timestamp according to each index
for i := 0; i < len(series); i++ {
@@ -221,7 +221,7 @@

func TestActiveSeries_UpdateSeries_WithMatchers(t *testing.T) {
asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`}))
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil)
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
testUpdateSeries(t, c)
}

Expand Down Expand Up @@ -438,7 +438,7 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) {

func TestActiveSeries_UpdateSeries_Clear(t *testing.T) {
asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`}))
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil)
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
testUpdateSeries(t, c)

c.Clear()
@@ -479,7 +479,7 @@ func TestActiveSeries_ShouldCorrectlyHandleHashCollisions(t *testing.T) {
ls1, ls2 := labelsWithHashCollision()
ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2)

c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil)
c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0)
c.UpdateSeries(ls1, ref1, time.Now(), -1)
c.UpdateSeries(ls2, ref2, time.Now(), -1)

@@ -507,7 +507,7 @@ func TestActiveSeries_Purge_NoMatchers(t *testing.T) {
for ttl := 1; ttl <= len(series); ttl++ {
t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) {
mockedTime := time.Unix(int64(ttl), 0)
c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil)
c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0)

for i := 0; i < len(series); i++ {
c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1)
@@ -553,7 +553,7 @@ func TestActiveSeries_Purge_WithMatchers(t *testing.T) {
t.Run(fmt.Sprintf("ttl=%d", ttl), func(t *testing.T) {
mockedTime := time.Unix(int64(ttl), 0)

c := NewActiveSeries(asm, 5*time.Minute, "foo", "", nil)
c := NewActiveSeries(asm, 5*time.Minute, "foo", "", nil, 0)

exp := len(series) - ttl
expMatchingSeries := 0
@@ -585,7 +585,7 @@ func TestActiveSeries_PurgeOpt(t *testing.T) {
ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2)

currentTime := time.Now()
c := NewActiveSeries(&Matchers{}, 59*time.Second, "foo", "", nil)
c := NewActiveSeries(&Matchers{}, 59*time.Second, "foo", "", nil, 0)

c.UpdateSeries(ls1, ref1, currentTime.Add(-2*time.Minute), -1)
c.UpdateSeries(ls2, ref2, currentTime, -1)
@@ -621,7 +621,7 @@ func TestActiveSeries_ReloadSeriesMatchers(t *testing.T) {
asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~.*}`}))

currentTime := time.Now()
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil)
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)

valid := c.Purge(currentTime)
assert.True(t, valid)
@@ -687,7 +687,7 @@ func TestActiveSeries_ReloadSeriesMatchers_LessMatchers(t *testing.T) {
}))

currentTime := time.Now()
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil)
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
valid := c.Purge(currentTime)
assert.True(t, valid)
allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers()
@@ -726,7 +726,7 @@ func TestActiveSeries_ReloadSeriesMatchers_SameSizeNewLabels(t *testing.T) {

currentTime := time.Now()

c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil)
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
valid := c.Purge(currentTime)
assert.True(t, valid)
allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers()
@@ -779,7 +779,7 @@ func benchmarkActiveSeriesUpdateSeriesConcurrency(b *testing.B, numSeries, numGo
var (
// Run the active series tracker with an active timeout = 0 so that the Purge() will always
// purge the series.
c = NewActiveSeries(&Matchers{}, 0, "foo", "", nil)
c = NewActiveSeries(&Matchers{}, 0, "foo", "", nil, 0)
updateGroup = &sync.WaitGroup{}
purgeGroup = &sync.WaitGroup{}
start = make(chan struct{})
@@ -917,7 +917,7 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil)
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
for round := 0; round <= tt.nRounds; round++ {
for ix := 0; ix < tt.nSeries; ix++ {
c.UpdateSeries(series[ix], refs[ix], time.Unix(0, now), -1)
@@ -942,7 +942,7 @@ func benchmarkPurge(b *testing.B, twice bool) {
const numExpiresSeries = numSeries / 25

currentTime := time.Now()
c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil)
c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0)

series := [numSeries]labels.Labels{}
refs := [numSeries]storage.SeriesRef{}
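All of the test call sites above simply append a trailing `0` for the new `maxCostAttributionPerUser` parameter, alongside the existing empty cost attribution label and nil cleanup service. A stand-alone sketch of that constructor shape follows, using placeholder types rather than the real `activeseries` package, to show the parameter order assumed here and why `0` is passed when cost attribution is disabled.

```go
package main

import (
	"fmt"
	"time"
)

// Placeholder types standing in for activeseries.Matchers and
// util.CostAttributionCleanupService.
type matchers struct{}
type cleanupService struct{}

type activeSeriesTracker struct {
	timeout                   time.Duration
	userID                    string
	costAttributionLabel      string
	costAttributionSvc        *cleanupService
	maxCostAttributionPerUser int
}

func newActiveSeries(asm *matchers, timeout time.Duration, userID, costAttributionLabel string,
	costAttributionSvc *cleanupService, maxCostAttributionPerUser int) *activeSeriesTracker {
	return &activeSeriesTracker{
		timeout:                   timeout,
		userID:                    userID,
		costAttributionLabel:      costAttributionLabel,
		costAttributionSvc:        costAttributionSvc,
		maxCostAttributionPerUser: maxCostAttributionPerUser,
	}
}

func main() {
	// As in the tests: cost attribution disabled, so "" label, nil service, 0 cap.
	disabled := newActiveSeries(&matchers{}, 20*time.Minute, "foo", "", nil, 0)
	// With cost attribution enabled, a per-user cap would be passed instead.
	enabled := newActiveSeries(&matchers{}, 20*time.Minute, "foo", "team", &cleanupService{}, 100)
	fmt.Println(disabled.maxCostAttributionPerUser, enabled.maxCostAttributionPerUser)
}
```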