diff --git a/CHANGELOG.md b/CHANGELOG.md index e014c021554..7ed630ef432 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [ENHANCEMENT] Store-gateway: reduce memory usage in some LabelValues calls. #4789 * [ENHANCEMENT] Store-gateway: add a `stage` label to the metric `cortex_bucket_store_series_data_touched`. This label now applies to `data_type="chunks"` and `data_type="series"`. The `stage` label has 2 values: `processed` - the number of series that parsed - and `returned` - the number of series selected from the processed bytes to satisfy the query. #4797 #4830 * [ENHANCEMENT] Distributor: make `__meta_tenant_id` label available in relabeling rules configured via `metric_relabel_configs`. #4725 +* [ENHANCEMENT] Querier: reduce CPU utilisation when shuffle sharding is enabled with large shard sizes. #4851 * [BUGFIX] Metadata API: Mimir will now return an empty object when no metadata is available, matching Prometheus. #4782 * [BUGFIX] Store-gateway: add collision detection on expanded postings and individual postings cache keys. #4770 diff --git a/go.mod b/go.mod index 505148d20d1..b0b7fb55db5 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.0 - github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1 + github.com/grafana/dskit v0.0.0-20230427024853-1e1de772476d github.com/grafana/e2e v0.1.1-0.20230221201045-21ebba73580b github.com/hashicorp/golang-lru v0.6.0 github.com/json-iterator/go v1.1.12 diff --git a/go.sum b/go.sum index 782026f0f21..d5460babc39 100644 --- a/go.sum +++ b/go.sum @@ -521,8 +521,8 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4= github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0= github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9 h1:LQAhgcUPnzdjU/OjCJaLlPQI7NmQCRlfjMPSA1VegvA= github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= -github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1 h1:sMRNbvPdgXIBh3LMJeZ6ykxZE7kchj8vkvmjBypu43M= -github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1/go.mod h1:31wpEibXmd1yC7sUBw1ilN9dhWatwQwbcOAbZGtTr/M= +github.com/grafana/dskit v0.0.0-20230427024853-1e1de772476d h1:SdBgw9gDt3eg192nl6czB72EGnQbcH4aG4RAO9gSSBE= +github.com/grafana/dskit v0.0.0-20230427024853-1e1de772476d/go.mod h1:31wpEibXmd1yC7sUBw1ilN9dhWatwQwbcOAbZGtTr/M= github.com/grafana/e2e v0.1.1-0.20230221201045-21ebba73580b h1:dzle+89/D0hOxscjZlkb6ovYA52t9hl6h/S+hI8ek1Q= github.com/grafana/e2e v0.1.1-0.20230221201045-21ebba73580b/go.mod h1:3UsooRp7yW5/NJQBlXcTsAHOoykEhNUYXkQ3r6ehEEY= github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 h1:WB3bGH2f1UN6jkd6uAEWfHB8OD7dKJ0v2Oo6SNfhpfQ= diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index 6466231207d..1630d253efd 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -184,7 +184,8 @@ type Ring struct { // Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes. // If set to nil, no caching is done (used by tests, and subrings). - shuffledSubringCache map[subringCacheKey]*Ring + shuffledSubringCache map[subringCacheKey]*Ring + shuffledSubringWithLookbackCache map[subringCacheKey]cachedSubringWithLookback numMembersGaugeVec *prometheus.GaugeVec totalTokensGauge prometheus.Gauge @@ -194,8 +195,15 @@ type Ring struct { } type subringCacheKey struct { - identifier string - shardSize int + identifier string + shardSize int + lookbackPeriod time.Duration +} + +type cachedSubringWithLookback struct { + subring *Ring + validForLookbackWindowsStartingAfter int64 // if the lookback window is from T to S, validForLookbackWindowsStartingAfter is the earliest value of T this cache entry is valid for + validForLookbackWindowsStartingBefore int64 // if the lookback window is from T to S, validForLookbackWindowsStartingBefore is the latest value of T this cache entry is valid for } // New creates a new Ring. Being a service, Ring needs to be started to do anything. @@ -221,12 +229,13 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client } r := &Ring{ - key: key, - cfg: cfg, - KVClient: store, - strategy: strategy, - ringDesc: &Desc{}, - shuffledSubringCache: map[subringCacheKey]*Ring{}, + key: key, + cfg: cfg, + KVClient: store, + strategy: strategy, + ringDesc: &Desc{}, + shuffledSubringCache: map[subringCacheKey]*Ring{}, + shuffledSubringWithLookbackCache: map[subringCacheKey]cachedSubringWithLookback{}, numMembersGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "ring_members", Help: "Number of members in the ring", @@ -324,10 +333,15 @@ func (r *Ring) updateRingState(ringDesc *Desc) { r.ringZones = ringZones r.oldestRegisteredTimestamp = oldestRegisteredTimestamp r.lastTopologyChange = now + + // Invalidate all cached subrings. if r.shuffledSubringCache != nil { - // Invalidate all cached subrings. r.shuffledSubringCache = make(map[subringCacheKey]*Ring) } + if r.shuffledSubringWithLookbackCache != nil { + r.shuffledSubringWithLookbackCache = make(map[subringCacheKey]cachedSubringWithLookback) + } + r.updateRingMetrics(rc) } @@ -623,14 +637,25 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { // The returned subring may be unbalanced with regard to zones and should never be used for write // operations (read only). // -// This function doesn't support caching. +// This function supports caching, but the cache will only be effective if successive calls for the +// same identifier are for increasing values of (now-lookbackPeriod). func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing { // Nothing to do if the shard size is not smaller then the actual ring. if size <= 0 || r.InstancesCount() <= size { return r } - return r.shuffleShard(identifier, size, lookbackPeriod, now) + if cached := r.getCachedShuffledSubringWithLookback(identifier, size, lookbackPeriod, now); cached != nil { + return cached + } + + result := r.shuffleShard(identifier, size, lookbackPeriod, now) + + if result != r { + r.setCachedShuffledSubringWithLookback(identifier, size, lookbackPeriod, now, result) + } + + return result } func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time) *Ring { @@ -877,6 +902,88 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, subring *Ri } } +func (r *Ring) getCachedShuffledSubringWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) *Ring { + if r.cfg.SubringCacheDisabled { + return nil + } + + cached, ok := r.shuffledSubringWithLookbackCache[subringCacheKey{identifier: identifier, shardSize: size, lookbackPeriod: lookbackPeriod}] + if !ok { + return nil + } + + lookbackWindowStart := now.Add(-lookbackPeriod).Unix() + if lookbackWindowStart < cached.validForLookbackWindowsStartingAfter || lookbackWindowStart > cached.validForLookbackWindowsStartingBefore { + // The cached subring is not valid for the lookback window that has been requested. + return nil + } + + cachedSubring := cached.subring + + // No need to update the cached subring if it is the original ring itself. + if r == cachedSubring { + return cachedSubring + } + + cachedSubring.mtx.Lock() + defer cachedSubring.mtx.Unlock() + + // Update instance states and timestamps. We know that the topology is the same, + // so zones and tokens are equal. + for name, cachedIng := range cachedSubring.ringDesc.Ingesters { + ing := r.ringDesc.Ingesters[name] + cachedIng.State = ing.State + cachedIng.Timestamp = ing.Timestamp + cachedSubring.ringDesc.Ingesters[name] = cachedIng + } + + return cachedSubring +} + +func (r *Ring) setCachedShuffledSubringWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time, subring *Ring) { + if subring == nil || r.cfg.SubringCacheDisabled { + return + } + + lookbackWindowStart := now.Add(-lookbackPeriod).Unix() + validForLookbackWindowsStartingBefore := int64(math.MaxInt64) + + for _, instance := range subring.ringDesc.Ingesters { + registeredDuringLookbackWindow := instance.RegisteredTimestamp >= lookbackWindowStart + + if registeredDuringLookbackWindow && instance.RegisteredTimestamp < validForLookbackWindowsStartingBefore { + validForLookbackWindowsStartingBefore = instance.RegisteredTimestamp + } + } + + r.mtx.Lock() + defer r.mtx.Unlock() + + // Only cache if *this* ring hasn't changed since computing result + // (which can happen between releasing the read lock and getting read-write lock). + // Note that shuffledSubringWithLookbackCache can be only nil when set by test. + if r.shuffledSubringWithLookbackCache == nil { + return + } + + if !r.lastTopologyChange.Equal(subring.lastTopologyChange) { + return + } + + // Only update cache if subring's lookback window starts later than the previously cached subring for this identifier, + // if there is one. This prevents cache thrashing due to different calls competing if their lookback windows start + // before and after the time of an instance registering. + key := subringCacheKey{identifier: identifier, shardSize: size, lookbackPeriod: lookbackPeriod} + + if existingEntry, haveCached := r.shuffledSubringWithLookbackCache[key]; !haveCached || existingEntry.validForLookbackWindowsStartingAfter < lookbackWindowStart { + r.shuffledSubringWithLookbackCache[key] = cachedSubringWithLookback{ + subring: subring, + validForLookbackWindowsStartingAfter: lookbackWindowStart, + validForLookbackWindowsStartingBefore: validForLookbackWindowsStartingBefore, + } + } +} + func (r *Ring) CleanupShuffleShardCache(identifier string) { if r.cfg.SubringCacheDisabled { return @@ -890,6 +997,12 @@ func (r *Ring) CleanupShuffleShardCache(identifier string) { delete(r.shuffledSubringCache, k) } } + + for k := range r.shuffledSubringWithLookbackCache { + if k.identifier == identifier { + delete(r.shuffledSubringWithLookbackCache, k) + } + } } func (r *Ring) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { diff --git a/vendor/modules.txt b/vendor/modules.txt index a615c4e3360..84f8ea16c3f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -520,7 +520,7 @@ github.com/gosimple/slug # github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9 ## explicit; go 1.13 github.com/grafana-tools/sdk -# github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1 +# github.com/grafana/dskit v0.0.0-20230427024853-1e1de772476d ## explicit; go 1.18 github.com/grafana/dskit/backoff github.com/grafana/dskit/cache