From 1f4ae2996ade9ec7ed5bd1096b1578da3f9e601a Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Sat, 19 Aug 2023 14:03:17 +0300 Subject: [PATCH 1/9] store-gateway: remove fine-grained chunks caching Signed-off-by: Dimitar Dimitrov --- cmd/mimir/config-descriptor.json | 22 -- cmd/mimir/help-all.txt.tmpl | 4 - .../configuration-parameters/index.md | 13 - pkg/storage/tsdb/caching_config.go | 14 +- pkg/storage/tsdb/config.go | 2 - pkg/storegateway/bucket.go | 59 +---- pkg/storegateway/bucket_e2e_test.go | 39 +-- pkg/storegateway/bucket_stores.go | 11 - pkg/storegateway/bucket_test.go | 8 +- pkg/storegateway/chunkscache/cache.go | 147 ----------- pkg/storegateway/chunkscache/cache_test.go | 173 ------------- pkg/storegateway/series_chunks.go | 148 +---------- pkg/storegateway/series_chunks_test.go | 240 +----------------- 13 files changed, 22 insertions(+), 858 deletions(-) delete mode 100644 pkg/storegateway/chunkscache/cache.go delete mode 100644 pkg/storegateway/chunkscache/cache_test.go diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 7fec86adf1..1e945a2910 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -6775,17 +6775,6 @@ "fieldFlag": "blocks-storage.bucket-store.chunks-cache.subrange-ttl", "fieldType": "duration", "fieldCategory": "advanced" - }, - { - "kind": "field", - "name": "fine_grained_chunks_caching_enabled", - "required": false, - "desc": "Enable fine-grained caching of chunks in the store-gateway. This reduces the required bandwidth and memory utilization.", - "fieldValue": null, - "fieldDefaultValue": false, - "fieldFlag": "blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled", - "fieldType": "boolean", - "fieldCategory": "experimental" } ], "fieldValue": null, @@ -7669,17 +7658,6 @@ "fieldType": "int", "fieldCategory": "advanced" }, - { - "kind": "field", - "name": "fine_grained_chunks_caching_ranges_per_series", - "required": false, - "desc": "This option controls into how many ranges the chunks of each series from each block are split. This value is effectively the number of chunks cache items per series per block when -blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled is enabled.", - "fieldValue": null, - "fieldDefaultValue": 1, - "fieldFlag": "blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series", - "fieldType": "int", - "fieldCategory": "experimental" - }, { "kind": "field", "name": "series_selection_strategy", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index fde19c345f..22803f224d 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -307,8 +307,6 @@ Usage of ./cmd/mimir/mimir: TTL for caching object attributes for chunks. If the metadata cache is configured, attributes will be stored under this cache backend, otherwise attributes are stored in the chunks cache backend. (default 168h0m0s) -blocks-storage.bucket-store.chunks-cache.backend string Backend for chunks cache, if not empty. Supported values: memcached, redis. - -blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled - [experimental] Enable fine-grained caching of chunks in the store-gateway. This reduces the required bandwidth and memory utilization. -blocks-storage.bucket-store.chunks-cache.max-get-range-requests int Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching chunks. Zero or negative value = unlimited number of sub-requests. 
(default 3) -blocks-storage.bucket-store.chunks-cache.memcached.addresses comma-separated-list-of-strings @@ -401,8 +399,6 @@ Usage of ./cmd/mimir/mimir: Client write timeout. (default 3s) -blocks-storage.bucket-store.chunks-cache.subrange-ttl duration TTL for caching individual chunks subranges. (default 24h0m0s) - -blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series int - [experimental] This option controls into how many ranges the chunks of each series from each block are split. This value is effectively the number of chunks cache items per series per block when -blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled is enabled. (default 1) -blocks-storage.bucket-store.ignore-blocks-within duration Blocks with minimum time within this duration are ignored, and not loaded by store-gateway. Useful when used together with -querier.query-store-after to prevent loading young blocks, because there are usually many of them (depending on number of ingesters) and they are not yet compacted. Negative values or 0 disable the filter. (default 10h0m0s) -blocks-storage.bucket-store.ignore-deletion-marks-delay duration diff --git a/docs/sources/mimir/references/configuration-parameters/index.md b/docs/sources/mimir/references/configuration-parameters/index.md index 91731718e8..c9361b2bf1 100644 --- a/docs/sources/mimir/references/configuration-parameters/index.md +++ b/docs/sources/mimir/references/configuration-parameters/index.md @@ -3270,11 +3270,6 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-ttl [subrange_ttl: | default = 24h] - # (experimental) Enable fine-grained caching of chunks in the store-gateway. - # This reduces the required bandwidth and memory utilization. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled - [fine_grained_chunks_caching_enabled: | default = false] - metadata_cache: # Backend for metadata cache, if not empty. Supported values: memcached, # redis. @@ -3461,14 +3456,6 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.batch-series-size [streaming_series_batch_size: | default = 5000] - # (experimental) This option controls into how many ranges the chunks of each - # series from each block are split. This value is effectively the number of - # chunks cache items per series per block when - # -blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled - # is enabled. - # CLI flag: -blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series - [fine_grained_chunks_caching_ranges_per_series: | default = 1] - # (experimental) This option controls the strategy to selection of series and # deferring application of matchers. A more aggressive strategy will fetch # less posting lists at the cost of more series. 
This is useful when querying diff --git a/pkg/storage/tsdb/caching_config.go b/pkg/storage/tsdb/caching_config.go index 4c170647a7..45e194d0f1 100644 --- a/pkg/storage/tsdb/caching_config.go +++ b/pkg/storage/tsdb/caching_config.go @@ -33,11 +33,10 @@ var supportedCacheBackends = []string{cache.BackendMemcached, cache.BackendRedis type ChunksCacheConfig struct { cache.BackendConfig `yaml:",inline"` - MaxGetRangeRequests int `yaml:"max_get_range_requests" category:"advanced"` - AttributesTTL time.Duration `yaml:"attributes_ttl" category:"advanced"` - AttributesInMemoryMaxItems int `yaml:"attributes_in_memory_max_items" category:"advanced"` - SubrangeTTL time.Duration `yaml:"subrange_ttl" category:"advanced"` - FineGrainedChunksCachingEnabled bool `yaml:"fine_grained_chunks_caching_enabled" category:"experimental"` + MaxGetRangeRequests int `yaml:"max_get_range_requests" category:"advanced"` + AttributesTTL time.Duration `yaml:"attributes_ttl" category:"advanced"` + AttributesInMemoryMaxItems int `yaml:"attributes_in_memory_max_items" category:"advanced"` + SubrangeTTL time.Duration `yaml:"subrange_ttl" category:"advanced"` } func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { @@ -50,7 +49,6 @@ func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix st f.DurationVar(&cfg.AttributesTTL, prefix+"attributes-ttl", 168*time.Hour, "TTL for caching object attributes for chunks. If the metadata cache is configured, attributes will be stored under this cache backend, otherwise attributes are stored in the chunks cache backend.") f.IntVar(&cfg.AttributesInMemoryMaxItems, prefix+"attributes-in-memory-max-items", 50000, "Maximum number of object attribute items to keep in a first level in-memory LRU cache. Metadata will be stored and fetched in-memory before hitting the cache backend. 0 to disable the in-memory cache.") f.DurationVar(&cfg.SubrangeTTL, prefix+"subrange-ttl", 24*time.Hour, "TTL for caching individual chunks subranges.") - f.BoolVar(&cfg.FineGrainedChunksCachingEnabled, prefix+"fine-grained-chunks-caching-enabled", false, "Enable fine-grained caching of chunks in the store-gateway. 
This reduces the required bandwidth and memory utilization.") } func (cfg *ChunksCacheConfig) Validate() error { @@ -138,9 +136,7 @@ func CreateCachingBucket(chunksCache cache.Cache, chunksConfig ChunksCacheConfig return nil, errors.Wrapf(err, "wrap metadata cache with in-memory cache") } } - if !chunksConfig.FineGrainedChunksCachingEnabled { - cfg.CacheGetRange("chunks", chunksCache, isTSDBChunkFile, subrangeSize, attributesCache, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests) - } + cfg.CacheGetRange("chunks", chunksCache, isTSDBChunkFile, subrangeSize, attributesCache, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests) } if !cachingConfigured { diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 2589fadcf1..07b39b85bb 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -412,7 +412,6 @@ type BucketStoreConfig struct { IndexHeader indexheader.Config `yaml:"index_header" category:"experimental"` StreamingBatchSize int `yaml:"streaming_series_batch_size" category:"advanced"` - ChunkRangesPerSeries int `yaml:"fine_grained_chunks_caching_ranges_per_series" category:"experimental"` SeriesSelectionStrategyName string `yaml:"series_selection_strategy" category:"experimental"` SelectionStrategies struct { WorstCaseSeriesPreference float64 `yaml:"worst_case_series_preference" category:"experimental"` @@ -461,7 +460,6 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.IndexHeaderSparsePersistenceEnabled, "blocks-storage.bucket-store.index-header-sparse-persistence-enabled", false, "If enabled, store-gateway will persist a sparse version of the index-header to disk on construction and load sparse index-headers from disk instead of the whole index-header.") f.Uint64Var(&cfg.PartitionerMaxGapBytes, "blocks-storage.bucket-store.partitioner-max-gap-bytes", DefaultPartitionerMaxGapSize, "Max size - in bytes - of a gap for which the partitioner aggregates together two bucket GET object requests.") f.IntVar(&cfg.StreamingBatchSize, "blocks-storage.bucket-store.batch-series-size", 5000, "This option controls how many series to fetch per batch. The batch size must be greater than 0.") - f.IntVar(&cfg.ChunkRangesPerSeries, "blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series", 1, "This option controls into how many ranges the chunks of each series from each block are split. This value is effectively the number of chunks cache items per series per block when -blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled is enabled.") f.StringVar(&cfg.SeriesSelectionStrategyName, seriesSelectionStrategyFlag, WorstCasePostingsStrategy, "This option controls the strategy to selection of series and deferring application of matchers. A more aggressive strategy will fetch less posting lists at the cost of more series. This is useful when querying large blocks in which many series share the same label name and value. Supported values (most aggressive to least aggressive): "+strings.Join(validSeriesSelectionStrategies, ", ")+".") f.Float64Var(&cfg.SelectionStrategies.WorstCaseSeriesPreference, "blocks-storage.bucket-store.series-selection-strategies.worst-case-series-preference", 0.75, "This option is only used when "+seriesSelectionStrategyFlag+"="+WorstCasePostingsStrategy+". Increasing the series preference results in fetching more series than postings. 
Must be a positive floating point number.") } diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 891acc9c96..fc8a2a4a19 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -45,7 +45,6 @@ import ( "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" - "github.com/grafana/mimir/pkg/storegateway/chunkscache" "github.com/grafana/mimir/pkg/storegateway/hintspb" "github.com/grafana/mimir/pkg/storegateway/indexcache" "github.com/grafana/mimir/pkg/storegateway/indexheader" @@ -92,7 +91,6 @@ type BucketStore struct { fetcher block.MetadataFetcher dir string indexCache indexcache.IndexCache - chunksCache chunkscache.Cache indexReaderPool *indexheader.ReaderPool seriesHashCache *hashcache.SeriesHashCache @@ -109,14 +107,6 @@ type BucketStore struct { // This value must be greater than zero. maxSeriesPerBatch int - // numChunksRangesPerSeries controls into how many ranges the chunks of each series from each block are split. - // This value is effectively the number of chunks cache items per series per block. - numChunksRangesPerSeries int - - // fineGrainedChunksCachingEnabled controls whether to use the per series chunks caching - // or rely on the transparent caching bucket. - fineGrainedChunksCachingEnabled bool - // Query gate which limits the maximum amount of concurrent queries. queryGate gate.Gate @@ -194,13 +184,6 @@ func WithIndexCache(cache indexcache.IndexCache) BucketStoreOption { } } -// WithChunksCache sets a chunksCache to use instead of a noopCache. -func WithChunksCache(cache chunkscache.Cache) BucketStoreOption { - return func(s *BucketStore) { - s.chunksCache = cache - } -} - // WithQueryGate sets a queryGate to use instead of a gate.NewNoop(). func WithQueryGate(queryGate gate.Gate) BucketStoreOption { return func(s *BucketStore) { @@ -215,12 +198,6 @@ func WithLazyLoadingGate(lazyLoadingGate gate.Gate) BucketStoreOption { } } -func WithFineGrainedChunksCaching(enabled bool) BucketStoreOption { - return func(s *BucketStore) { - s.fineGrainedChunksCachingEnabled = enabled - } -} - // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. 
func NewBucketStore( @@ -243,7 +220,6 @@ func NewBucketStore( fetcher: fetcher, dir: dir, indexCache: noopCache{}, - chunksCache: chunkscache.NoopCache{}, blocks: map[ulid.ULID]*bucketBlock{}, blockSet: newBucketBlockSet(), blockSyncConcurrency: bucketStoreConfig.BlockSyncConcurrency, @@ -258,7 +234,6 @@ func NewBucketStore( metrics: metrics, userID: userID, maxSeriesPerBatch: bucketStoreConfig.StreamingBatchSize, - numChunksRangesPerSeries: bucketStoreConfig.ChunkRangesPerSeries, postingsStrategy: postingsStrategy, } @@ -997,11 +972,7 @@ func (s *BucketStore) nonStreamingSeriesSetForBlocks( var set storepb.SeriesSet if !req.SkipChunks { - var cache chunkscache.Cache - if s.fineGrainedChunksCachingEnabled { - cache = s.chunksCache - } - ss := newChunksPreloadingIterator(ctx, s.logger, s.userID, cache, *chunkReaders, it, s.maxSeriesPerBatch, stats, req.MinTime, req.MaxTime) + ss := newChunksPreloadingIterator(ctx, s.logger, s.userID, *chunkReaders, it, s.maxSeriesPerBatch, stats, req.MinTime, req.MaxTime) set = newSeriesChunksSeriesSet(ss) } else { set = newSeriesSetWithoutChunks(ctx, it, stats) @@ -1057,12 +1028,7 @@ func (s *BucketStore) streamingChunksSetForBlocks( if err != nil { return nil, err } - - var cache chunkscache.Cache - if s.fineGrainedChunksCachingEnabled { - cache = s.chunksCache - } - scsi := newChunksPreloadingIterator(ctx, s.logger, s.userID, cache, *chunkReaders, it, s.maxSeriesPerBatch, stats, req.MinTime, req.MaxTime) + scsi := newChunksPreloadingIterator(ctx, s.logger, s.userID, *chunkReaders, it, s.maxSeriesPerBatch, stats, req.MinTime, req.MaxTime) return scsi, nil } @@ -1115,7 +1081,7 @@ func (s *BucketStore) getSeriesIteratorFromBlocks( cachedSeriesHasher{blockSeriesHashCache}, strategy, req.MinTime, req.MaxTime, - s.numChunksRangesPerSeries, + 1, stats, r, s.logger, @@ -1170,20 +1136,11 @@ func (s *BucketStore) recordSeriesCallResult(safeStats *safeQueryStats) { s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried)) - if s.fineGrainedChunksCachingEnabled { - s.metrics.seriesDataTouched.WithLabelValues("chunks", "processed").Observe(float64(stats.chunksProcessed)) - s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", "processed").Observe(float64(stats.chunksProcessedSizeSum)) - // With fine-grained caching we may have touched more chunks than we need because we had to fetch a - // whole range of chunks, which includes chunks outside the request's minT/maxT. - s.metrics.seriesDataTouched.WithLabelValues("chunks", "returned").Observe(float64(stats.chunksReturned)) - s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", "returned").Observe(float64(stats.chunksReturnedSizeSum)) - } else { - s.metrics.seriesDataTouched.WithLabelValues("chunks", "processed").Observe(float64(stats.chunksTouched)) - s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", "processed").Observe(float64(stats.chunksTouchedSizeSum)) - // For the implementation which uses the caching bucket the bytes we touch are the bytes we return. 
- s.metrics.seriesDataTouched.WithLabelValues("chunks", "returned").Observe(float64(stats.chunksTouched)) - s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", "returned").Observe(float64(stats.chunksTouchedSizeSum)) - } + s.metrics.seriesDataTouched.WithLabelValues("chunks", "processed").Observe(float64(stats.chunksTouched)) + s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", "processed").Observe(float64(stats.chunksTouchedSizeSum)) + // For the implementation which uses the caching bucket the bytes we touch are the bytes we return. + s.metrics.seriesDataTouched.WithLabelValues("chunks", "returned").Observe(float64(stats.chunksTouched)) + s.metrics.seriesDataSizeTouched.WithLabelValues("chunks", "returned").Observe(float64(stats.chunksTouchedSizeSum)) s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount)) } diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index 452edfd788..6db76700c2 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -32,7 +32,6 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" - "github.com/grafana/mimir/pkg/storegateway/chunkscache" "github.com/grafana/mimir/pkg/storegateway/indexcache" "github.com/grafana/mimir/pkg/storegateway/indexheader" "github.com/grafana/mimir/pkg/storegateway/storepb" @@ -46,17 +45,12 @@ var ( type swappableCache struct { indexcache.IndexCache - chunkscache.Cache } func (c *swappableCache) SwapIndexCacheWith(cache indexcache.IndexCache) { c.IndexCache = cache } -func (c *swappableCache) SwapChunksCacheWith(cache chunkscache.Cache) { - c.Cache = cache -} - type storeSuite struct { store *BucketStore minTime, maxTime int64 @@ -122,7 +116,6 @@ type prepareStoreConfig struct { seriesLimiterFactory SeriesLimiterFactory series []labels.Labels indexCache indexcache.IndexCache - chunksCache chunkscache.Cache metricsRegistry *prometheus.Registry postingsStrategy postingsSelectionStrategy // When nonOverlappingBlocks is false, prepare store creates 2 blocks per block range. 
@@ -151,7 +144,6 @@ func defaultPrepareStoreConfig(t testing.TB) *prepareStoreConfig { chunksLimiterFactory: newStaticChunksLimiterFactory(0), indexCache: noopCache{}, postingsStrategy: selectAllStrategy{}, - chunksCache: chunkscache.NoopCache{}, series: []labels.Labels{ labels.FromStrings("a", "1", "b", "1"), labels.FromStrings("a", "1", "b", "2"), @@ -181,7 +173,7 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS s := &storeSuite{ logger: log.NewNopLogger(), metricsRegistry: cfg.metricsRegistry, - cache: &swappableCache{IndexCache: cfg.indexCache, Cache: cfg.chunksCache}, + cache: &swappableCache{IndexCache: cfg.indexCache}, minTime: minTime, maxTime: maxTime, } @@ -190,7 +182,7 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS assert.NoError(t, err) // Have our options in the beginning so tests can override logger and index cache if they need to - storeOpts := []BucketStoreOption{WithLogger(s.logger), WithIndexCache(s.cache), WithChunksCache(s.cache)} + storeOpts := []BucketStoreOption{WithLogger(s.logger), WithIndexCache(s.cache)} store, err := NewBucketStore( "tenant", @@ -199,7 +191,6 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS cfg.tempDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: cfg.maxSeriesPerBatch, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 20, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -499,7 +490,6 @@ func TestBucketStore_e2e(t *testing.T) { if ok := t.Run("no caches", func(t *testing.T) { s.cache.SwapIndexCacheWith(noopCache{}) - s.cache.SwapChunksCacheWith(chunkscache.NoopCache{}) testBucketStore_e2e(t, ctx, s) }); !ok { return @@ -528,18 +518,6 @@ func TestBucketStore_e2e(t *testing.T) { }); !ok { return } - - t.Run("with large, sufficient index cache, and chunks cache", func(t *testing.T) { - indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(s.logger, nil, indexcache.InMemoryIndexCacheConfig{ - MaxItemSize: 1e5, - MaxSize: 2e5, - }) - assert.NoError(t, err) - assert.NoError(t, err) - s.cache.SwapIndexCacheWith(indexCache) - s.cache.SwapChunksCacheWith(newInMemoryChunksCache()) - testBucketStore_e2e(t, ctx, s) - }) }) } @@ -577,7 +555,6 @@ func TestBucketStore_e2e_StreamingEdgeCases(t *testing.T) { if ok := t.Run("no caches", func(t *testing.T) { s.cache.SwapIndexCacheWith(noopCache{}) - s.cache.SwapChunksCacheWith(chunkscache.NoopCache{}) testBucketStore_e2e(t, ctx, s, additionalCases...) 
}); !ok { return @@ -606,18 +583,6 @@ func TestBucketStore_e2e_StreamingEdgeCases(t *testing.T) { }); !ok { return } - - t.Run("with large, sufficient index cache, and chunks cache", func(t *testing.T) { - indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(s.logger, nil, indexcache.InMemoryIndexCacheConfig{ - MaxItemSize: 1e5, - MaxSize: 2e5, - }) - assert.NoError(t, err) - assert.NoError(t, err) - s.cache.SwapIndexCacheWith(indexCache) - s.cache.SwapChunksCacheWith(newInMemoryChunksCache()) - testBucketStore_e2e(t, ctx, s) - }) }) } diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 2e3bde2d12..c9d9e44149 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -29,7 +29,6 @@ import ( "github.com/grafana/mimir/pkg/storage/bucket" "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" - "github.com/grafana/mimir/pkg/storegateway/chunkscache" "github.com/grafana/mimir/pkg/storegateway/indexcache" "github.com/grafana/mimir/pkg/storegateway/storepb" util_log "github.com/grafana/mimir/pkg/util/log" @@ -55,8 +54,6 @@ type BucketStores struct { // Index cache shared across all tenants. indexCache indexcache.IndexCache - chunksCache chunkscache.Cache - // Series hash cache shared across all tenants. seriesHashCache *hashcache.SeriesHashCache @@ -157,12 +154,6 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra return nil, errors.Wrap(err, "create index cache") } - chunksCache, err := chunkscache.NewChunksCache(logger, chunksCacheClient, reg) - if err != nil { - return nil, errors.Wrap(err, "create chunks cache") - } - u.chunksCache = chunkscache.NewTracingCache(chunksCache, logger) - if reg != nil { reg.MustRegister(u.metaFetcherMetrics) } @@ -471,10 +462,8 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) { bucketStoreOpts := []BucketStoreOption{ WithLogger(userLogger), WithIndexCache(u.indexCache), - WithChunksCache(u.chunksCache), WithQueryGate(u.queryGate), WithLazyLoadingGate(u.lazyLoadingGate), - WithFineGrainedChunksCaching(u.cfg.BucketStore.ChunksCache.FineGrainedChunksCachingEnabled), } bs, err := NewBucketStore( diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index b5671d1cb7..422018fafc 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -56,7 +56,6 @@ import ( "github.com/grafana/mimir/pkg/storage/sharding" mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" - "github.com/grafana/mimir/pkg/storegateway/chunkscache" "github.com/grafana/mimir/pkg/storegateway/hintspb" "github.com/grafana/mimir/pkg/storegateway/indexcache" "github.com/grafana/mimir/pkg/storegateway/indexheader" @@ -1468,7 +1467,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries maxSeriesPerBatch: 10000, }, "with series streaming and caches (1K per batch)": { - options: []BucketStoreOption{WithLogger(logger), WithIndexCache(newInMemoryIndexCache(t)), WithChunksCache(newInMemoryChunksCache())}, + options: []BucketStoreOption{WithLogger(logger), WithIndexCache(newInMemoryIndexCache(t))}, maxSeriesPerBatch: 1000, }, } @@ -1772,7 +1771,6 @@ func TestBucketStore_Series_OneBlock_InMemIndexCacheSegfault(t *testing.T) { bkt: objstore.WithNoopInstr(bkt), logger: logger, indexCache: indexCache, - chunksCache: chunkscache.NoopCache{}, indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 
0, true, gate.NewNoop(), indexheader.NewReaderPoolMetrics(nil), indexheader.LazyLoadedHeadersSnapshotConfig{}), metrics: NewBucketStoreMetrics(nil), blockSet: &bucketBlockSet{blocks: []*bucketBlock{b1, b2}}, @@ -1954,7 +1952,6 @@ func TestBucketStore_Series_ErrorUnmarshallingRequestHints(t *testing.T) { NewBucketStoreMetrics(nil), WithLogger(logger), WithIndexCache(indexCache), - WithChunksCache(newInMemoryChunksCache()), ) assert.NoError(t, err) defer func() { assert.NoError(t, store.RemoveBlocksAndClose()) }() @@ -2207,7 +2204,6 @@ func testBucketStoreSeriesBlockWithMultipleChunks( NewBucketStoreMetrics(nil), WithLogger(logger), WithIndexCache(indexCache), - WithChunksCache(newInMemoryChunksCache()), ) assert.NoError(t, err) assert.NoError(t, store.SyncBlocks(context.Background())) @@ -2468,7 +2464,7 @@ func setupStoreForHintsTest(t *testing.T, maxSeriesPerBatch int, opts ...BucketS indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(logger, nil, indexcache.InMemoryIndexCacheConfig{}) assert.NoError(tb, err) - opts = append([]BucketStoreOption{WithLogger(logger), WithIndexCache(indexCache), WithChunksCache(newInMemoryChunksCache())}, opts...) + opts = append([]BucketStoreOption{WithLogger(logger), WithIndexCache(indexCache)}, opts...) store, err := NewBucketStore( "tenant", instrBkt, diff --git a/pkg/storegateway/chunkscache/cache.go b/pkg/storegateway/chunkscache/cache.go deleted file mode 100644 index 055d0b83c0..0000000000 --- a/pkg/storegateway/chunkscache/cache.go +++ /dev/null @@ -1,147 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package chunkscache - -import ( - "context" - "fmt" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/grafana/dskit/cache" - "github.com/oklog/ulid" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/prometheus/tsdb/chunks" - - "github.com/grafana/mimir/pkg/util/pool" - "github.com/grafana/mimir/pkg/util/spanlogger" -) - -// Range is a contiguous range of chunks. Start is the ref of the first chunk and NumChunks is how many chunks -// there are in the range. 
-type Range struct { - BlockID ulid.ULID - Start chunks.ChunkRef - NumChunks int -} - -type Cache interface { - FetchMultiChunks(ctx context.Context, userID string, ranges []Range, chunksPool *pool.SafeSlabPool[byte]) (hits map[Range][]byte) - StoreChunks(userID string, ranges map[Range][]byte) -} - -type TracingCache struct { - c Cache - l log.Logger -} - -func NewTracingCache(c Cache, l log.Logger) TracingCache { - return TracingCache{ - c: c, - l: l, - } -} - -func (c TracingCache) FetchMultiChunks(ctx context.Context, userID string, ranges []Range, chunksPool *pool.SafeSlabPool[byte]) (hits map[Range][]byte) { - hits = c.c.FetchMultiChunks(ctx, userID, ranges, chunksPool) - - l := spanlogger.FromContext(ctx, c.l) - level.Debug(l).Log( - "name", "ChunksCache.FetchMultiChunks", - "ranges_requested", len(ranges), - "ranges_hit", len(hits), - "ranges_hit_bytes", hitsSize(hits), - "ranges_misses", len(ranges)-len(hits), - ) - return -} - -func hitsSize(hits map[Range][]byte) (size int) { - for _, b := range hits { - size += len(b) - } - return -} - -func (c TracingCache) StoreChunks(userID string, ranges map[Range][]byte) { - c.c.StoreChunks(userID, ranges) -} - -type ChunksCache struct { - logger log.Logger - cache cache.Cache - - // TODO these two will soon be tracked by the dskit, we can remove them once https://github.com/grafana/mimir/pull/4078 is merged - requests prometheus.Counter - hits prometheus.Counter -} - -type NoopCache struct{} - -func (NoopCache) FetchMultiChunks(_ context.Context, _ string, _ []Range, _ *pool.SafeSlabPool[byte]) (hits map[Range][]byte) { - return nil -} - -func (NoopCache) StoreChunks(_ string, _ map[Range][]byte) { -} - -func NewChunksCache(logger log.Logger, client cache.Cache, reg prometheus.Registerer) (*ChunksCache, error) { - c := &ChunksCache{ - logger: logger, - cache: client, - } - - c.requests = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_bucket_store_chunks_cache_requests_total", - Help: "Total number of items requested from the cache.", - }) - - c.hits = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_bucket_store_chunks_cache_hits_total", - Help: "Total number of items retrieved from the cache.", - }) - - level.Info(logger).Log("msg", "created chunks cache") - - return c, nil -} - -func (c *ChunksCache) FetchMultiChunks(ctx context.Context, userID string, ranges []Range, chunksPool *pool.SafeSlabPool[byte]) (hits map[Range][]byte) { - keysMap := make(map[string]Range, len(ranges)) - keys := make([]string, len(ranges)) - for i, r := range ranges { - k := chunksKey(userID, r) - keysMap[k] = ranges[i] - keys[i] = k - } - - hitBytes := c.cache.Fetch(ctx, keys, cache.WithAllocator(pool.NewSafeSlabPoolAllocator(chunksPool))) - if len(hitBytes) > 0 { - hits = make(map[Range][]byte, len(hitBytes)) - for key, b := range hitBytes { - hits[keysMap[key]] = b - } - } - - c.requests.Add(float64(len(ranges))) - c.hits.Add(float64(len(hits))) - return -} - -func chunksKey(userID string, r Range) string { - return fmt.Sprintf("C:%s:%s:%d:%d", userID, r.BlockID, r.Start, r.NumChunks) -} - -const ( - defaultTTL = 7 * 24 * time.Hour -) - -func (c *ChunksCache) StoreChunks(userID string, ranges map[Range][]byte) { - rangesWithTenant := make(map[string][]byte, len(ranges)) - for r, v := range ranges { - rangesWithTenant[chunksKey(userID, r)] = v - } - c.cache.StoreAsync(rangesWithTenant, defaultTTL) -} diff --git a/pkg/storegateway/chunkscache/cache_test.go b/pkg/storegateway/chunkscache/cache_test.go deleted file 
mode 100644 index 0c92313b58..0000000000 --- a/pkg/storegateway/chunkscache/cache_test.go +++ /dev/null @@ -1,173 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package chunkscache - -import ( - "context" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/grafana/dskit/cache" - "github.com/oklog/ulid" - "github.com/pkg/errors" - prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/stretchr/testify/assert" -) - -func TestDskitChunksCache_FetchMultiChunks(t *testing.T) { - t.Parallel() - - // Init some data to conveniently define test cases later one. - user1 := "tenant1" - user2 := "tenant2" - block1 := ulid.MustNew(1, nil) - block2 := ulid.MustNew(2, nil) - range1 := Range{BlockID: block1, Start: chunks.ChunkRef(100), NumChunks: 10} - range2 := Range{BlockID: block1, Start: chunks.ChunkRef(200), NumChunks: 20} - range3 := Range{BlockID: block2, Start: chunks.ChunkRef(100), NumChunks: 10} - value1 := []byte{1} - value2 := []byte{2} - value3 := []byte{3} - - tests := map[string]struct { - setup []mockedChunks - mockedErr error - fetchUserID string - fetchRanges []Range - expectedHits map[Range][]byte - }{ - "should return no hits on empty cache": { - setup: []mockedChunks{}, - fetchUserID: user1, - fetchRanges: []Range{range1, range2}, - expectedHits: nil, - }, - "should return no misses on 100% hit ratio": { - setup: []mockedChunks{ - {userID: user1, r: range1, value: value1}, - {userID: user2, r: range2, value: value2}, - {userID: user1, r: range3, value: value3}, - }, - fetchUserID: user1, - fetchRanges: []Range{range1, range3}, - expectedHits: map[Range][]byte{ - range1: value1, - range3: value3, - }, - }, - "should return hits and misses on partial hits": { - setup: []mockedChunks{ - {userID: user1, r: range1, value: value1}, - {userID: user1, r: range2, value: value2}, - }, - fetchUserID: user1, - fetchRanges: []Range{range1, range3}, - expectedHits: map[Range][]byte{range1: value1}, - }, - "should return no hits on cache error": { - setup: []mockedChunks{ - {userID: user1, r: range1, value: value1}, - {userID: user1, r: range1, value: value2}, - {userID: user1, r: range1, value: value3}, - }, - mockedErr: errors.New("mocked error"), - fetchUserID: user1, - fetchRanges: []Range{range1, range2}, - expectedHits: nil, - }, - } - - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - cacheClient := newMockedCacheClient(testData.mockedErr) - c, err := NewChunksCache(log.NewNopLogger(), cacheClient, nil) - assert.NoError(t, err) - - // Store the postings expected before running the test. - ctx := context.Background() - toStore := make(map[string]map[Range][]byte) - for _, p := range testData.setup { - if toStore[p.userID] == nil { - toStore[p.userID] = make(map[Range][]byte) - } - toStore[p.userID][p.r] = p.value - } - for userID, userRanges := range toStore { - c.StoreChunks(userID, userRanges) - } - - // Fetch postings from cached and assert on it. - hits := c.FetchMultiChunks(ctx, testData.fetchUserID, testData.fetchRanges, nil) - assert.Equal(t, testData.expectedHits, hits) - - // Assert on metrics. 
- assert.Equal(t, float64(len(testData.fetchRanges)), prom_testutil.ToFloat64(c.requests)) - assert.Equal(t, float64(len(testData.expectedHits)), prom_testutil.ToFloat64(c.hits)) - - }) - } -} - -func BenchmarkStringCacheKeys(b *testing.B) { - userID := "tenant" - rng := Range{BlockID: ulid.MustNew(1, nil), Start: chunks.ChunkRef(200), NumChunks: 20} - - b.Run("chunks", func(b *testing.B) { - for i := 0; i < b.N; i++ { - chunksKey(userID, rng) - } - }) - -} - -type mockedChunks struct { - userID string - r Range - value []byte -} - -type mockedCacheClient struct { - cache map[string][]byte - mockedGetMultiErr error -} - -func newMockedCacheClient(mockedGetMultiErr error) *mockedCacheClient { - return &mockedCacheClient{ - cache: map[string][]byte{}, - mockedGetMultiErr: mockedGetMultiErr, - } -} - -func (c *mockedCacheClient) Fetch(_ context.Context, keys []string, _ ...cache.Option) map[string][]byte { - if c.mockedGetMultiErr != nil { - return nil - } - - hits := map[string][]byte{} - - for _, key := range keys { - if value, ok := c.cache[key]; ok { - hits[key] = value - } - } - - return hits -} - -func (c *mockedCacheClient) StoreAsync(data map[string][]byte, _ time.Duration) { - for key, value := range data { - c.cache[key] = value - } -} - -func (c *mockedCacheClient) Delete(_ context.Context, key string) error { - delete(c.cache, key) - - return nil -} - -func (c *mockedCacheClient) Name() string { - return "mockedCacheClient" -} diff --git a/pkg/storegateway/series_chunks.go b/pkg/storegateway/series_chunks.go index 8ab9b17f53..cbf6093a29 100644 --- a/pkg/storegateway/series_chunks.go +++ b/pkg/storegateway/series_chunks.go @@ -4,8 +4,6 @@ package storegateway import ( "context" - "encoding/binary" - "fmt" "hash/crc32" "sync" "time" @@ -16,7 +14,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" - "github.com/grafana/mimir/pkg/storegateway/chunkscache" "github.com/grafana/mimir/pkg/storegateway/storepb" util_math "github.com/grafana/mimir/pkg/util/math" "github.com/grafana/mimir/pkg/util/pool" @@ -187,7 +184,6 @@ func newChunksPreloadingIterator( ctx context.Context, logger log.Logger, userID string, - cache chunkscache.Cache, chunkReaders bucketChunkReaders, refsIterator seriesChunkRefsSetIterator, refsIteratorBatchSize int, @@ -195,7 +191,7 @@ func newChunksPreloadingIterator( minT, maxT int64, ) seriesChunksSetIterator { var iterator seriesChunksSetIterator - iterator = newLoadingSeriesChunksSetIterator(ctx, logger, userID, cache, chunkReaders, refsIterator, refsIteratorBatchSize, stats, minT, maxT) + iterator = newLoadingSeriesChunksSetIterator(ctx, logger, userID, chunkReaders, refsIterator, refsIteratorBatchSize, stats, minT, maxT) iterator = newPreloadingAndStatsTrackingSetIterator[seriesChunksSet](ctx, 1, iterator, stats) return iterator } @@ -335,7 +331,6 @@ type loadingSeriesChunksSetIterator struct { ctx context.Context logger log.Logger userID string - cache chunkscache.Cache chunkReaders bucketChunkReaders from seriesChunkRefsSetIterator fromBatchSize int @@ -350,7 +345,6 @@ func newLoadingSeriesChunksSetIterator( ctx context.Context, logger log.Logger, userID string, - cache chunkscache.Cache, chunkReaders bucketChunkReaders, from seriesChunkRefsSetIterator, fromBatchSize int, @@ -362,7 +356,6 @@ func newLoadingSeriesChunksSetIterator( ctx: ctx, logger: logger, userID: userID, - cache: cache, chunkReaders: chunkReaders, from: from, fromBatchSize: fromBatchSize, @@ -415,11 +408,6 @@ func (c *loadingSeriesChunksSetIterator) Next() 
(retHasNext bool) { // so can safely expand it. nextSet.series = nextSet.series[:nextUnloaded.len()] - var cachedRanges map[chunkscache.Range][]byte - if c.cache != nil { - cachedRanges = c.cache.FetchMultiChunks(c.ctx, c.userID, toCacheKeys(nextUnloaded.series), chunksPool) - c.recordCachedChunks(cachedRanges) - } c.chunkReaders.reset() for sIdx, s := range nextUnloaded.series { nextSet.series[sIdx].lset = s.lset @@ -429,20 +417,10 @@ func (c *loadingSeriesChunksSetIterator) Next() (retHasNext bool) { for _, chunksRange := range s.chunksRanges { rangeChunks := nextSet.series[sIdx].chks[seriesChunkIdx : seriesChunkIdx+len(chunksRange.refs)] initializeChunks(chunksRange, rangeChunks) - if cachedRange, ok := cachedRanges[toCacheKey(chunksRange)]; ok { - err := parseChunksRange(cachedRange, rangeChunks) - if err == nil { - seriesChunkIdx += len(chunksRange.refs) - continue - } - // we couldn't parse the chunk range form the cache, so we will fetch its chunks from the bucket. - level.Warn(c.logger).Log("msg", "parsing cache chunks", "err", err) - } for _, chunk := range chunksRange.refs { - if c.cache == nil && (chunk.minTime > c.maxTime || chunk.maxTime < c.minTime) { - // If the cache is not set, then we don't need to overfetch chunks that we know are outside minT/maxT. - // If the cache is set, then we need to do that, so we can cache the complete chunks ranges; they will be filtered out after fetching. + if chunk.minTime > c.maxTime || chunk.maxTime < c.minTime { + // We don't need to overfetch chunks that we know are outside minT/maxT. seriesChunkIdx++ continue } @@ -461,9 +439,6 @@ func (c *loadingSeriesChunksSetIterator) Next() (retHasNext bool) { c.err = errors.Wrap(err, "loading chunks") return false } - if c.cache != nil { - c.storeRangesInCache(nextUnloaded.series, nextSet.series, cachedRanges) - } c.recordProcessedChunks(nextSet.series) // We might have over-fetched some chunks that were outside minT/maxT because we fetch a whole @@ -489,77 +464,6 @@ func initializeChunks(chunksRange seriesChunkRefsRange, chunks []storepb.AggrChu } } -func toCacheKeys(series []seriesChunkRefs) []chunkscache.Range { - totalRanges := 0 - for _, s := range series { - totalRanges += len(s.chunksRanges) - } - ranges := make([]chunkscache.Range, 0, totalRanges) - for _, s := range series { - for _, g := range s.chunksRanges { - ranges = append(ranges, toCacheKey(g)) - } - } - return ranges -} - -func toCacheKey(g seriesChunkRefsRange) chunkscache.Range { - return chunkscache.Range{ - BlockID: g.blockID, - Start: g.firstRef(), - NumChunks: len(g.refs), - } -} - -func parseChunksRange(rBytes []byte, chunks []storepb.AggrChunk) error { - for i := range chunks { - // ┌───────────────┬───────────────────┬──────────────┐ - // │ len │ encoding <1 byte> │ data │ - // └───────────────┴───────────────────┴──────────────┘ - chunkDataLen, n := varint.Uvarint(rBytes) - if n == 0 { - return fmt.Errorf("not enough bytes (%d) to read length of chunk %d/%d", len(rBytes), i, len(chunks)) - } - if n < 0 { - return fmt.Errorf("chunk length doesn't fit into uint64 %d/%d", i, len(chunks)) - } - totalChunkLen := n + 1 + int(chunkDataLen) - // The length was estimated, but at this point we know the exact length of the chunk, so we can set it. 
- if totalChunkLen > len(rBytes) { - return fmt.Errorf("malformed cached chunk range") - } - encodingByte := rBytes[n] - enc, ok := convertChunkEncoding(encodingByte) - if !ok { - return fmt.Errorf("unknown chunk encoding (%d)", encodingByte) - } - chunks[i].Raw.Type = enc - chunks[i].Raw.Data = rBytes[n+1 : totalChunkLen] - rBytes = rBytes[totalChunkLen:] - } - return nil -} - -func convertChunkEncoding(storageEncoding byte) (storepb.Chunk_Encoding, bool) { - converted := storepb.Chunk_Encoding(storageEncoding) - _, exists := storepb.Chunk_Encoding_name[int32(converted)] - return converted, exists -} - -func (c *loadingSeriesChunksSetIterator) recordCachedChunks(cachedRanges map[chunkscache.Range][]byte) { - fetchedChunks := 0 - fetchedBytes := 0 - for k, b := range cachedRanges { - fetchedChunks += k.NumChunks - fetchedBytes += len(b) - } - - c.stats.update(func(stats *queryStats) { - stats.chunksFetched += fetchedChunks - stats.chunksFetchedSizeSum += fetchedBytes - }) -} - func removeChunksOutsideRange(chks []storepb.AggrChunk, minT, maxT int64) []storepb.AggrChunk { writeIdx := 0 for i, chk := range chks { @@ -583,52 +487,6 @@ func (c *loadingSeriesChunksSetIterator) Err() error { return c.err } -func encodeChunksForCache(chunks []storepb.AggrChunk) []byte { - encodedSize := 0 - for _, chk := range chunks { - dataLen := len(chk.Raw.Data) - encodedSize += varint.UvarintSize(uint64(dataLen)) + 1 + dataLen - } - encoded := make([]byte, 0, encodedSize) - for _, chk := range chunks { - encoded = binary.AppendUvarint(encoded, uint64(len(chk.Raw.Data))) - // The cast to byte() below is safe because the actual type of the chunk in the TSDB is a single byte, - // so the type in our protos shouldn't take more than 1 byte. - encoded = append(encoded, byte(chk.Raw.Type)) - encoded = append(encoded, chk.Raw.Data...) - } - return encoded -} - -func (c *loadingSeriesChunksSetIterator) storeRangesInCache(seriesRefs []seriesChunkRefs, seriesChunks []seriesChunks, cacheHits map[chunkscache.Range][]byte) { - // Count the number of ranges that were not previously cached, and so we need to store to the cache. 
- cacheMisses := 0 - for _, s := range seriesRefs { - for _, chunksRange := range s.chunksRanges { - if _, ok := cacheHits[toCacheKey(chunksRange)]; !ok { - cacheMisses++ - } - } - } - - toStore := make(map[chunkscache.Range][]byte, cacheMisses) - for sIdx, s := range seriesRefs { - seriesChunkIdx := 0 - for _, chunksRange := range s.chunksRanges { - cacheKey := toCacheKey(chunksRange) - if _, ok := cacheHits[cacheKey]; ok { - seriesChunkIdx += len(chunksRange.refs) - continue - } - rangeChunks := seriesChunks[sIdx].chks[seriesChunkIdx : seriesChunkIdx+len(chunksRange.refs)] - toStore[cacheKey] = encodeChunksForCache(rangeChunks) - - seriesChunkIdx += len(chunksRange.refs) - } - } - c.cache.StoreChunks(c.userID, toStore) -} - func (c *loadingSeriesChunksSetIterator) recordReturnedChunks(series []seriesChunks) { returnedChunks, returnedChunksBytes := chunkStats(series) diff --git a/pkg/storegateway/series_chunks_test.go b/pkg/storegateway/series_chunks_test.go index 0bc2e308fc..0d22cd07af 100644 --- a/pkg/storegateway/series_chunks_test.go +++ b/pkg/storegateway/series_chunks_test.go @@ -20,7 +20,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" - "github.com/grafana/mimir/pkg/storegateway/chunkscache" "github.com/grafana/mimir/pkg/storegateway/storepb" "github.com/grafana/mimir/pkg/util/pool" "github.com/grafana/mimir/pkg/util/test" @@ -541,25 +540,6 @@ func TestLoadingSeriesChunksSetIterator(t *testing.T) { series: generateSeriesEntries(t, 10), } - // sliceBlock slices the chunks and chunk refs for each series into totalParts and returns only the requested parts. - // fromPart is inclusive, toPart is not inclusive; fromPart and toPart are zero-indexed. - sliceBlock := func(b testBlock, fromPart, toPart, totalParts int) testBlock { - t.Helper() - require.Zero(t, len(b.series)%totalParts, "cannot divide block into uneven parts") - - series := make([]testBlockSeries, len(b.series)) - copy(series, block1.series) - chunksPerPart := len(b.series[0].refs) / totalParts - for sIdx, s := range series { - series[sIdx].refs = s.refs[chunksPerPart*fromPart : chunksPerPart*toPart] - series[sIdx].chks = s.chks[chunksPerPart*fromPart : chunksPerPart*toPart] - } - return testBlock{ - ulid: b.ulid, - series: series, - } - } - block2 := testBlock{ ulid: ulid.MustNew(2, nil), series: generateSeriesEntries(t, 20)[10:], // Make block2 contain different 10 series from those in block1 @@ -756,174 +736,10 @@ func TestLoadingSeriesChunksSetIterator(t *testing.T) { expectedErr: "test err", }, }, - "only cache misses": { - // Load series 0 and 2 - { - existingBlocks: []testBlock{block1, block2}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefs(0), block1.toSeriesChunkRefs(2)}}, - {series: []seriesChunkRefs{block2.toSeriesChunkRefs(0), block2.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - {series: []seriesChunks{block2.seriesChunks(0), block2.seriesChunks(2)}}, - }, - }, - // Next load a different set of series (1 and 3) - { - existingBlocks: []testBlock{block1, block2}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefs(1), block1.toSeriesChunkRefs(3)}}, - {series: []seriesChunkRefs{block2.toSeriesChunkRefs(1), block2.toSeriesChunkRefs(3)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(1), block1.seriesChunks(3)}}, - {series: []seriesChunks{block2.seriesChunks(1), 
block2.seriesChunks(3)}}, - }, - }, - }, - "skips cached chunks when there is a different number of chunks in the range": { - // Issue a request where the first series has its chunks only in two ranges - { - existingBlocks: []testBlock{block1}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRanges(0, 2), block1.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - }, - }, - // Issue a request where the first series has its chunks in 12 ranges; it shouldn't interfere with the cache item from last request - { - existingBlocks: []testBlock{block1}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRanges(0, 12), block1.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - }, - }, - }, - "cache hits": { - // Issue a request - { - existingBlocks: []testBlock{block1}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRanges(0, 1), block1.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - }, - }, - // Issue the same request; this time with an empty storage - { - existingBlocks: []testBlock{}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefs(0), block1.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - }, - }, - }, - "one block cache hit, one block cache misses": { - // First query only from block1 - { - existingBlocks: []testBlock{block1}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefs(0), block1.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - }, - }, - // Next query from block1 and block2 with only block2 available in the storage; chunks for block1 should be already cached - { - existingBlocks: []testBlock{block2}, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRanges(0, 1), block1.toSeriesChunkRefs(2)}}, - {series: []seriesChunkRefs{block2.toSeriesChunkRefsWithNRanges(0, 1), block2.toSeriesChunkRefs(2)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0), block1.seriesChunks(2)}}, - {series: []seriesChunks{block2.seriesChunks(0), block2.seriesChunks(2)}}, - }, - }, - }, - "some chunk range cache hit, some cache miss": { - // Load the chunks of series 0 loading only the chunks at the end of the block range. - // The chunks in the whole block cover time 0 to 500 for the first series. - { - existingBlocks: []testBlock{block1}, - minT: 301, - maxT: 500, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRangesOverlapping(0, 5, 301, 500)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.toSeriesChunksOverlapping(0, 301, 500)}}, - }, - }, - // Next query a wide time range, but make only the first half of chunks available in the store; - // If the cache is used, the loading iterator will read the second half of chunks from it. 
- { - existingBlocks: []testBlock{sliceBlock(block1, 0, 3, 5)}, - minT: 0, - maxT: 500, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRanges(0, 5)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0)}}, - }, - }, - }, - "after partial cache hits, cache is populated": { - // The chunks in the whole block cover time 0 to 500 for the first series. - // We make available a block which covers only the first two fifths of the block. - // Cache the first two fifths of chunks. - { - existingBlocks: []testBlock{sliceBlock(block1, 0, 2, 5)}, - minT: 0, - maxT: 199, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRangesOverlapping(0, 5, 0, 199)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.toSeriesChunksOverlapping(0, 0, 199)}}, - }, - }, - // Next query a wide time range; second fifth of chunks should come from the cache, the rest from the bucket. - { - existingBlocks: []testBlock{sliceBlock(block1, 2, 5, 5)}, - minT: 100, - maxT: 500, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRangesOverlapping(0, 5, 100, 500)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.toSeriesChunksOverlapping(0, 100, 500)}}, - }, - }, - // Query the same time range, this time all four fifths of chunks should be served form the cache. - { - existingBlocks: []testBlock{}, - minT: 0, - maxT: 500, - setsToLoad: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{block1.toSeriesChunkRefsWithNRanges(0, 5)}}, - }, - expectedSets: []seriesChunksSet{ - {series: []seriesChunks{block1.seriesChunks(0)}}, - }, - }, - }, } for testName, loadRequests := range testCases { t.Run(testName, func(t *testing.T) { - // Reuse the cache between requests, so we can test caching too - chunksCache := newInMemoryChunksCache() for scenarioIdx, testCase := range loadRequests { t.Run("step "+strconv.Itoa(scenarioIdx), func(t *testing.T) { @@ -944,7 +760,7 @@ func TestLoadingSeriesChunksSetIterator(t *testing.T) { } // Run test - set := newLoadingSeriesChunksSetIterator(context.Background(), log.NewNopLogger(), "tenant", chunksCache, *readers, newSliceSeriesChunkRefsSetIterator(nil, testCase.setsToLoad...), 100, newSafeQueryStats(), minT, maxT) + set := newLoadingSeriesChunksSetIterator(context.Background(), log.NewNopLogger(), "tenant", *readers, newSliceSeriesChunkRefsSetIterator(nil, testCase.setsToLoad...), 100, newSafeQueryStats(), minT, maxT) loadedSets := readAllSeriesChunksSets(set) // Assertions @@ -1025,7 +841,7 @@ func BenchmarkLoadingSeriesChunksSetIterator(b *testing.B) { for n := 0; n < b.N; n++ { batchSize := numSeriesPerSet - it := newLoadingSeriesChunksSetIterator(context.Background(), log.NewNopLogger(), "tenant", newInMemoryChunksCache(), *chunkReaders, newSliceSeriesChunkRefsSetIterator(nil, sets...), batchSize, stats, 0, 10000) + it := newLoadingSeriesChunksSetIterator(context.Background(), log.NewNopLogger(), "tenant", *chunkReaders, newSliceSeriesChunkRefsSetIterator(nil, sets...), batchSize, stats, 0, 10000) actualSeries := 0 actualChunks := 0 @@ -1057,28 +873,6 @@ func BenchmarkLoadingSeriesChunksSetIterator(b *testing.B) { } } -func BenchmarkEncodeChunksForCache(b *testing.B) { - testCases := map[string]struct { - toEncode []storepb.AggrChunk - }{ - "1 small chunk": {generateSeriesEntriesWithChunksSize(b, 1, 1, 32)[0].chks}, - "1 medium chunk": {generateSeriesEntriesWithChunksSize(b, 1, 1, 
256)[0].chks}, - "1 big chunk": {generateSeriesEntriesWithChunksSize(b, 1, 1, 4096)[0].chks}, - "50 small chunks": {generateSeriesEntriesWithChunksSize(b, 1, 50, 32)[0].chks}, - "50 medium chunks": {generateSeriesEntriesWithChunksSize(b, 1, 50, 256)[0].chks}, - "50 big chunks": {generateSeriesEntriesWithChunksSize(b, 1, 50, 4096)[0].chks}, - } - - for testName, testCase := range testCases { - b.Run(testName, func(b *testing.B) { - for i := 0; i < b.N; i++ { - encoded := encodeChunksForCache(testCase.toEncode) - assert.NotEmpty(b, encoded) - } - }) - } -} - type chunkReaderMock struct { chunks map[chunks.ChunkRef]storepb.AggrChunk addLoadErr, loadErr error @@ -1294,33 +1088,3 @@ func readAllSeriesLabels(it storepb.SeriesSet) []labels.Labels { } return out } - -type inMemoryChunksCache struct { - cached map[string]map[chunkscache.Range][]byte -} - -func newInMemoryChunksCache() chunkscache.Cache { - return &inMemoryChunksCache{ - cached: map[string]map[chunkscache.Range][]byte{}, - } -} - -func (c *inMemoryChunksCache) FetchMultiChunks(_ context.Context, userID string, ranges []chunkscache.Range, _ *pool.SafeSlabPool[byte]) map[chunkscache.Range][]byte { - hits := make(map[chunkscache.Range][]byte, len(ranges)) - for _, r := range ranges { - if cached, ok := c.cached[userID][r]; ok { - hits[r] = cached - } - } - return hits -} - -func (c *inMemoryChunksCache) StoreChunks(userID string, ranges map[chunkscache.Range][]byte) { - if c.cached[userID] == nil { - c.cached[userID] = make(map[chunkscache.Range][]byte) - } - - for k, v := range ranges { - c.cached[userID][k] = v - } -} From 31cf9f961bead4b83b4eb9f031f68a89107c4a92 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Sat, 19 Aug 2023 14:15:22 +0300 Subject: [PATCH 2/9] Remove ranges per series configuration Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket.go | 3 -- pkg/storegateway/bucket_chunk_reader_test.go | 1 - pkg/storegateway/bucket_test.go | 20 ++----- pkg/storegateway/series_refs.go | 57 +++++++++----------- pkg/storegateway/series_refs_test.go | 54 ------------------- 5 files changed, 31 insertions(+), 104 deletions(-) diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index fc8a2a4a19..46f6dc99fb 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -1081,7 +1081,6 @@ func (s *BucketStore) getSeriesIteratorFromBlocks( cachedSeriesHasher{blockSeriesHashCache}, strategy, req.MinTime, req.MaxTime, - 1, stats, r, s.logger, @@ -1362,7 +1361,6 @@ func blockLabelNames(ctx context.Context, indexr *bucketIndexReader, matchers [] cachedSeriesHasher{nil}, noChunkRefs, minTime, maxTime, - 1, // we skip chunks, so this doesn't make any difference stats, nil, logger, @@ -1584,7 +1582,6 @@ func labelValuesFromSeries(ctx context.Context, labelName string, seriesPerBatch b.meta.MinTime, b.meta.MaxTime, b.userID, - 1, b.logger, ) if len(pendingMatchers) > 0 { diff --git a/pkg/storegateway/bucket_chunk_reader_test.go b/pkg/storegateway/bucket_chunk_reader_test.go index 186a136d19..25e9ebb6b1 100644 --- a/pkg/storegateway/bucket_chunk_reader_test.go +++ b/pkg/storegateway/bucket_chunk_reader_test.go @@ -41,7 +41,6 @@ func TestBucketChunkReader_refetchChunks(t *testing.T) { defaultStrategy, block.meta.MinTime, block.meta.MaxTime, - 2, newSafeQueryStats(), nil, log.NewNopLogger(), diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 422018fafc..7c3b041c2a 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -1167,7 +1167,6 @@ func 
loadSeries(ctx context.Context, tb test.TB, postings []storage.SeriesRef, i 0, 0, "", - 1, log.NewNopLogger(), ) series := make([]labels.Labels, 0, len(postings)) @@ -1481,7 +1480,6 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: testData.maxSeriesPerBatch, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 1, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -1611,7 +1609,6 @@ func TestBucketStore_Series_Concurrency(t *testing.T) { tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: batchSize, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 1, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -1778,12 +1775,11 @@ func TestBucketStore_Series_OneBlock_InMemIndexCacheSegfault(t *testing.T) { b1.meta.ULID: b1, b2.meta.ULID: b2, }, - postingsStrategy: selectAllStrategy{}, - queryGate: gate.NewNoop(), - chunksLimiterFactory: newStaticChunksLimiterFactory(0), - seriesLimiterFactory: newStaticSeriesLimiterFactory(0), - maxSeriesPerBatch: 65536, - numChunksRangesPerSeries: 1, + postingsStrategy: selectAllStrategy{}, + queryGate: gate.NewNoop(), + chunksLimiterFactory: newStaticChunksLimiterFactory(0), + seriesLimiterFactory: newStaticSeriesLimiterFactory(0), + maxSeriesPerBatch: 65536, } srv := newBucketStoreTestServer(t, store) @@ -1934,7 +1930,6 @@ func TestBucketStore_Series_ErrorUnmarshallingRequestHints(t *testing.T) { tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: 5000, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 10, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -1993,7 +1988,6 @@ func TestBucketStore_Series_CanceledRequest(t *testing.T) { tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: 5000, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 10, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -2060,7 +2054,6 @@ func TestBucketStore_Series_InvalidRequest(t *testing.T) { tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: 5000, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 10, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -2186,7 +2179,6 @@ func testBucketStoreSeriesBlockWithMultipleChunks( tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: 5000, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 10, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -2352,7 +2344,6 @@ func TestBucketStore_Series_Limits(t *testing.T) { tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: batchSize, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 10, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ @@ -2472,7 +2463,6 @@ func setupStoreForHintsTest(t *testing.T, maxSeriesPerBatch int, opts ...BucketS tmpDir, mimir_tsdb.BucketStoreConfig{ StreamingBatchSize: maxSeriesPerBatch, - ChunkRangesPerSeries: 1, BlockSyncConcurrency: 10, PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, IndexHeader: indexheader.Config{ diff --git a/pkg/storegateway/series_refs.go b/pkg/storegateway/series_refs.go index f1459a67ab..9226a6a288 100644 --- a/pkg/storegateway/series_refs.go +++ b/pkg/storegateway/series_refs.go 
@@ -711,19 +711,18 @@ func (l *limitingSeriesChunkRefsSetIterator) Err() error { } type loadingSeriesChunkRefsSetIterator struct { - ctx context.Context - postingsSetIterator *postingsSetsIterator - indexr *bucketIndexReader - indexCache indexcache.IndexCache - stats *safeQueryStats - blockID ulid.ULID - shard *sharding.ShardSelector - seriesHasher seriesHasher - strategy seriesIteratorStrategy - minTime, maxTime int64 - tenantID string - chunkRangesPerSeries int - logger log.Logger + ctx context.Context + postingsSetIterator *postingsSetsIterator + indexr *bucketIndexReader + indexCache indexcache.IndexCache + stats *safeQueryStats + blockID ulid.ULID + shard *sharding.ShardSelector + seriesHasher seriesHasher + strategy seriesIteratorStrategy + minTime, maxTime int64 + tenantID string + logger log.Logger chunkMetasBuffer []chunks.Meta @@ -743,7 +742,6 @@ func openBlockSeriesChunkRefsSetsIterator( seriesHasher seriesHasher, strategy seriesIteratorStrategy, minTime, maxTime int64, // Series must have data in this time range to be returned (ignored if skipChunks=true). - chunkRangesPerSeries int, stats *safeQueryStats, reuse *reusedPostingsAndMatchers, // If this is not nil, these posting and matchers are used as it is without fetching new ones. logger log.Logger, @@ -787,7 +785,6 @@ func openBlockSeriesChunkRefsSetsIterator( minTime, maxTime, tenantID, - chunkRangesPerSeries, logger, ) if len(pendingMatchers) > 0 { @@ -877,7 +874,6 @@ func newLoadingSeriesChunkRefsSetIterator( minTime int64, maxTime int64, tenantID string, - chunkRangesPerSeries int, logger log.Logger, ) *loadingSeriesChunkRefsSetIterator { if strategy.isNoChunkRefsOnEntireBlock() { @@ -885,20 +881,19 @@ func newLoadingSeriesChunkRefsSetIterator( } return &loadingSeriesChunkRefsSetIterator{ - ctx: ctx, - postingsSetIterator: postingsSetIterator, - indexr: indexr, - indexCache: indexCache, - stats: stats, - blockID: blockMeta.ULID, - shard: shard, - seriesHasher: seriesHasher, - strategy: strategy, - minTime: minTime, - maxTime: maxTime, - tenantID: tenantID, - logger: logger, - chunkRangesPerSeries: chunkRangesPerSeries, + ctx: ctx, + postingsSetIterator: postingsSetIterator, + indexr: indexr, + indexCache: indexCache, + stats: stats, + blockID: blockMeta.ULID, + shard: shard, + seriesHasher: seriesHasher, + strategy: strategy, + minTime: minTime, + maxTime: maxTime, + tenantID: tenantID, + logger: logger, } } @@ -1017,7 +1012,7 @@ func (s *loadingSeriesChunkRefsSetIterator) symbolizedSet(ctx context.Context, p } case !s.strategy.isNoChunkRefs(): clampLastChunkLength(symbolizedSet.series, metas) - series.chunksRanges = metasToRanges(partitionChunks(metas, s.chunkRangesPerSeries, minChunksPerRange), s.blockID, s.minTime, s.maxTime) + series.chunksRanges = metasToRanges([][]chunks.Meta{metas}, s.blockID, s.minTime, s.maxTime) } symbolizedSet.series = append(symbolizedSet.series, series) } diff --git a/pkg/storegateway/series_refs_test.go b/pkg/storegateway/series_refs_test.go index d5ed99f130..3d814db0f7 100644 --- a/pkg/storegateway/series_refs_test.go +++ b/pkg/storegateway/series_refs_test.go @@ -1461,7 +1461,6 @@ func TestLoadingSeriesChunkRefsSetIterator(t *testing.T) { tc.minT, tc.maxT, "t1", - 1, log.NewNopLogger(), ) @@ -1702,54 +1701,6 @@ func TestOpenBlockSeriesChunkRefsSetsIterator(t *testing.T) { }}, }, }, - "partitions multiple chunks into 2 ranges": { - matcher: labels.MustNewMatcher(labels.MatchRegexp, "a", "3"), - batchSize: 1, - expectedSeries: []seriesChunkRefsSet{ - {series: []seriesChunkRefs{ - {lset: 
labels.FromStrings("a", "3", "b", "1"), chunksRanges: []seriesChunkRefsRange{ - {refs: []seriesChunkRef{ - {segFileOffset: 358, length: 49, minTime: 0, maxTime: 124}, - {segFileOffset: 407, length: 50, minTime: 125, maxTime: 249}, - {segFileOffset: 457, length: 50, minTime: 250, maxTime: 374}, - {segFileOffset: 507, length: 50, minTime: 375, maxTime: 499}, - {segFileOffset: 557, length: 50, minTime: 500, maxTime: 624}, - {segFileOffset: 607, length: 50, minTime: 625, maxTime: 749}, - {segFileOffset: 657, length: 50, minTime: 750, maxTime: 874}, - {segFileOffset: 707, length: 50, minTime: 875, maxTime: 999}, - {segFileOffset: 757, length: 50, minTime: 1000, maxTime: 1124}, - {segFileOffset: 807, length: 50, minTime: 1125, maxTime: 1249}, - }}, - {refs: []seriesChunkRef{ - {segFileOffset: 857, length: 50, minTime: 1250, maxTime: 1374}, - {segFileOffset: 907, length: 50, minTime: 1375, maxTime: 1499}, - {segFileOffset: 957, length: tsdb.EstimatedMaxChunkSize, minTime: 1500, maxTime: 1559}, - }}, - }}, - }}, - {series: []seriesChunkRefs{ - {lset: labels.FromStrings("a", "3", "b", "2"), chunksRanges: []seriesChunkRefsRange{ - {refs: []seriesChunkRef{ - {segFileOffset: 991, length: 49, minTime: 0, maxTime: 124}, - {segFileOffset: 1040, length: 50, minTime: 125, maxTime: 249}, - {segFileOffset: 1090, length: 50, minTime: 250, maxTime: 374}, - {segFileOffset: 1140, length: 50, minTime: 375, maxTime: 499}, - {segFileOffset: 1190, length: 50, minTime: 500, maxTime: 624}, - {segFileOffset: 1240, length: 50, minTime: 625, maxTime: 749}, - {segFileOffset: 1290, length: 50, minTime: 750, maxTime: 874}, - {segFileOffset: 1340, length: 50, minTime: 875, maxTime: 999}, - {segFileOffset: 1390, length: 50, minTime: 1000, maxTime: 1124}, - {segFileOffset: 1440, length: 50, minTime: 1125, maxTime: 1249}, - }}, - {refs: []seriesChunkRef{ - {segFileOffset: 1490, length: 50, minTime: 1250, maxTime: 1374}, - {segFileOffset: 1540, length: 50, minTime: 1375, maxTime: 1499}, - {segFileOffset: 1590, length: tsdb.EstimatedMaxChunkSize, minTime: 1500, maxTime: 1559}, - }}, - }}, - }}, - }, - }, "doesn't return a series if its chunks are around minT/maxT but not within it": { matcher: labels.MustNewMatcher(labels.MatchRegexp, "a", "4"), minT: 500, maxT: 600, // The chunks for this timeseries are between 0 and 99 and 1000 and 1099 @@ -1796,7 +1747,6 @@ func TestOpenBlockSeriesChunkRefsSetsIterator(t *testing.T) { strategy, minT, maxT, - 2, newSafeQueryStats(), nil, log.NewNopLogger(), @@ -1898,7 +1848,6 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_pendingMatchers(t *testing.T) { noChunkRefs, // skip chunks since we are testing labels filtering block.meta.MinTime, block.meta.MaxTime, - 2, newSafeQueryStats(), nil, log.NewNopLogger(), @@ -1963,7 +1912,6 @@ func BenchmarkOpenBlockSeriesChunkRefsSetsIterator(b *testing.B) { defaultStrategy, // we don't skip chunks, so we can measure impact in loading chunk refs too block.meta.MinTime, block.meta.MaxTime, - 2, newSafeQueryStats(), nil, log.NewNopLogger(), @@ -2512,7 +2460,6 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_SeriesCaching(t *testing.T) { noChunkRefs, b.meta.MinTime, b.meta.MaxTime, - 1, statsColdCache, nil, log.NewNopLogger(), @@ -2544,7 +2491,6 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_SeriesCaching(t *testing.T) { noChunkRefs, b.meta.MinTime, b.meta.MaxTime, - 1, statsWarnCache, nil, log.NewNopLogger(), From 076b162ed2bd7968d57790a25039b211aca18138 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Sat, 19 Aug 2023 14:16:58 +0300 Subject: [PATCH 
3/9] Update docs Signed-off-by: Dimitar Dimitrov --- CHANGELOG.md | 1 + docs/sources/mimir/configure/about-versioning.md | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 78ff288249..c1c24c6d76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ * [CHANGE] The `-shutdown-delay` flag is no longer experimental. #5701 * [CHANGE] The `-validation.create-grace-period` is now enforced in the ingester too, other than distributor and query-frontend. If you've configured `-validation.create-grace-period` then make sure the configuration is applied to ingesters too. #5712 * [CHANGE] The `-validation.create-grace-period` is now enforced for examplars too in the distributor. If an examplar has timestamp greater than "now + grace_period", then the exemplar will be dropped and the metric `cortex_discarded_exemplars_total{reason="exemplar_too_far_in_future",user="..."}` increased. #5761 +* [CHANGE] Store-gateway: remove fine-grained chunks caching. The following configuration paramters have been removed `-blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled`, `-blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series`. * [FEATURE] Introduced `distributor.service_overload_status_code_on_rate_limit_enabled` flag for configuring status code to 529 instead of 429 upon rate limit exhaustion. #5752 * [FEATURE] Cardinality API: Add a new `count_method` parameter which enables counting active series #5136 * [FEATURE] Query-frontend: added experimental support to cache cardinality, label names and label values query responses. The cache will be used when `-query-frontend.cache-results` is enabled, and `-query-frontend.results-cache-ttl-for-cardinality-query` or `-query-frontend.results-cache-ttl-for-labels-query` set to a value greater than 0. 
The following metrics have been added to track the query results cache hit ratio per `request_type`: #5212 #5235 #5426 #5524 diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index c487c6314b..ca0a4cf3f6 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -117,8 +117,6 @@ The following features are currently experimental: - Query-scheduler - `-query-scheduler.querier-forget-delay` - Store-gateway - - `-blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled` - - `-blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series` - Use of Redis cache backend (`-blocks-storage.bucket-store.chunks-cache.backend=redis`, `-blocks-storage.bucket-store.index-cache.backend=redis`, `-blocks-storage.bucket-store.metadata-cache.backend=redis`) - `-blocks-storage.bucket-store.series-selection-strategy` - Read-write deployment mode From 27fbf8f396057c33bd07ddb872bc57a372dbb5b1 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Tue, 22 Aug 2023 18:13:54 +0200 Subject: [PATCH 4/9] Remove unused constant Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/series_refs.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/storegateway/series_refs.go b/pkg/storegateway/series_refs.go index 9226a6a288..40b3a4fb25 100644 --- a/pkg/storegateway/series_refs.go +++ b/pkg/storegateway/series_refs.go @@ -1088,10 +1088,6 @@ func (s *loadingSeriesChunkRefsSetIterator) filterSeries(set seriesChunkRefsSet, return set } -const ( - minChunksPerRange = 10 -) - // partitionChunks creates a slice of []chunks.Meta for each range of chunks within the same segment file. // The partitioning here should be fairly static and not depend on the actual Series() request because // the resulting ranges may be used for caching, and we want our cache entries to be reusable between requests. 
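Taken together, patches 2/9 and 4/9 mean the store-gateway no longer partitions a series' chunk refs into several cacheable ranges per block: the call site now passes all chunk metas as a single partition (the `metasToRanges([][]chunks.Meta{metas}, ...)` form shown earlier), which is why `minChunksPerRange` has no remaining users. The following is a minimal, self-contained Go sketch of that simplification only — `chunkMeta` and `asSinglePartition` are hypothetical stand-ins, not Mimir's or Prometheus' actual types:

```go
// Sketch (assumed names, not Mimir code): after this series, a series' chunks
// form exactly one range per block instead of being split into the number of
// ranges previously controlled by
// -blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series.
package main

import "fmt"

// chunkMeta is a stand-in for Prometheus' chunks.Meta, keeping only the time range.
type chunkMeta struct {
	MinTime, MaxTime int64
}

// asSinglePartition mirrors the simplified call site: all metas become one partition,
// i.e. the equivalent of metasToRanges([][]chunks.Meta{metas}, ...).
func asSinglePartition(metas []chunkMeta) [][]chunkMeta {
	if len(metas) == 0 {
		return nil
	}
	return [][]chunkMeta{metas}
}

func main() {
	metas := []chunkMeta{
		{MinTime: 0, MaxTime: 124},
		{MinTime: 125, MaxTime: 249},
		{MinTime: 250, MaxTime: 374},
	}
	ranges := asSinglePartition(metas)
	// Prints "ranges per series: 1, chunks in range: 3" — a single chunks range per series per block.
	fmt.Printf("ranges per series: %d, chunks in range: %d\n", len(ranges), len(ranges[0]))
}
```

With only one range per series, the per-range cache-key granularity that the removed flags configured no longer exists, so both the unused constant and the two configuration parameters can be dropped.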
From 79dde0f4568b12e36b89f80b286a4fb21558fc5e Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Tue, 22 Aug 2023 18:23:20 +0200 Subject: [PATCH 5/9] fix linter Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_e2e_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index 6db76700c2..1ed71d4e58 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -507,7 +507,7 @@ func TestBucketStore_e2e(t *testing.T) { return } - if ok := t.Run("with small index cache", func(t *testing.T) { + t.Run("with small index cache", func(t *testing.T) { indexCache2, err := indexcache.NewInMemoryIndexCacheWithConfig(s.logger, nil, indexcache.InMemoryIndexCacheConfig{ MaxItemSize: 50, MaxSize: 100, @@ -515,9 +515,7 @@ func TestBucketStore_e2e(t *testing.T) { assert.NoError(t, err) s.cache.SwapIndexCacheWith(indexCache2) testBucketStore_e2e(t, ctx, s) - }); !ok { - return - } + }) }) } From f0dbac0f7d1ed7c1a92e0846cd74d87a47001b49 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 23 Aug 2023 10:27:59 +0200 Subject: [PATCH 6/9] fix linter Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_e2e_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index 1ed71d4e58..5df452a1ea 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -570,7 +570,7 @@ func TestBucketStore_e2e_StreamingEdgeCases(t *testing.T) { return } - if ok := t.Run("with small index cache", func(t *testing.T) { + t.Run("with small index cache", func(t *testing.T) { indexCache2, err := indexcache.NewInMemoryIndexCacheWithConfig(s.logger, nil, indexcache.InMemoryIndexCacheConfig{ MaxItemSize: 50, MaxSize: 100, @@ -578,9 +578,7 @@ func TestBucketStore_e2e_StreamingEdgeCases(t *testing.T) { assert.NoError(t, err) s.cache.SwapIndexCacheWith(indexCache2) testBucketStore_e2e(t, ctx, s) - }); !ok { - return - } + }) }) } From d7ed23ba353b662db18a39deff4b1f7410a74f35 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 23 Aug 2023 10:28:10 +0200 Subject: [PATCH 7/9] Update CHANGELOG.md Co-authored-by: Nick Pillitteri <56quarters@users.noreply.github.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1c24c6d76..823b474095 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ * [CHANGE] The `-shutdown-delay` flag is no longer experimental. #5701 * [CHANGE] The `-validation.create-grace-period` is now enforced in the ingester too, other than distributor and query-frontend. If you've configured `-validation.create-grace-period` then make sure the configuration is applied to ingesters too. #5712 * [CHANGE] The `-validation.create-grace-period` is now enforced for examplars too in the distributor. If an examplar has timestamp greater than "now + grace_period", then the exemplar will be dropped and the metric `cortex_discarded_exemplars_total{reason="exemplar_too_far_in_future",user="..."}` increased. #5761 -* [CHANGE] Store-gateway: remove fine-grained chunks caching. The following configuration paramters have been removed `-blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled`, `-blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series`. +* [CHANGE] Store-gateway: remove experimental fine-grained chunks caching. 
The following experimental configuration parameters have been removed `-blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled`, `-blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series`. * [FEATURE] Introduced `distributor.service_overload_status_code_on_rate_limit_enabled` flag for configuring status code to 529 instead of 429 upon rate limit exhaustion. #5752 * [FEATURE] Cardinality API: Add a new `count_method` parameter which enables counting active series #5136 * [FEATURE] Query-frontend: added experimental support to cache cardinality, label names and label values query responses. The cache will be used when `-query-frontend.cache-results` is enabled, and `-query-frontend.results-cache-ttl-for-cardinality-query` or `-query-frontend.results-cache-ttl-for-labels-query` set to a value greater than 0. The following metrics have been added to track the query results cache hit ratio per `request_type`: #5212 #5235 #5426 #5524 From ec7e5b45f9563b29cb805cb53d15f7e753bf0c1f Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 25 Aug 2023 16:44:38 +0200 Subject: [PATCH 8/9] Update CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 823b474095..3cd3c7d556 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ * [CHANGE] The `-shutdown-delay` flag is no longer experimental. #5701 * [CHANGE] The `-validation.create-grace-period` is now enforced in the ingester too, other than distributor and query-frontend. If you've configured `-validation.create-grace-period` then make sure the configuration is applied to ingesters too. #5712 * [CHANGE] The `-validation.create-grace-period` is now enforced for examplars too in the distributor. If an examplar has timestamp greater than "now + grace_period", then the exemplar will be dropped and the metric `cortex_discarded_exemplars_total{reason="exemplar_too_far_in_future",user="..."}` increased. #5761 -* [CHANGE] Store-gateway: remove experimental fine-grained chunks caching. The following experimental configuration parameters have been removed `-blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled`, `-blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series`. +* [CHANGE] Store-gateway: remove experimental fine-grained chunks caching. The following experimental configuration parameters have been removed `-blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled`, `-blocks-storage.bucket-store.fine-grained-chunks-caching-ranges-per-series`. #5816 * [FEATURE] Introduced `distributor.service_overload_status_code_on_rate_limit_enabled` flag for configuring status code to 529 instead of 429 upon rate limit exhaustion. #5752 * [FEATURE] Cardinality API: Add a new `count_method` parameter which enables counting active series #5136 * [FEATURE] Query-frontend: added experimental support to cache cardinality, label names and label values query responses. The cache will be used when `-query-frontend.cache-results` is enabled, and `-query-frontend.results-cache-ttl-for-cardinality-query` or `-query-frontend.results-cache-ttl-for-labels-query` set to a value greater than 0. 
The following metrics have been added to track the query results cache hit ratio per `request_type`: #5212 #5235 #5426 #5524 From 2884de8681036f581ff8ccde31b16d5c3a1b0bf1 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 25 Aug 2023 16:52:19 +0200 Subject: [PATCH 9/9] Fix indentation Signed-off-by: Dimitar Dimitrov --- pkg/storegateway/bucket_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 0c0d4a7256..6d96e886b7 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -1764,10 +1764,10 @@ func TestBucketStore_Series_OneBlock_InMemIndexCacheSegfault(t *testing.T) { } store := &BucketStore{ - userID: "test", - bkt: objstore.WithNoopInstr(bkt), - logger: logger, - indexCache: indexCache, + userID: "test", + bkt: objstore.WithNoopInstr(bkt), + logger: logger, + indexCache: indexCache, indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), indexheader.Config{ LazyLoadingEnabled: false, LazyLoadingIdleTimeout: 0,