diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a745614a5c..caf3141be58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ * `-blocks-storage.bucket-store.index-header-lazy-loading-enabled` is deprecated, use the new configuration `-blocks-storage.bucket-store.index-header.lazy-loading-enabled` * `-blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout` is deprecated, use the new configuration `-blocks-storage.bucket-store.index-header.lazy-loading-idle-timeout` * `-blocks-storage.bucket-store.index-header-lazy-loading-concurrency` is deprecated, use the new configuration `-blocks-storage.bucket-store.index-header.lazy-loading-concurrency` +* [CHANGE] Store-gateway: remove experimental fine-grained chunks caching. The following experimental configuration parameters have been removed `-blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled`, `-blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series`. #5816 * [FEATURE] Introduced `distributor.service_overload_status_code_on_rate_limit_enabled` flag for configuring status code to 529 instead of 429 upon rate limit exhaustion. #5752 * [FEATURE] Cardinality API: Add a new `count_method` parameter which enables counting active series #5136 * [FEATURE] Query-frontend: added experimental support to cache cardinality, label names and label values query responses. The cache will be used when `-query-frontend.cache-results` is enabled, and `-query-frontend.results-cache-ttl-for-cardinality-query` or `-query-frontend.results-cache-ttl-for-labels-query` set to a value greater than 0. The following metrics have been added to track the query results cache hit ratio per `request_type`: #5212 #5235 #5426 #5524 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 5d4de826e71..8cb9a9abcd4 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -6840,17 +6840,6 @@ "fieldFlag": "blocks-storage.bucket-store.chunks-cache.subrange-ttl", "fieldType": "duration", "fieldCategory": "advanced" - }, - { - "kind": "field", - "name": "fine_grained_chunks_caching_enabled", - "required": false, - "desc": "Enable fine-grained caching of chunks in the store-gateway. This reduces the required bandwidth and memory utilization.", - "fieldValue": null, - "fieldDefaultValue": false, - "fieldFlag": "blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled", - "fieldType": "boolean", - "fieldCategory": "experimental" } ], "fieldValue": null, @@ -7756,17 +7745,6 @@ "fieldType": "int", "fieldCategory": "advanced" }, - { - "kind": "field", - "name": "fine_grained_chunks_caching_ranges_per_series", - "required": false, - "desc": "This option controls into how many ranges the chunks of each series from each block are split. This value is effectively the number of chunks cache items per series per block when -blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled is enabled.", - "fieldValue": null, - "fieldDefaultValue": 1, - "fieldFlag": "blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series", - "fieldType": "int", - "fieldCategory": "experimental" - }, { "kind": "field", "name": "series_selection_strategy", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index eb8a38eca5d..256527b5f30 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -307,8 +307,6 @@ Usage of ./cmd/mimir/mimir: TTL for caching object attributes for chunks. If the metadata cache is configured, attributes will be stored under this cache backend, otherwise attributes are stored in the chunks cache backend. (default 168h0m0s) -blocks-storage.bucket-store.chunks-cache.backend string Backend for chunks cache, if not empty. Supported values: memcached, redis. - -blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled - [experimental] Enable fine-grained caching of chunks in the store-gateway. This reduces the required bandwidth and memory utilization. -blocks-storage.bucket-store.chunks-cache.max-get-range-requests int Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching chunks. Zero or negative value = unlimited number of sub-requests. (default 3) -blocks-storage.bucket-store.chunks-cache.memcached.addresses comma-separated-list-of-strings @@ -401,8 +399,6 @@ Usage of ./cmd/mimir/mimir: Client write timeout. (default 3s) -blocks-storage.bucket-store.chunks-cache.subrange-ttl duration TTL for caching individual chunks subranges. (default 24h0m0s) - -blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series int - [experimental] This option controls into how many ranges the chunks of each series from each block are split. This value is effectively the number of chunks cache items per series per block when -blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled is enabled. (default 1) -blocks-storage.bucket-store.ignore-blocks-within duration Blocks with minimum time within this duration are ignored, and not loaded by store-gateway. Useful when used together with -querier.query-store-after to prevent loading young blocks, because there are usually many of them (depending on number of ingesters) and they are not yet compacted. Negative values or 0 disable the filter. (default 10h0m0s) -blocks-storage.bucket-store.ignore-deletion-marks-delay duration diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 7c7162896e4..5b56ea87d3f 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -125,8 +125,6 @@ The following features are currently experimental: - Query-scheduler - `-query-scheduler.querier-forget-delay` - Store-gateway - - `-blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled` - - `-blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series` - Use of Redis cache backend (`-blocks-storage.bucket-store.chunks-cache.backend=redis`, `-blocks-storage.bucket-store.index-cache.backend=redis`, `-blocks-storage.bucket-store.metadata-cache.backend=redis`) - `-blocks-storage.bucket-store.series-selection-strategy` - Read-write deployment mode diff --git a/docs/sources/mimir/references/configuration-parameters/index.md b/docs/sources/mimir/references/configuration-parameters/index.md index dd273a7633e..f8a657afbe1 100644 --- a/docs/sources/mimir/references/configuration-parameters/index.md +++ b/docs/sources/mimir/references/configuration-parameters/index.md @@ -3300,11 +3300,6 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-ttl [subrange_ttl: | default = 24h] - # (experimental) Enable fine-grained caching of chunks in the store-gateway. - # This reduces the required bandwidth and memory utilization. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled - [fine_grained_chunks_caching_enabled: | default = false] - metadata_cache: # Backend for metadata cache, if not empty. Supported values: memcached, # redis. @@ -3502,14 +3497,6 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.batch-series-size [streaming_series_batch_size: | default = 5000] - # (experimental) This option controls into how many ranges the chunks of each - # series from each block are split. This value is effectively the number of - # chunks cache items per series per block when - # -blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled - # is enabled. - # CLI flag: -blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series - [fine_grained_chunks_caching_ranges_per_series: | default = 1] - # (experimental) This option controls the strategy to selection of series and # deferring application of matchers. A more aggressive strategy will fetch # less posting lists at the cost of more series. This is useful when querying diff --git a/pkg/storage/tsdb/caching_config.go b/pkg/storage/tsdb/caching_config.go index 4c170647a7c..45e194d0f11 100644 --- a/pkg/storage/tsdb/caching_config.go +++ b/pkg/storage/tsdb/caching_config.go @@ -33,11 +33,10 @@ var supportedCacheBackends = []string{cache.BackendMemcached, cache.BackendRedis type ChunksCacheConfig struct { cache.BackendConfig `yaml:",inline"` - MaxGetRangeRequests int `yaml:"max_get_range_requests" category:"advanced"` - AttributesTTL time.Duration `yaml:"attributes_ttl" category:"advanced"` - AttributesInMemoryMaxItems int `yaml:"attributes_in_memory_max_items" category:"advanced"` - SubrangeTTL time.Duration `yaml:"subrange_ttl" category:"advanced"` - FineGrainedChunksCachingEnabled bool `yaml:"fine_grained_chunks_caching_enabled" category:"experimental"` + MaxGetRangeRequests int `yaml:"max_get_range_requests" category:"advanced"` + AttributesTTL time.Duration `yaml:"attributes_ttl" category:"advanced"` + AttributesInMemoryMaxItems int `yaml:"attributes_in_memory_max_items" category:"advanced"` + SubrangeTTL time.Duration `yaml:"subrange_ttl" category:"advanced"` } func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { @@ -50,7 +49,6 @@ func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix st f.DurationVar(&cfg.AttributesTTL, prefix+"attributes-ttl", 168*time.Hour, "TTL for caching object attributes for chunks. If the metadata cache is configured, attributes will be stored under this cache backend, otherwise attributes are stored in the chunks cache backend.") f.IntVar(&cfg.AttributesInMemoryMaxItems, prefix+"attributes-in-memory-max-items", 50000, "Maximum number of object attribute items to keep in a first level in-memory LRU cache. Metadata will be stored and fetched in-memory before hitting the cache backend. 0 to disable the in-memory cache.") f.DurationVar(&cfg.SubrangeTTL, prefix+"subrange-ttl", 24*time.Hour, "TTL for caching individual chunks subranges.") - f.BoolVar(&cfg.FineGrainedChunksCachingEnabled, prefix+"fine-grained-chunks-caching-enabled", false, "Enable fine-grained caching of chunks in the store-gateway. This reduces the required bandwidth and memory utilization.") } func (cfg *ChunksCacheConfig) Validate() error { @@ -138,9 +136,7 @@ func CreateCachingBucket(chunksCache cache.Cache, chunksConfig ChunksCacheConfig return nil, errors.Wrapf(err, "wrap metadata cache with in-memory cache") } } - if !chunksConfig.FineGrainedChunksCachingEnabled { - cfg.CacheGetRange("chunks", chunksCache, isTSDBChunkFile, subrangeSize, attributesCache, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests) - } + cfg.CacheGetRange("chunks", chunksCache, isTSDBChunkFile, subrangeSize, attributesCache, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests) } if !cachingConfigured { diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index b4b434b7e25..b1cc50b19bc 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -404,7 +404,6 @@ type BucketStoreConfig struct { IndexHeader indexheader.Config `yaml:"index_header" category:"experimental"` StreamingBatchSize int `yaml:"streaming_series_batch_size" category:"advanced"` - ChunkRangesPerSeries int `yaml:"fine_grained_chunks_caching_ranges_per_series" category:"experimental"` SeriesSelectionStrategyName string `yaml:"series_selection_strategy" category:"experimental"` SelectionStrategies struct { WorstCaseSeriesPreference float64 `yaml:"worst_case_series_preference" category:"experimental"` @@ -451,7 +450,6 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.DeprecatedIndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", indexheader.DefaultIndexHeaderLazyLoadingIdleTimeout, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity.") f.Uint64Var(&cfg.PartitionerMaxGapBytes, "blocks-storage.bucket-store.partitioner-max-gap-bytes", DefaultPartitionerMaxGapSize, "Max size - in bytes - of a gap for which the partitioner aggregates together two bucket GET object requests.") f.IntVar(&cfg.StreamingBatchSize, "blocks-storage.bucket-store.batch-series-size", 5000, "This option controls how many series to fetch per batch. The batch size must be greater than 0.") - f.IntVar(&cfg.ChunkRangesPerSeries, "blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series", 1, "This option controls into how many ranges the chunks of each series from each block are split. This value is effectively the number of chunks cache items per series per block when -blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled is enabled.") f.StringVar(&cfg.SeriesSelectionStrategyName, seriesSelectionStrategyFlag, WorstCasePostingsStrategy, "This option controls the strategy to selection of series and deferring application of matchers. A more aggressive strategy will fetch less posting lists at the cost of more series. This is useful when querying large blocks in which many series share the same label name and value. Supported values (most aggressive to least aggressive): "+strings.Join(validSeriesSelectionStrategies, ", ")+".") f.Float64Var(&cfg.SelectionStrategies.WorstCaseSeriesPreference, "blocks-storage.bucket-store.series-selection-strategies.worst-case-series-preference", 0.75, "This option is only used when "+seriesSelectionStrategyFlag+"="+WorstCasePostingsStrategy+". Increasing the series preference results in fetching more series than postings. Must be a positive floating point number.") } diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 8506ddedecc..ffd886b5a8b 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -45,7 +45,6 @@ import ( "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" - "github.com/grafana/mimir/pkg/storegateway/chunkscache" "github.com/grafana/mimir/pkg/storegateway/hintspb" "github.com/grafana/mimir/pkg/storegateway/indexcache" "github.com/grafana/mimir/pkg/storegateway/indexheader" @@ -92,7 +91,6 @@ type BucketStore struct { fetcher block.MetadataFetcher dir string indexCache indexcache.IndexCache - chunksCache chunkscache.Cache indexReaderPool *indexheader.ReaderPool seriesHashCache *hashcache.SeriesHashCache @@ -109,14 +107,6 @@ type BucketStore struct { // This value must be greater than zero. maxSeriesPerBatch int - // numChunksRangesPerSeries controls into how many ranges the chunks of each series from each block are split. - // This value is effectively the number of chunks cache items per series per block. - numChunksRangesPerSeries int - - // fineGrainedChunksCachingEnabled controls whether to use the per series chunks caching - // or rely on the transparent caching bucket. - fineGrainedChunksCachingEnabled bool - // Query gate which limits the maximum amount of concurrent queries. queryGate gate.Gate @@ -194,13 +184,6 @@ func WithIndexCache(cache indexcache.IndexCache) BucketStoreOption { } } -// WithChunksCache sets a chunksCache to use instead of a noopCache. -func WithChunksCache(cache chunkscache.Cache) BucketStoreOption { - return func(s *BucketStore) { - s.chunksCache = cache - } -} - // WithQueryGate sets a queryGate to use instead of a gate.NewNoop(). func WithQueryGate(queryGate gate.Gate) BucketStoreOption { return func(s *BucketStore) { @@ -215,12 +198,6 @@ func WithLazyLoadingGate(lazyLoadingGate gate.Gate) BucketStoreOption { } } -func WithFineGrainedChunksCaching(enabled bool) BucketStoreOption { - return func(s *BucketStore) { - s.fineGrainedChunksCachingEnabled = enabled - } -} - // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -243,7 +220,6 @@ func NewBucketStore( fetcher: fetcher, dir: dir, indexCache: noopCache{}, - chunksCache: chunkscache.NoopCache{}, blocks: map[ulid.ULID]*bucketBlock{}, blockSet: newBucketBlockSet(), blockSyncConcurrency: bucketStoreConfig.BlockSyncConcurrency, @@ -258,7 +234,6 @@ func NewBucketStore( metrics: metrics, userID: userID, maxSeriesPerBatch: bucketStoreConfig.StreamingBatchSize, - numChunksRangesPerSeries: bucketStoreConfig.ChunkRangesPerSeries, postingsStrategy: postingsStrategy, } @@ -1032,11 +1007,7 @@ func (s *BucketStore) nonStreamingSeriesSetForBlocks( var set storepb.SeriesSet if !req.SkipChunks { - var cache chunkscache.Cache - if s.fineGrainedChunksCachingEnabled { - cache = s.chunksCache - } - ss := newChunksPreloadingIterator(ctx, s.logger, s.userID, cache, *chunkReaders, it, s.maxSeriesPerBatch, stats, req.MinTime, req.MaxTime) + ss := newChunksPreloadingIterator(ctx, s.logger, s.userID, *chunkReaders, it, s.maxSeriesPerBatch, stats, req.MinTime, req.MaxTime) set = newSeriesChunksSeriesSet(ss) } else { set = newSeriesSetWithoutChunks(ctx, it, stats) @@ -1092,12 +1063,7 @@ func (s *BucketStore) streamingChunksSetForBlocks( if err != nil { return nil, err } - - var cache chunkscache.Cache - if s.fineGrainedChunksCachingEnabled { - cache = s.chunksCache - } - scsi := newChunksPreloadingIterator(ctx, s.logger, s.userID, cache, *chunkReaders, it, s.maxSeriesPerBatch, stats, req.MinTime, req.MaxTime) + scsi := newChunksPreloadingIterator(ctx, s.logger, s.userID, *chunkReaders, it, s.maxSeriesPerBatch, stats, req.MinTime, req.MaxTime) return scsi, nil } @@ -1150,7 +1116,6 @@ func (s *BucketStore) getSeriesIteratorFromBlocks( cachedSeriesHasher{blockSeriesHashCache}, strategy, req.MinTime, req.MaxTime, - s.numChunksRangesPerSeries, stats, r, s.logger, @@ -1205,20 +1170,11 @@ func (s *BucketStore) recordSeriesCallResult(safeStats *safeQueryStats) { s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried)) - if s.fineGrainedChunksCachingEnabled { - s.metrics.seriesDataTouched.WithLabelValues("chunks", "processed").Observe(float64(stats.chunksProcessed)) - s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", "processed").Observe(float64(stats.chunksProcessedSizeSum)) - // With fine-grained caching we may have touched more chunks than we need because we had to fetch a - // whole range of chunks, which includes chunks outside the request's minT/maxT. - s.metrics.seriesDataTouched.WithLabelValues("chunks", "returned").Observe(float64(stats.chunksReturned)) - s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", "returned").Observe(float64(stats.chunksReturnedSizeSum)) - } else { - s.metrics.seriesDataTouched.WithLabelValues("chunks", "processed").Observe(float64(stats.chunksTouched)) - s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", "processed").Observe(float64(stats.chunksTouchedSizeSum)) - // For the implementation which uses the caching bucket the bytes we touch are the bytes we return. - s.metrics.seriesDataTouched.WithLabelValues("chunks", "returned").Observe(float64(stats.chunksTouched)) - s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", "returned").Observe(float64(stats.chunksTouchedSizeSum)) - } + s.metrics.seriesDataTouched.WithLabelValues("chunks", "processed").Observe(float64(stats.chunksTouched)) + s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", "processed").Observe(float64(stats.chunksTouchedSizeSum)) + // For the implementation which uses the caching bucket the bytes we touch are the bytes we return. + s.metrics.seriesDataTouched.WithLabelValues("chunks", "returned").Observe(float64(stats.chunksTouched)) + s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", "returned").Observe(float64(stats.chunksTouchedSizeSum)) s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount)) } @@ -1440,7 +1396,6 @@ func blockLabelNames(ctx context.Context, indexr *bucketIndexReader, matchers [] cachedSeriesHasher{nil}, noChunkRefs, minTime, maxTime, - 1, // we skip chunks, so this doesn't make any difference stats, nil, logger, @@ -1662,7 +1617,6 @@ func labelValuesFromSeries(ctx context.Context, labelName string, seriesPerBatch b.meta.MinTime, b.meta.MaxTime, b.userID, - 1, b.logger, ) if len(pendingMatchers) > 0 { diff --git a/pkg/storegateway/bucket_chunk_reader_test.go b/pkg/storegateway/bucket_chunk_reader_test.go index 186a136d197..25e9ebb6b11 100644 --- a/pkg/storegateway/bucket_chunk_reader_test.go +++ b/pkg/storegateway/bucket_chunk_reader_test.go @@ -41,7 +41,6 @@ func TestBucketChunkReader_refetchChunks(t *testing.T) { defaultStrategy, block.meta.MinTime, block.meta.MaxTime, - 2, newSafeQueryStats(), nil, log.NewNopLogger(), diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index 1cb3571a31d..ef206a6f085 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -32,7 +32,6 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" - "github.com/grafana/mimir/pkg/storegateway/chunkscache" "github.com/grafana/mimir/pkg/storegateway/indexcache" "github.com/grafana/mimir/pkg/storegateway/indexheader" "github.com/grafana/mimir/pkg/storegateway/storepb" @@ -45,17 +44,12 @@ var ( type swappableCache struct { indexcache.IndexCache - chunkscache.Cache } func (c *swappableCache) SwapIndexCacheWith(cache indexcache.IndexCache) { c.IndexCache = cache } -func (c *swappableCache) SwapChunksCacheWith(cache chunkscache.Cache) { - c.Cache = cache -} - type storeSuite struct { store *BucketStore minTime, maxTime int64 @@ -121,7 +115,6 @@ type prepareStoreConfig struct { seriesLimiterFactory SeriesLimiterFactory series []labels.Labels indexCache indexcache.IndexCache - chunksCache chunkscache.Cache metricsRegistry *prometheus.Registry postingsStrategy postingsSelectionStrategy // When nonOverlappingBlocks is false, prepare store creates 2 blocks per block range. @@ -150,7 +143,6 @@ func defaultPrepareStoreConfig(t testing.TB) *prepareStoreConfig { chunksLimiterFactory: newStaticChunksLimiterFactory(0), indexCache: noopCache{}, postingsStrategy: selectAllStrategy{}, - chunksCache: chunkscache.NoopCache{}, series: []labels.Labels{ labels.FromStrings("a", "1", "b", "1"), labels.FromStrings("a", "1", "b", "2"), @@ -180,7 +172,7 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS s := &storeSuite{ logger: log.NewNopLogger(), metricsRegistry: cfg.metricsRegistry, - cache: &swappableCache{IndexCache: cfg.indexCache, Cache: cfg.chunksCache}, + cache: &swappableCache{IndexCache: cfg.indexCache}, minTime: minTime, maxTime: maxTime, } @@ -189,7 +181,7 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS assert.NoError(t, err) // Have our options in the beginning so tests can override logger and index cache if they need to - storeOpts := []BucketStoreOption{WithLogger(s.logger), WithIndexCache(s.cache), WithChunksCache(s.cache)} + storeOpts := []BucketStoreOption{WithLogger(s.logger), WithIndexCache(s.cache)} store, err := NewBucketStore( "tenant", @@ -198,7 +190,6 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS cfg.tempDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: cfg.maxSeriesPerBatch, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 20, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -487,7 +478,6 @@ func TestBucketStore_e2e(t *testing.T) { if ok := t.Run("no caches", func(t *testing.T) { s.cache.SwapIndexCacheWith(noopCache{}) - s.cache.SwapChunksCacheWith(chunkscache.NoopCache{}) testBucketStore_e2e(t, ctx, s) }); !ok { return @@ -505,7 +495,7 @@ func TestBucketStore_e2e(t *testing.T) { return } - if ok := t.Run("with small index cache", func(t *testing.T) { + t.Run("with small index cache", func(t *testing.T) { indexCache2, err := indexcache.NewInMemoryIndexCacheWithConfig(s.logger, nil, indexcache.InMemoryIndexCacheConfig{ MaxItemSize: 50, MaxSize: 100, @@ -513,20 +503,6 @@ func TestBucketStore_e2e(t *testing.T) { assert.NoError(t, err) s.cache.SwapIndexCacheWith(indexCache2) testBucketStore_e2e(t, ctx, s) - }); !ok { - return - } - - t.Run("with large, sufficient index cache, and chunks cache", func(t *testing.T) { - indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(s.logger, nil, indexcache.InMemoryIndexCacheConfig{ - MaxItemSize: 1e5, - MaxSize: 2e5, - }) - assert.NoError(t, err) - assert.NoError(t, err) - s.cache.SwapIndexCacheWith(indexCache) - s.cache.SwapChunksCacheWith(newInMemoryChunksCache()) - testBucketStore_e2e(t, ctx, s) }) }) } @@ -565,7 +541,6 @@ func TestBucketStore_e2e_StreamingEdgeCases(t *testing.T) { if ok := t.Run("no caches", func(t *testing.T) { s.cache.SwapIndexCacheWith(noopCache{}) - s.cache.SwapChunksCacheWith(chunkscache.NoopCache{}) testBucketStore_e2e(t, ctx, s, additionalCases...) }); !ok { return @@ -583,7 +558,7 @@ func TestBucketStore_e2e_StreamingEdgeCases(t *testing.T) { return } - if ok := t.Run("with small index cache", func(t *testing.T) { + t.Run("with small index cache", func(t *testing.T) { indexCache2, err := indexcache.NewInMemoryIndexCacheWithConfig(s.logger, nil, indexcache.InMemoryIndexCacheConfig{ MaxItemSize: 50, MaxSize: 100, @@ -591,20 +566,6 @@ func TestBucketStore_e2e_StreamingEdgeCases(t *testing.T) { assert.NoError(t, err) s.cache.SwapIndexCacheWith(indexCache2) testBucketStore_e2e(t, ctx, s) - }); !ok { - return - } - - t.Run("with large, sufficient index cache, and chunks cache", func(t *testing.T) { - indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(s.logger, nil, indexcache.InMemoryIndexCacheConfig{ - MaxItemSize: 1e5, - MaxSize: 2e5, - }) - assert.NoError(t, err) - assert.NoError(t, err) - s.cache.SwapIndexCacheWith(indexCache) - s.cache.SwapChunksCacheWith(newInMemoryChunksCache()) - testBucketStore_e2e(t, ctx, s) }) }) } diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index c7530bad17f..fedde8867d4 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -29,7 +29,6 @@ import ( "github.com/grafana/mimir/pkg/storage/bucket" "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" - "github.com/grafana/mimir/pkg/storegateway/chunkscache" "github.com/grafana/mimir/pkg/storegateway/indexcache" "github.com/grafana/mimir/pkg/storegateway/storepb" util_log "github.com/grafana/mimir/pkg/util/log" @@ -55,8 +54,6 @@ type BucketStores struct { // Index cache shared across all tenants. indexCache indexcache.IndexCache - chunksCache chunkscache.Cache - // Series hash cache shared across all tenants. seriesHashCache *hashcache.SeriesHashCache @@ -157,12 +154,6 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra return nil, errors.Wrap(err, "create index cache") } - chunksCache, err := chunkscache.NewChunksCache(logger, chunksCacheClient, reg) - if err != nil { - return nil, errors.Wrap(err, "create chunks cache") - } - u.chunksCache = chunkscache.NewTracingCache(chunksCache, logger) - if reg != nil { reg.MustRegister(u.metaFetcherMetrics) } @@ -471,10 +462,8 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) { bucketStoreOpts := []BucketStoreOption{ WithLogger(userLogger), WithIndexCache(u.indexCache), - WithChunksCache(u.chunksCache), WithQueryGate(u.queryGate), WithLazyLoadingGate(u.lazyLoadingGate), - WithFineGrainedChunksCaching(u.cfg.BucketStore.ChunksCache.FineGrainedChunksCachingEnabled), } bs, err := NewBucketStore( diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 767fee1d56c..6d96e886b7d 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -56,7 +56,6 @@ import ( "github.com/grafana/mimir/pkg/storage/sharding" mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" - "github.com/grafana/mimir/pkg/storegateway/chunkscache" "github.com/grafana/mimir/pkg/storegateway/hintspb" "github.com/grafana/mimir/pkg/storegateway/indexcache" "github.com/grafana/mimir/pkg/storegateway/indexheader" @@ -1168,7 +1167,6 @@ func loadSeries(ctx context.Context, tb test.TB, postings []storage.SeriesRef, i 0, 0, "", - 1, log.NewNopLogger(), ) series := make([]labels.Labels, 0, len(postings)) @@ -1468,7 +1466,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries maxSeriesPerBatch: 10000, }, "with series streaming and caches (1K per batch)": { - options: []BucketStoreOption{WithLogger(logger), WithIndexCache(newInMemoryIndexCache(t)), WithChunksCache(newInMemoryChunksCache())}, + options: []BucketStoreOption{WithLogger(logger), WithIndexCache(newInMemoryIndexCache(t))}, maxSeriesPerBatch: 1000, }, } @@ -1482,7 +1480,6 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: testData.maxSeriesPerBatch, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 1, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -1612,7 +1609,6 @@ func TestBucketStore_Series_Concurrency(t *testing.T) { tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: batchSize, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 1, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -1768,11 +1764,10 @@ func TestBucketStore_Series_OneBlock_InMemIndexCacheSegfault(t *testing.T) { } store := &BucketStore{ - userID: "test", - bkt: objstore.WithNoopInstr(bkt), - logger: logger, - indexCache: indexCache, - chunksCache: chunkscache.NoopCache{}, + userID: "test", + bkt: objstore.WithNoopInstr(bkt), + logger: logger, + indexCache: indexCache, indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), indexheader.Config{ LazyLoadingEnabled: false, LazyLoadingIdleTimeout: 0, @@ -1784,12 +1779,11 @@ func TestBucketStore_Series_OneBlock_InMemIndexCacheSegfault(t *testing.T) { b1.meta.ULID: b1, b2.meta.ULID: b2, }, - postingsStrategy: selectAllStrategy{}, - queryGate: gate.NewNoop(), - chunksLimiterFactory: newStaticChunksLimiterFactory(0), - seriesLimiterFactory: newStaticSeriesLimiterFactory(0), - maxSeriesPerBatch: 65536, - numChunksRangesPerSeries: 1, + postingsStrategy: selectAllStrategy{}, + queryGate: gate.NewNoop(), + chunksLimiterFactory: newStaticChunksLimiterFactory(0), + seriesLimiterFactory: newStaticSeriesLimiterFactory(0), + maxSeriesPerBatch: 65536, } srv := newBucketStoreTestServer(t, store) @@ -1940,7 +1934,6 @@ func TestBucketStore_Series_ErrorUnmarshallingRequestHints(t *testing.T) { tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: 5000, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 10, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -1958,7 +1951,6 @@ func TestBucketStore_Series_ErrorUnmarshallingRequestHints(t *testing.T) { NewBucketStoreMetrics(nil), WithLogger(logger), WithIndexCache(indexCache), - WithChunksCache(newInMemoryChunksCache()), ) assert.NoError(t, err) defer func() { assert.NoError(t, store.RemoveBlocksAndClose()) }() @@ -2000,7 +1992,6 @@ func TestBucketStore_Series_CanceledRequest(t *testing.T) { tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: 5000, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 10, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -2067,7 +2058,6 @@ func TestBucketStore_Series_InvalidRequest(t *testing.T) { tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: 5000, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 10, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -2193,7 +2183,6 @@ func testBucketStoreSeriesBlockWithMultipleChunks( tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: 5000, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 10, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -2211,7 +2200,6 @@ func testBucketStoreSeriesBlockWithMultipleChunks( NewBucketStoreMetrics(nil), WithLogger(logger), WithIndexCache(indexCache), - WithChunksCache(newInMemoryChunksCache()), ) assert.NoError(t, err) assert.NoError(t, store.SyncBlocks(context.Background())) @@ -2371,7 +2359,6 @@ func TestBucketStore_Series_Limits(t *testing.T) { tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: batchSize, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 10, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -2483,7 +2470,7 @@ func setupStoreForHintsTest(t *testing.T, maxSeriesPerBatch int, opts ...BucketS indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(logger, nil, indexcache.InMemoryIndexCacheConfig{}) assert.NoError(tb, err) - opts = append([]BucketStoreOption{WithLogger(logger), WithIndexCache(indexCache), WithChunksCache(newInMemoryChunksCache())}, opts...) + opts = append([]BucketStoreOption{WithLogger(logger), WithIndexCache(indexCache)}, opts...) store, err := NewBucketStore( "tenant", instrBkt, @@ -2491,7 +2478,6 @@ func setupStoreForHintsTest(t *testing.T, maxSeriesPerBatch int, opts ...BucketS tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: maxSeriesPerBatch, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 10, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ diff --git a/pkg/storegateway/chunkscache/cache.go b/pkg/storegateway/chunkscache/cache.go deleted file mode 100644 index 055d0b83c03..00000000000 --- a/pkg/storegateway/chunkscache/cache.go +++ /dev/null @@ -1,147 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package chunkscache - -import ( - "context" - "fmt" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/grafana/dskit/cache" - "github.com/oklog/ulid" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/prometheus/tsdb/chunks" - - "github.com/grafana/mimir/pkg/util/pool" - "github.com/grafana/mimir/pkg/util/spanlogger" -) - -// Range is a contiguous range of chunks. Start is the ref of the first chunk and NumChunks is how many chunks -// there are in the range. -type Range struct { - BlockID ulid.ULID - Start chunks.ChunkRef - NumChunks int -} - -type Cache interface { - FetchMultiChunks(ctx context.Context, userID string, ranges []Range, chunksPool *pool.SafeSlabPool[byte]) (hits map[Range][]byte) - StoreChunks(userID string, ranges map[Range][]byte) -} - -type TracingCache struct { - c Cache - l log.Logger -} - -func NewTracingCache(c Cache, l log.Logger) TracingCache { - return TracingCache{ - c: c, - l: l, - } -} - -func (c TracingCache) FetchMultiChunks(ctx context.Context, userID string, ranges []Range, chunksPool *pool.SafeSlabPool[byte]) (hits map[Range][]byte) { - hits = c.c.FetchMultiChunks(ctx, userID, ranges, chunksPool) - - l := spanlogger.FromContext(ctx, c.l) - level.Debug(l).Log( - "name", "ChunksCache.FetchMultiChunks", - "ranges_requested", len(ranges), - "ranges_hit", len(hits), - "ranges_hit_bytes", hitsSize(hits), - "ranges_misses", len(ranges)-len(hits), - ) - return -} - -func hitsSize(hits map[Range][]byte) (size int) { - for _, b := range hits { - size += len(b) - } - return -} - -func (c TracingCache) StoreChunks(userID string, ranges map[Range][]byte) { - c.c.StoreChunks(userID, ranges) -} - -type ChunksCache struct { - logger log.Logger - cache cache.Cache - - // TODO these two will soon be tracked by the dskit, we can remove them once https://github.com/grafana/mimir/pull/4078 is merged - requests prometheus.Counter - hits prometheus.Counter -} - -type NoopCache struct{} - -func (NoopCache) FetchMultiChunks(_ context.Context, _ string, _ []Range, _ *pool.SafeSlabPool[byte]) (hits map[Range][]byte) { - return nil -} - -func (NoopCache) StoreChunks(_ string, _ map[Range][]byte) { -} - -func NewChunksCache(logger log.Logger, client cache.Cache, reg prometheus.Registerer) (*ChunksCache, error) { - c := &ChunksCache{ - logger: logger, - cache: client, - } - - c.requests = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_bucket_store_chunks_cache_requests_total", - Help: "Total number of items requested from the cache.", - }) - - c.hits = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_bucket_store_chunks_cache_hits_total", - Help: "Total number of items retrieved from the cache.", - }) - - level.Info(logger).Log("msg", "created chunks cache") - - return c, nil -} - -func (c *ChunksCache) FetchMultiChunks(ctx context.Context, userID string, ranges []Range, chunksPool *pool.SafeSlabPool[byte]) (hits map[Range][]byte) { - keysMap := make(map[string]Range, len(ranges)) - keys := make([]string, len(ranges)) - for i, r := range ranges { - k := chunksKey(userID, r) - keysMap[k] = ranges[i] - keys[i] = k - } - - hitBytes := c.cache.Fetch(ctx, keys, cache.WithAllocator(pool.NewSafeSlabPoolAllocator(chunksPool))) - if len(hitBytes) > 0 { - hits = make(map[Range][]byte, len(hitBytes)) - for key, b := range hitBytes { - hits[keysMap[key]] = b - } - } - - c.requests.Add(float64(len(ranges))) - c.hits.Add(float64(len(hits))) - return -} - -func chunksKey(userID string, r Range) string { - return fmt.Sprintf("C:%s:%s:%d:%d", userID, r.BlockID, r.Start, r.NumChunks) -} - -const ( - defaultTTL = 7 * 24 * time.Hour -) - -func (c *ChunksCache) StoreChunks(userID string, ranges map[Range][]byte) { - rangesWithTenant := make(map[string][]byte, len(ranges)) - for r, v := range ranges { - rangesWithTenant[chunksKey(userID, r)] = v - } - c.cache.StoreAsync(rangesWithTenant, defaultTTL) -} diff --git a/pkg/storegateway/chunkscache/cache_test.go b/pkg/storegateway/chunkscache/cache_test.go deleted file mode 100644 index 0c92313b58e..00000000000 --- a/pkg/storegateway/chunkscache/cache_test.go +++ /dev/null @@ -1,173 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package chunkscache - -import ( - "context" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/grafana/dskit/cache" - "github.com/oklog/ulid" - "github.com/pkg/errors" - prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/stretchr/testify/assert" -) - -func TestDskitChunksCache_FetchMultiChunks(t *testing.T) { - t.Parallel() - - // Init some data to conveniently define test cases later one. - user1 := "tenant1" - user2 := "tenant2" - block1 := ulid.MustNew(1, nil) - block2 := ulid.MustNew(2, nil) - range1 := Range{BlockID: block1, Start: chunks.ChunkRef(100), NumChunks: 10} - range2 := Range{BlockID: block1, Start: chunks.ChunkRef(200), NumChunks: 20} - range3 := Range{BlockID: block2, Start: chunks.ChunkRef(100), NumChunks: 10} - value1 := []byte{1} - value2 := []byte{2} - value3 := []byte{3} - - tests := map[string]struct { - setup []mockedChunks - mockedErr error - fetchUserID string - fetchRanges []Range - expectedHits map[Range][]byte - }{ - "should return no hits on empty cache": { - setup: []mockedChunks{}, - fetchUserID: user1, - fetchRanges: []Range{range1, range2}, - expectedHits: nil, - }, - "should return no misses on 100% hit ratio": { - setup: []mockedChunks{ - {userID: user1, r: range1, value: value1}, - {userID: user2, r: range2, value: value2}, - {userID: user1, r: range3, value: value3}, - }, - fetchUserID: user1, - fetchRanges: []Range{range1, range3}, - expectedHits: map[Range][]byte{ - range1: value1, - range3: value3, - }, - }, - "should return hits and misses on partial hits": { - setup: []mockedChunks{ - {userID: user1, r: range1, value: value1}, - {userID: user1, r: range2, value: value2}, - }, - fetchUserID: user1, - fetchRanges: []Range{range1, range3}, - expectedHits: map[Range][]byte{range1: value1}, - }, - "should return no hits on cache error": { - setup: []mockedChunks{ - {userID: user1, r: range1, value: value1}, - {userID: user1, r: range1, value: value2}, - {userID: user1, r: range1, value: value3}, - }, - mockedErr: errors.New("mocked error"), - fetchUserID: user1, - fetchRanges: []Range{range1, range2}, - expectedHits: nil, - }, - } - - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - cacheClient := newMockedCacheClient(testData.mockedErr) - c, err := NewChunksCache(log.NewNopLogger(), cacheClient, nil) - assert.NoError(t, err) - - // Store the postings expected before running the test. - ctx := context.Background() - toStore := make(map[string]map[Range][]byte) - for _, p := range testData.setup { - if toStore[p.userID] == nil { - toStore[p.userID] = make(map[Range][]byte) - } - toStore[p.userID][p.r] = p.value - } - for userID, userRanges := range toStore { - c.StoreChunks(userID, userRanges) - } - - // Fetch postings from cached and assert on it. - hits := c.FetchMultiChunks(ctx, testData.fetchUserID, testData.fetchRanges, nil) - assert.Equal(t, testData.expectedHits, hits) - - // Assert on metrics. - assert.Equal(t, float64(len(testData.fetchRanges)), prom_testutil.ToFloat64(c.requests)) - assert.Equal(t, float64(len(testData.expectedHits)), prom_testutil.ToFloat64(c.hits)) - - }) - } -} - -func BenchmarkStringCacheKeys(b *testing.B) { - userID := "tenant" - rng := Range{BlockID: ulid.MustNew(1, nil), Start: chunks.ChunkRef(200), NumChunks: 20} - - b.Run("chunks", func(b *testing.B) { - for i := 0; i < b.N; i++ { - chunksKey(userID, rng) - } - }) - -} - -type mockedChunks struct { - userID string - r Range - value []byte -} - -type mockedCacheClient struct { - cache map[string][]byte - mockedGetMultiErr error -} - -func newMockedCacheClient(mockedGetMultiErr error) *mockedCacheClient { - return &mockedCacheClient{ - cache: map[string][]byte{}, - mockedGetMultiErr: mockedGetMultiErr, - } -} - -func (c *mockedCacheClient) Fetch(_ context.Context, keys []string, _ ...cache.Option) map[string][]byte { - if c.mockedGetMultiErr != nil { - return nil - } - - hits := map[string][]byte{} - - for _, key := range keys { - if value, ok := c.cache[key]; ok { - hits[key] = value - } - } - - return hits -} - -func (c *mockedCacheClient) StoreAsync(data map[string][]byte, _ time.Duration) { - for key, value := range data { - c.cache[key] = value - } -} - -func (c *mockedCacheClient) Delete(_ context.Context, key string) error { - delete(c.cache, key) - - return nil -} - -func (c *mockedCacheClient) Name() string { - return "mockedCacheClient" -} diff --git a/pkg/storegateway/series_chunks.go b/pkg/storegateway/series_chunks.go index 8ab9b17f53e..cbf6093a29e 100644 --- a/pkg/storegateway/series_chunks.go +++ b/pkg/storegateway/series_chunks.go @@ -4,8 +4,6 @@ package storegateway import ( "context" - "encoding/binary" - "fmt" "hash/crc32" "sync" "time" @@ -16,7 +14,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" - "github.com/grafana/mimir/pkg/storegateway/chunkscache" "github.com/grafana/mimir/pkg/storegateway/storepb" util_math "github.com/grafana/mimir/pkg/util/math" "github.com/grafana/mimir/pkg/util/pool" @@ -187,7 +184,6 @@ func newChunksPreloadingIterator( ctx context.Context, logger log.Logger, userID string, - cache chunkscache.Cache, chunkReaders bucketChunkReaders, refsIterator seriesChunkRefsSetIterator, refsIteratorBatchSize int, @@ -195,7 +191,7 @@ func newChunksPreloadingIterator( minT, maxT int64, ) seriesChunksSetIterator { var iterator seriesChunksSetIterator - iterator = newLoadingSeriesChunksSetIterator(ctx, logger, userID, cache, chunkReaders, refsIterator, refsIteratorBatchSize, stats, minT, maxT) + iterator = newLoadingSeriesChunksSetIterator(ctx, logger, userID, chunkReaders, refsIterator, refsIteratorBatchSize, stats, minT, maxT) iterator = newPreloadingAndStatsTrackingSetIterator[seriesChunksSet](ctx, 1, iterator, stats) return iterator } @@ -335,7 +331,6 @@ type loadingSeriesChunksSetIterator struct { ctx context.Context logger log.Logger userID string - cache chunkscache.Cache chunkReaders bucketChunkReaders from seriesChunkRefsSetIterator fromBatchSize int @@ -350,7 +345,6 @@ func newLoadingSeriesChunksSetIterator( ctx context.Context, logger log.Logger, userID string, - cache chunkscache.Cache, chunkReaders bucketChunkReaders, from seriesChunkRefsSetIterator, fromBatchSize int, @@ -362,7 +356,6 @@ func newLoadingSeriesChunksSetIterator( ctx: ctx, logger: logger, userID: userID, - cache: cache, chunkReaders: chunkReaders, from: from, fromBatchSize: fromBatchSize, @@ -415,11 +408,6 @@ func (c *loadingSeriesChunksSetIterator) Next() (retHasNext bool) { // so can safely expand it. nextSet.series = nextSet.series[:nextUnloaded.len()] - var cachedRanges map[chunkscache.Range][]byte - if c.cache != nil { - cachedRanges = c.cache.FetchMultiChunks(c.ctx, c.userID, toCacheKeys(nextUnloaded.series), chunksPool) - c.recordCachedChunks(cachedRanges) - } c.chunkReaders.reset() for sIdx, s := range nextUnloaded.series { nextSet.series[sIdx].lset = s.lset @@ -429,20 +417,10 @@ func (c *loadingSeriesChunksSetIterator) Next() (retHasNext bool) { for _, chunksRange := range s.chunksRanges { rangeChunks := nextSet.series[sIdx].chks[seriesChunkIdx : seriesChunkIdx+len(chunksRange.refs)] initializeChunks(chunksRange, rangeChunks) - if cachedRange, ok := cachedRanges[toCacheKey(chunksRange)]; ok { - err := parseChunksRange(cachedRange, rangeChunks) - if err == nil { - seriesChunkIdx += len(chunksRange.refs) - continue - } - // we couldn't parse the chunk range form the cache, so we will fetch its chunks from the bucket. - level.Warn(c.logger).Log("msg", "parsing cache chunks", "err", err) - } for _, chunk := range chunksRange.refs { - if c.cache == nil && (chunk.minTime > c.maxTime || chunk.maxTime < c.minTime) { - // If the cache is not set, then we don't need to overfetch chunks that we know are outside minT/maxT. - // If the cache is set, then we need to do that, so we can cache the complete chunks ranges; they will be filtered out after fetching. + if chunk.minTime > c.maxTime || chunk.maxTime < c.minTime { + // We don't need to overfetch chunks that we know are outside minT/maxT. seriesChunkIdx++ continue } @@ -461,9 +439,6 @@ func (c *loadingSeriesChunksSetIterator) Next() (retHasNext bool) { c.err = errors.Wrap(err, "loading chunks") return false } - if c.cache != nil { - c.storeRangesInCache(nextUnloaded.series, nextSet.series, cachedRanges) - } c.recordProcessedChunks(nextSet.series) // We might have over-fetched some chunks that were outside minT/maxT because we fetch a whole @@ -489,77 +464,6 @@ func initializeChunks(chunksRange seriesChunkRefsRange, chunks []storepb.AggrChu } } -func toCacheKeys(series []seriesChunkRefs) []chunkscache.Range { - totalRanges := 0 - for _, s := range series { - totalRanges += len(s.chunksRanges) - } - ranges := make([]chunkscache.Range, 0, totalRanges) - for _, s := range series { - for _, g := range s.chunksRanges { - ranges = append(ranges, toCacheKey(g)) - } - } - return ranges -} - -func toCacheKey(g seriesChunkRefsRange) chunkscache.Range { - return chunkscache.Range{ - BlockID: g.blockID, - Start: g.firstRef(), - NumChunks: len(g.refs), - } -} - -func parseChunksRange(rBytes []byte, chunks []storepb.AggrChunk) error { - for i := range chunks { - // ┌───────────────┬───────────────────┬──────────────┐ - // │ len │ encoding <1 byte> │ data │ - // └───────────────┴───────────────────┴──────────────┘ - chunkDataLen, n := varint.Uvarint(rBytes) - if n == 0 { - return fmt.Errorf("not enough bytes (%d) to read length of chunk %d/%d", len(rBytes), i, len(chunks)) - } - if n < 0 { - return fmt.Errorf("chunk length doesn't fit into uint64 %d/%d", i, len(chunks)) - } - totalChunkLen := n + 1 + int(chunkDataLen) - // The length was estimated, but at this point we know the exact length of the chunk, so we can set it. - if totalChunkLen > len(rBytes) { - return fmt.Errorf("malformed cached chunk range") - } - encodingByte := rBytes[n] - enc, ok := convertChunkEncoding(encodingByte) - if !ok { - return fmt.Errorf("unknown chunk encoding (%d)", encodingByte) - } - chunks[i].Raw.Type = enc - chunks[i].Raw.Data = rBytes[n+1 : totalChunkLen] - rBytes = rBytes[totalChunkLen:] - } - return nil -} - -func convertChunkEncoding(storageEncoding byte) (storepb.Chunk_Encoding, bool) { - converted := storepb.Chunk_Encoding(storageEncoding) - _, exists := storepb.Chunk_Encoding_name[int32(converted)] - return converted, exists -} - -func (c *loadingSeriesChunksSetIterator) recordCachedChunks(cachedRanges map[chunkscache.Range][]byte) { - fetchedChunks := 0 - fetchedBytes := 0 - for k, b := range cachedRanges { - fetchedChunks += k.NumChunks - fetchedBytes += len(b) - } - - c.stats.update(func(stats *queryStats) { - stats.chunksFetched += fetchedChunks - stats.chunksFetchedSizeSum += fetchedBytes - }) -} - func removeChunksOutsideRange(chks []storepb.AggrChunk, minT, maxT int64) []storepb.AggrChunk { writeIdx := 0 for i, chk := range chks { @@ -583,52 +487,6 @@ func (c *loadingSeriesChunksSetIterator) Err() error { return c.err } -func encodeChunksForCache(chunks []storepb.AggrChunk) []byte { - encodedSize := 0 - for _, chk := range chunks { - dataLen := len(chk.Raw.Data) - encodedSize += varint.UvarintSize(uint64(dataLen)) + 1 + dataLen - } - encoded := make([]byte, 0, encodedSize) - for _, chk := range chunks { - encoded = binary.AppendUvarint(encoded, uint64(len(chk.Raw.Data))) - // The cast to byte() below is safe because the actual type of the chunk in the TSDB is a single byte, - // so the type in our protos shouldn't take more than 1 byte. - encoded = append(encoded, byte(chk.Raw.Type)) - encoded = append(encoded, chk.Raw.Data...) - } - return encoded -} - -func (c *loadingSeriesChunksSetIterator) storeRangesInCache(seriesRefs []seriesChunkRefs, seriesChunks []seriesChunks, cacheHits map[chunkscache.Range][]byte) { - // Count the number of ranges that were not previously cached, and so we need to store to the cache. - cacheMisses := 0 - for _, s := range seriesRefs { - for _, chunksRange := range s.chunksRanges { - if _, ok := cacheHits[toCacheKey(chunksRange)]; !ok { - cacheMisses++ - } - } - } - - toStore := make(map[chunkscache.Range][]byte, cacheMisses) - for sIdx, s := range seriesRefs { - seriesChunkIdx := 0 - for _, chunksRange := range s.chunksRanges { - cacheKey := toCacheKey(chunksRange) - if _, ok := cacheHits[cacheKey]; ok { - seriesChunkIdx += len(chunksRange.refs) - continue - } - rangeChunks := seriesChunks[sIdx].chks[seriesChunkIdx : seriesChunkIdx+len(chunksRange.refs)] - toStore[cacheKey] = encodeChunksForCache(rangeChunks) - - seriesChunkIdx += len(chunksRange.refs) - } - } - c.cache.StoreChunks(c.userID, toStore) -} - func (c *loadingSeriesChunksSetIterator) recordReturnedChunks(series []seriesChunks) { returnedChunks, returnedChunksBytes := chunkStats(series) diff --git a/pkg/storegateway/series_chunks_test.go b/pkg/storegateway/series_chunks_test.go index 0bc2e308fc3..0d22cd07af8 100644 --- a/pkg/storegateway/series_chunks_test.go +++ b/pkg/storegateway/series_chunks_test.go @@ -20,7 +20,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" - "github.com/grafana/mimir/pkg/storegateway/chunkscache" "github.com/grafana/mimir/pkg/storegateway/storepb" "github.com/grafana/mimir/pkg/util/pool" "github.com/grafana/mimir/pkg/util/test" @@ -541,25 +540,6 @@ func TestLoadingSeriesChunksSetIterator(t *testing.T) { series: generateSeriesEntries(t, 10), } - // sliceBlock slices the chunks and chunk refs for each series into totalParts and returns only the requested parts. - // fromPart is inclusive, toPart is not inclusive; fromPart and toPart are zero-indexed. - sliceBlock := func(b testBlock, fromPart, toPart, totalParts int) testBlock { - t.Helper() - require.Zero(t, len(b.series)%totalParts, "cannot divide block into uneven parts") - - series := make([]testBlockSeries, len(b.series)) - copy(series, block1.series) - chunksPerPart := len(b.series[0].refs) / totalParts - for sIdx, s := range series { - series[sIdx].refs = s.refs[chunksPerPart*fromPart : chunksPerPart*toPart] - series[sIdx].chks = s.chks[chunksPerPart*fromPart : chunksPerPart*toPart] - } - return testBlock{ - ulid: b.ulid, - series: series, - } - } - block2 := testBlock{ ulid: ulid.MustNew(2, nil), series: generateSeriesEntries(t, 20)[10:], // Make block2 contain different 10 series from those in block1 @@ -756,174 +736,10 @@ func TestLoadingSeriesChunksSetIterator(t *testing.T) { expectedErr: "test err", }, }, - "only cache misses": { - // Load series 0 and 2 - { - existingBlocks: []testBlock{block1, block2}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefs(0), block1.toSeriesChunkRefs(2)}}, - {series: []seriesChunkRefs{block2.toSeriesChunkRefs(0), block2.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - {series: []seriesChunks{block2.seriesChunks(0), block2.seriesChunks(2)}}, - }, - }, - // Next load a different set of series (1 and 3) - { - existingBlocks: []testBlock{block1, block2}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefs(1), block1.toSeriesChunkRefs(3)}}, - {series: []seriesChunkRefs{block2.toSeriesChunkRefs(1), block2.toSeriesChunkRefs(3)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(1), block1.seriesChunks(3)}}, - {series: []seriesChunks{block2.seriesChunks(1), block2.seriesChunks(3)}}, - }, - }, - }, - "skips cached chunks when there is a different number of chunks in the range": { - // Issue a request where the first series has its chunks only in two ranges - { - existingBlocks: []testBlock{block1}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRanges(0, 2), block1.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - }, - }, - // Issue a request where the first series has its chunks in 12 ranges; it shouldn't interfere with the cache item from last request - { - existingBlocks: []testBlock{block1}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRanges(0, 12), block1.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - }, - }, - }, - "cache hits": { - // Issue a request - { - existingBlocks: []testBlock{block1}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRanges(0, 1), block1.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - }, - }, - // Issue the same request; this time with an empty storage - { - existingBlocks: []testBlock{}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefs(0), block1.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - }, - }, - }, - "one block cache hit, one block cache misses": { - // First query only from block1 - { - existingBlocks: []testBlock{block1}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefs(0), block1.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - }, - }, - // Next query from block1 and block2 with only block2 available in the storage; chunks for block1 should be already cached - { - existingBlocks: []testBlock{block2}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRanges(0, 1), block1.toSeriesChunkRefs(2)}}, - {series: []seriesChunkRefs{block2.toSeriesChunkRefsWithNRanges(0, 1), block2.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - {series: []seriesChunks{block2.seriesChunks(0), block2.seriesChunks(2)}}, - }, - }, - }, - "some chunk range cache hit, some cache miss": { - // Load the chunks of series 0 loading only the chunks at the end of the block range. - // The chunks in the whole block cover time 0 to 500 for the first series. - { - existingBlocks: []testBlock{block1}, - minT: 301, - maxT: 500, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRangesOverlapping(0, 5, 301, 500)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.toSeriesChunksOverlapping(0, 301, 500)}}, - }, - }, - // Next query a wide time range, but make only the first half of chunks available in the store; - // If the cache is used, the loading iterator will read the second half of chunks from it. - { - existingBlocks: []testBlock{sliceBlock(block1, 0, 3, 5)}, - minT: 0, - maxT: 500, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRanges(0, 5)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0)}}, - }, - }, - }, - "after partial cache hits, cache is populated": { - // The chunks in the whole block cover time 0 to 500 for the first series. - // We make available a block which covers only the first two fifths of the block. - // Cache the first two fifths of chunks. - { - existingBlocks: []testBlock{sliceBlock(block1, 0, 2, 5)}, - minT: 0, - maxT: 199, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRangesOverlapping(0, 5, 0, 199)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.toSeriesChunksOverlapping(0, 0, 199)}}, - }, - }, - // Next query a wide time range; second fifth of chunks should come from the cache, the rest from the bucket. - { - existingBlocks: []testBlock{sliceBlock(block1, 2, 5, 5)}, - minT: 100, - maxT: 500, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRangesOverlapping(0, 5, 100, 500)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.toSeriesChunksOverlapping(0, 100, 500)}}, - }, - }, - // Query the same time range, this time all four fifths of chunks should be served form the cache. - { - existingBlocks: []testBlock{}, - minT: 0, - maxT: 500, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRanges(0, 5)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0)}}, - }, - }, - }, } for testName, loadRequests := range testCases { t.Run(testName, func(t *testing.T) { - // Reuse the cache between requests, so we can test caching too - chunksCache := newInMemoryChunksCache() for scenarioIdx, testCase := range loadRequests { t.Run("step "+strconv.Itoa(scenarioIdx), func(t *testing.T) { @@ -944,7 +760,7 @@ func TestLoadingSeriesChunksSetIterator(t *testing.T) { } // Run test - set := newLoadingSeriesChunksSetIterator(context.Background(), log.NewNopLogger(), "tenant", chunksCache, *readers, newSliceSeriesChunkRefsSetIterator(nil, testCase.setsToLoad...), 100, newSafeQueryStats(), minT, maxT) + set := newLoadingSeriesChunksSetIterator(context.Background(), log.NewNopLogger(), "tenant", *readers, newSliceSeriesChunkRefsSetIterator(nil, testCase.setsToLoad...), 100, newSafeQueryStats(), minT, maxT) loadedSets := readAllSeriesChunksSets(set) // Assertions @@ -1025,7 +841,7 @@ func BenchmarkLoadingSeriesChunksSetIterator(b *testing.B) { for n := 0; n < b.N; n++ { batchSize := numSeriesPerSet - it := newLoadingSeriesChunksSetIterator(context.Background(), log.NewNopLogger(), "tenant", newInMemoryChunksCache(), *chunkReaders, newSliceSeriesChunkRefsSetIterator(nil, sets...), batchSize, stats, 0, 10000) + it := newLoadingSeriesChunksSetIterator(context.Background(), log.NewNopLogger(), "tenant", *chunkReaders, newSliceSeriesChunkRefsSetIterator(nil, sets...), batchSize, stats, 0, 10000) actualSeries := 0 actualChunks := 0 @@ -1057,28 +873,6 @@ func BenchmarkLoadingSeriesChunksSetIterator(b *testing.B) { } } -func BenchmarkEncodeChunksForCache(b *testing.B) { - testCases := map[string]struct { - toEncode []storepb.AggrChunk - }{ - "1 small chunk": {generateSeriesEntriesWithChunksSize(b, 1, 1, 32)[0].chks}, - "1 medium chunk": {generateSeriesEntriesWithChunksSize(b, 1, 1, 256)[0].chks}, - "1 big chunk": {generateSeriesEntriesWithChunksSize(b, 1, 1, 4096)[0].chks}, - "50 small chunks": {generateSeriesEntriesWithChunksSize(b, 1, 50, 32)[0].chks}, - "50 medium chunks": {generateSeriesEntriesWithChunksSize(b, 1, 50, 256)[0].chks}, - "50 big chunks": {generateSeriesEntriesWithChunksSize(b, 1, 50, 4096)[0].chks}, - } - - for testName, testCase := range testCases { - b.Run(testName, func(b *testing.B) { - for i := 0; i < b.N; i++ { - encoded := encodeChunksForCache(testCase.toEncode) - assert.NotEmpty(b, encoded) - } - }) - } -} - type chunkReaderMock struct { chunks map[chunks.ChunkRef]storepb.AggrChunk addLoadErr, loadErr error @@ -1294,33 +1088,3 @@ func readAllSeriesLabels(it storepb.SeriesSet) []labels.Labels { } return out } - -type inMemoryChunksCache struct { - cached map[string]map[chunkscache.Range][]byte -} - -func newInMemoryChunksCache() chunkscache.Cache { - return &inMemoryChunksCache{ - cached: map[string]map[chunkscache.Range][]byte{}, - } -} - -func (c *inMemoryChunksCache) FetchMultiChunks(_ context.Context, userID string, ranges []chunkscache.Range, _ *pool.SafeSlabPool[byte]) map[chunkscache.Range][]byte { - hits := make(map[chunkscache.Range][]byte, len(ranges)) - for _, r := range ranges { - if cached, ok := c.cached[userID][r]; ok { - hits[r] = cached - } - } - return hits -} - -func (c *inMemoryChunksCache) StoreChunks(userID string, ranges map[chunkscache.Range][]byte) { - if c.cached[userID] == nil { - c.cached[userID] = make(map[chunkscache.Range][]byte) - } - - for k, v := range ranges { - c.cached[userID][k] = v - } -} diff --git a/pkg/storegateway/series_refs.go b/pkg/storegateway/series_refs.go index f1459a67ab3..40b3a4fb257 100644 --- a/pkg/storegateway/series_refs.go +++ b/pkg/storegateway/series_refs.go @@ -711,19 +711,18 @@ func (l *limitingSeriesChunkRefsSetIterator) Err() error { } type loadingSeriesChunkRefsSetIterator struct { - ctx context.Context - postingsSetIterator *postingsSetsIterator - indexr *bucketIndexReader - indexCache indexcache.IndexCache - stats *safeQueryStats - blockID ulid.ULID - shard *sharding.ShardSelector - seriesHasher seriesHasher - strategy seriesIteratorStrategy - minTime, maxTime int64 - tenantID string - chunkRangesPerSeries int - logger log.Logger + ctx context.Context + postingsSetIterator *postingsSetsIterator + indexr *bucketIndexReader + indexCache indexcache.IndexCache + stats *safeQueryStats + blockID ulid.ULID + shard *sharding.ShardSelector + seriesHasher seriesHasher + strategy seriesIteratorStrategy + minTime, maxTime int64 + tenantID string + logger log.Logger chunkMetasBuffer []chunks.Meta @@ -743,7 +742,6 @@ func openBlockSeriesChunkRefsSetsIterator( seriesHasher seriesHasher, strategy seriesIteratorStrategy, minTime, maxTime int64, // Series must have data in this time range to be returned (ignored if skipChunks=true). - chunkRangesPerSeries int, stats *safeQueryStats, reuse *reusedPostingsAndMatchers, // If this is not nil, these posting and matchers are used as it is without fetching new ones. logger log.Logger, @@ -787,7 +785,6 @@ func openBlockSeriesChunkRefsSetsIterator( minTime, maxTime, tenantID, - chunkRangesPerSeries, logger, ) if len(pendingMatchers) > 0 { @@ -877,7 +874,6 @@ func newLoadingSeriesChunkRefsSetIterator( minTime int64, maxTime int64, tenantID string, - chunkRangesPerSeries int, logger log.Logger, ) *loadingSeriesChunkRefsSetIterator { if strategy.isNoChunkRefsOnEntireBlock() { @@ -885,20 +881,19 @@ func newLoadingSeriesChunkRefsSetIterator( } return &loadingSeriesChunkRefsSetIterator{ - ctx: ctx, - postingsSetIterator: postingsSetIterator, - indexr: indexr, - indexCache: indexCache, - stats: stats, - blockID: blockMeta.ULID, - shard: shard, - seriesHasher: seriesHasher, - strategy: strategy, - minTime: minTime, - maxTime: maxTime, - tenantID: tenantID, - logger: logger, - chunkRangesPerSeries: chunkRangesPerSeries, + ctx: ctx, + postingsSetIterator: postingsSetIterator, + indexr: indexr, + indexCache: indexCache, + stats: stats, + blockID: blockMeta.ULID, + shard: shard, + seriesHasher: seriesHasher, + strategy: strategy, + minTime: minTime, + maxTime: maxTime, + tenantID: tenantID, + logger: logger, } } @@ -1017,7 +1012,7 @@ func (s *loadingSeriesChunkRefsSetIterator) symbolizedSet(ctx context.Context, p } case !s.strategy.isNoChunkRefs(): clampLastChunkLength(symbolizedSet.series, metas) - series.chunksRanges = metasToRanges(partitionChunks(metas, s.chunkRangesPerSeries, minChunksPerRange), s.blockID, s.minTime, s.maxTime) + series.chunksRanges = metasToRanges([][]chunks.Meta{metas}, s.blockID, s.minTime, s.maxTime) } symbolizedSet.series = append(symbolizedSet.series, series) } @@ -1093,10 +1088,6 @@ func (s *loadingSeriesChunkRefsSetIterator) filterSeries(set seriesChunkRefsSet, return set } -const ( - minChunksPerRange = 10 -) - // partitionChunks creates a slice of []chunks.Meta for each range of chunks within the same segment file. // The partitioning here should be fairly static and not depend on the actual Series() request because // the resulting ranges may be used for caching, and we want our cache entries to be reusable between requests. diff --git a/pkg/storegateway/series_refs_test.go b/pkg/storegateway/series_refs_test.go index d5ed99f1307..3d814db0f74 100644 --- a/pkg/storegateway/series_refs_test.go +++ b/pkg/storegateway/series_refs_test.go @@ -1461,7 +1461,6 @@ func TestLoadingSeriesChunkRefsSetIterator(t *testing.T) { tc.minT, tc.maxT, "t1", - 1, log.NewNopLogger(), ) @@ -1702,54 +1701,6 @@ func TestOpenBlockSeriesChunkRefsSetsIterator(t *testing.T) { }}, }, }, - "partitions multiple chunks into 2 ranges": { - matcher: labels.MustNewMatcher(labels.MatchRegexp, "a", "3"), - batchSize: 1, - expectedSeries: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{ - {lset: labels.FromStrings("a", "3", "b", "1"), chunksRanges: []seriesChunkRefsRange{ - {refs: []seriesChunkRef{ - {segFileOffset: 358, length: 49, minTime: 0, maxTime: 124}, - {segFileOffset: 407, length: 50, minTime: 125, maxTime: 249}, - {segFileOffset: 457, length: 50, minTime: 250, maxTime: 374}, - {segFileOffset: 507, length: 50, minTime: 375, maxTime: 499}, - {segFileOffset: 557, length: 50, minTime: 500, maxTime: 624}, - {segFileOffset: 607, length: 50, minTime: 625, maxTime: 749}, - {segFileOffset: 657, length: 50, minTime: 750, maxTime: 874}, - {segFileOffset: 707, length: 50, minTime: 875, maxTime: 999}, - {segFileOffset: 757, length: 50, minTime: 1000, maxTime: 1124}, - {segFileOffset: 807, length: 50, minTime: 1125, maxTime: 1249}, - }}, - {refs: []seriesChunkRef{ - {segFileOffset: 857, length: 50, minTime: 1250, maxTime: 1374}, - {segFileOffset: 907, length: 50, minTime: 1375, maxTime: 1499}, - {segFileOffset: 957, length: tsdb.EstimatedMaxChunkSize, minTime: 1500, maxTime: 1559}, - }}, - }}, - }}, - {series: []seriesChunkRefs{ - {lset: labels.FromStrings("a", "3", "b", "2"), chunksRanges: []seriesChunkRefsRange{ - {refs: []seriesChunkRef{ - {segFileOffset: 991, length: 49, minTime: 0, maxTime: 124}, - {segFileOffset: 1040, length: 50, minTime: 125, maxTime: 249}, - {segFileOffset: 1090, length: 50, minTime: 250, maxTime: 374}, - {segFileOffset: 1140, length: 50, minTime: 375, maxTime: 499}, - {segFileOffset: 1190, length: 50, minTime: 500, maxTime: 624}, - {segFileOffset: 1240, length: 50, minTime: 625, maxTime: 749}, - {segFileOffset: 1290, length: 50, minTime: 750, maxTime: 874}, - {segFileOffset: 1340, length: 50, minTime: 875, maxTime: 999}, - {segFileOffset: 1390, length: 50, minTime: 1000, maxTime: 1124}, - {segFileOffset: 1440, length: 50, minTime: 1125, maxTime: 1249}, - }}, - {refs: []seriesChunkRef{ - {segFileOffset: 1490, length: 50, minTime: 1250, maxTime: 1374}, - {segFileOffset: 1540, length: 50, minTime: 1375, maxTime: 1499}, - {segFileOffset: 1590, length: tsdb.EstimatedMaxChunkSize, minTime: 1500, maxTime: 1559}, - }}, - }}, - }}, - }, - }, "doesn't return a series if its chunks are around minT/maxT but not within it": { matcher: labels.MustNewMatcher(labels.MatchRegexp, "a", "4"), minT: 500, maxT: 600, // The chunks for this timeseries are between 0 and 99 and 1000 and 1099 @@ -1796,7 +1747,6 @@ func TestOpenBlockSeriesChunkRefsSetsIterator(t *testing.T) { strategy, minT, maxT, - 2, newSafeQueryStats(), nil, log.NewNopLogger(), @@ -1898,7 +1848,6 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_pendingMatchers(t *testing.T) { noChunkRefs, // skip chunks since we are testing labels filtering block.meta.MinTime, block.meta.MaxTime, - 2, newSafeQueryStats(), nil, log.NewNopLogger(), @@ -1963,7 +1912,6 @@ func BenchmarkOpenBlockSeriesChunkRefsSetsIterator(b *testing.B) { defaultStrategy, // we don't skip chunks, so we can measure impact in loading chunk refs too block.meta.MinTime, block.meta.MaxTime, - 2, newSafeQueryStats(), nil, log.NewNopLogger(), @@ -2512,7 +2460,6 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_SeriesCaching(t *testing.T) { noChunkRefs, b.meta.MinTime, b.meta.MaxTime, - 1, statsColdCache, nil, log.NewNopLogger(), @@ -2544,7 +2491,6 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_SeriesCaching(t *testing.T) { noChunkRefs, b.meta.MinTime, b.meta.MaxTime, - 1, statsWarnCache, nil, log.NewNopLogger(),