From 6b5cc6a8449dedc3fd9c7f4453e7703f631bdc8a Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Tue, 7 Jun 2022 19:43:01 +0200 Subject: [PATCH] Store Gateway: Add experimental configuration to use MAP_POPULATE for indexheader reads. (#2019) Introduces a new experimental configuration option (`-blocks-storage.bucket-store.index-header.map-populate-enabled`). This enables the use of the `MAP_POPULATE` flag when `mmap`-ing index-header files in the store-gateway. What this flag does is advise the kernel to (synchronously) pre-fault all pages in the memory region, loading them into the file system cache. Why is this a good idea? - The initial read process of the index-header files has been shown to cause hangups in the store-gateway. - By using this option, I/O is done in the mmap() syscall, which the Go scheduler can cope with. - We reduce the likelihood of Goroutines getting stalled in major page faults. - The initial read process walks the entire file anyway, so we are not doing any more I/O. - It's a very low risk change compared to re-writing the BinaryReader (work in progress). Why is this not perfect? - The Kernel does not guarantee the pages will stay in memory, so we are only reducing the probability of major page faults. Rationale about the implementation: - I have copied the mmap utilities from Prometheus as a temporary measure, for the sake of evaluating this change. 
--- CHANGELOG.md | 1 + cmd/mimir/config-descriptor.json | 21 ++++++ cmd/mimir/help-all.txt.tmpl | 2 + .../index.md | 6 ++ go.mod | 2 +- pkg/storage/tsdb/config.go | 5 ++ pkg/storegateway/bucket.go | 6 ++ pkg/storegateway/bucket_e2e_test.go | 2 + pkg/storegateway/bucket_stores.go | 1 + pkg/storegateway/bucket_test.go | 12 ++-- pkg/storegateway/indexheader/binary_reader.go | 21 ++++-- pkg/storegateway/indexheader/fileutil/mmap.go | 65 +++++++++++++++++++ .../indexheader/fileutil/mmap_unix.go | 27 ++++++++ pkg/storegateway/indexheader/header_test.go | 18 +++-- .../indexheader/lazy_binary_reader.go | 5 +- .../indexheader/lazy_binary_reader_test.go | 12 ++-- pkg/storegateway/indexheader/reader_pool.go | 6 +- .../indexheader/reader_pool_test.go | 4 +- 18 files changed, 189 insertions(+), 27 deletions(-) create mode 100644 pkg/storegateway/indexheader/fileutil/mmap.go create mode 100644 pkg/storegateway/indexheader/fileutil/mmap_unix.go diff --git a/CHANGELOG.md b/CHANGELOG.md index b957ddd9f3b..79a51159dfb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ * [ENHANCEMENT] Memberlist KV: incoming messages are now processed on per-key goroutine. This may reduce loss of "maintanance" packets in busy memberlist installations, but use more CPU. New `memberlist_client_received_broadcasts_dropped_total` counter tracks number of dropped per-key messages. #1912 * [ENHANCEMENT] Blocks Storage, Alertmanager, Ruler: add support a prefix to the bucket store (`*_storage.storage_prefix`). This enables using the same bucket for the three components. #1686 #1951 * [ENHANCEMENT] Upgrade Docker base images to `alpine:3.16.0`. #2028 +* [ENHANCEMENT] Store-gateway: Add experimental configuration option for the store-gateway to attempt to pre-populate the file system cache when memory-mapping index-header files. Enabled with `-blocks-storage.bucket-store.index-header.map-populate-enabled=true`. 
#2019 * [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883 * [BUGFIX] Ingester: fixed deceiving error log "failed to update cached shipped blocks after shipper initialisation", occurring for each new tenant in the ingester. #1893 * [BUGFIX] Ring: fix bug where instances may appear unhealthy in the hash ring web UI even though they are not. #1933 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 396ed62d969..19378ab02e8 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -5131,6 +5131,27 @@ "fieldFlag": "blocks-storage.bucket-store.posting-offsets-in-mem-sampling", "fieldType": "int", "fieldCategory": "advanced" + }, + { + "kind": "block", + "name": "index_header", + "required": false, + "desc": "", + "blockEntries": [ + { + "kind": "field", + "name": "map_populate_enabled", + "required": false, + "desc": "If enabled, the store-gateway will attempt to pre-populate the file system cache when memory-mapping index-header files.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "blocks-storage.bucket-store.index-header.map-populate-enabled", + "fieldType": "boolean", + "fieldCategory": "experimental" + } + ], + "fieldValue": null, + "fieldDefaultValue": null } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 416d8c377e2..b0a3ad14190 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -334,6 +334,8 @@ Usage of ./cmd/mimir/mimir: If enabled, store-gateway will lazy load an index-header only once required by a query. (default true) -blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout duration If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity. 
(default 1h0m0s) + -blocks-storage.bucket-store.index-header.map-populate-enabled + [experimental] If enabled, the store-gateway will attempt to pre-populate the file system cache when memory-mapping index-header files. -blocks-storage.bucket-store.max-chunk-pool-bytes uint Max size - in bytes - of a chunks pool, used to reduce memory allocations. The pool is shared across all tenants. 0 to disable the limit. (default 2147483648) -blocks-storage.bucket-store.max-concurrent int diff --git a/docs/sources/operators-guide/configuring/reference-configuration-parameters/index.md b/docs/sources/operators-guide/configuring/reference-configuration-parameters/index.md index 665e89727cd..1d5a897ad74 100644 --- a/docs/sources/operators-guide/configuring/reference-configuration-parameters/index.md +++ b/docs/sources/operators-guide/configuring/reference-configuration-parameters/index.md @@ -3397,6 +3397,12 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.posting-offsets-in-mem-sampling [postings_offsets_in_mem_sampling: | default = 32] + index_header: + # (experimental) If enabled, the store-gateway will attempt to pre-populate + # the file system cache when memory-mapping index-header files. + # CLI flag: -blocks-storage.bucket-store.index-header.map-populate-enabled + [map_populate_enabled: | default = false] + tsdb: # Directory to store TSDBs (including WAL) in the ingesters. This directory is # required to be persisted between restarts. 
diff --git a/go.mod b/go.mod index 74fc1b6bdd8..889656d20f8 100644 --- a/go.mod +++ b/go.mod @@ -58,6 +58,7 @@ require ( github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9 github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db + golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886 gopkg.in/alecthomas/kingpin.v2 v2.2.6 ) @@ -207,7 +208,6 @@ require ( go.uber.org/zap v1.19.1 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a // indirect - golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.10 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 4fbdb5da8e5..8445f737bc7 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/prometheus/tsdb/wal" "github.com/grafana/mimir/pkg/storage/bucket" + "github.com/grafana/mimir/pkg/storegateway/indexheader" ) const ( @@ -292,6 +293,9 @@ type BucketStoreConfig struct { // On the contrary, smaller value will increase baseline memory usage, but improve latency slightly. // 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance. PostingOffsetsInMemSampling int `yaml:"postings_offsets_in_mem_sampling" category:"advanced"` + + // Controls experimental options for index-header file reading. 
+ IndexHeader indexheader.BinaryReaderConfig `yaml:"index_header" category:"experimental"` } // RegisterFlags registers the BucketStore flags @@ -300,6 +304,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { cfg.ChunksCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.chunks-cache.") cfg.MetadataCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.metadata-cache.") cfg.BucketIndex.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.bucket-index.") + cfg.IndexHeader.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.index-header.") f.StringVar(&cfg.SyncDir, "blocks-storage.bucket-store.sync-dir", "./tsdb-sync/", "Directory to store synchronized TSDB index headers. This directory is not required to be persisted between restarts, but it's highly recommended in order to improve the store-gateway startup time.") f.DurationVar(&cfg.SyncInterval, "blocks-storage.bucket-store.sync-interval", 15*time.Minute, "How frequently to scan the bucket, or to refresh the bucket index (if enabled), in order to look for changes (new blocks shipped by ingesters and blocks deleted by retention or compaction).") diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 06046359778..51eafc964dd 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -127,6 +127,9 @@ type BucketStore struct { // Every how many posting offset entry we pool in heap memory. Default in Prometheus is 32. postingOffsetsInMemSampling int + // Additional configuration for experimental indexheader.BinaryReader behaviour. + indexHeaderCfg indexheader.BinaryReaderConfig + // Enables hints in the Series() response. 
enableSeriesResponseHints bool } @@ -218,6 +221,7 @@ func NewBucketStore( partitioner Partitioner, blockSyncConcurrency int, postingOffsetsInMemSampling int, + indexHeaderCfg indexheader.BinaryReaderConfig, enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility. lazyIndexReaderEnabled bool, lazyIndexReaderIdleTimeout time.Duration, @@ -240,6 +244,7 @@ func NewBucketStore( seriesLimiterFactory: seriesLimiterFactory, partitioner: partitioner, postingOffsetsInMemSampling: postingOffsetsInMemSampling, + indexHeaderCfg: indexHeaderCfg, enableSeriesResponseHints: enableSeriesResponseHints, seriesHashCache: seriesHashCache, metrics: metrics, @@ -403,6 +408,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er s.dir, meta.ULID, s.postingOffsetsInMemSampling, + s.indexHeaderCfg, ) if err != nil { return errors.Wrap(err, "create index header reader") diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index ce9f9f142f8..cb38e1fddb1 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -27,6 +27,7 @@ import ( mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storegateway/indexcache" + "github.com/grafana/mimir/pkg/storegateway/indexheader" "github.com/grafana/mimir/pkg/storegateway/testhelper" "github.com/thanos-io/thanos/pkg/block" @@ -171,6 +172,7 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore. 
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), 20, mimir_tsdb.DefaultPostingOffsetInMemorySampling, + indexheader.BinaryReaderConfig{}, true, true, time.Minute, diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 0bbc3d2920b..ecdc05e8f94 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -489,6 +489,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) { u.partitioner, u.cfg.BucketStore.BlockSyncConcurrency, u.cfg.BucketStore.PostingOffsetsInMemSampling, + u.cfg.BucketStore.IndexHeader, true, // Enable series hints. u.cfg.BucketStore.IndexHeaderLazyLoadingEnabled, u.cfg.BucketStore.IndexHeaderLazyLoadingIdleTimeout, diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index e72621ad684..11fc2247950 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -1096,7 +1096,7 @@ func prepareTestBlock(tb test.TB, series int) func() *bucketBlock { }) id := uploadTestBlock(tb, tmpDir, bkt, series) - r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, mimir_tsdb.DefaultPostingOffsetInMemorySampling) + r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, mimir_tsdb.DefaultPostingOffsetInMemorySampling, indexheader.BinaryReaderConfig{}) require.NoError(tb, err) return func() *bucketBlock { @@ -1359,6 +1359,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), 1, mimir_tsdb.DefaultPostingOffsetInMemorySampling, + indexheader.BinaryReaderConfig{}, false, false, 0, @@ -1520,7 +1521,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")}, chunkPool: chunkPool, } - b1.indexHeaderReader, err = 
indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b1.meta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling) + b1.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b1.meta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling, indexheader.BinaryReaderConfig{}) assert.NoError(t, err) } @@ -1559,7 +1560,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")}, chunkPool: chunkPool, } - b2.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b2.meta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling) + b2.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b2.meta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling, indexheader.BinaryReaderConfig{}) assert.NoError(t, err) } @@ -1723,6 +1724,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), 10, mimir_tsdb.DefaultPostingOffsetInMemorySampling, + indexheader.BinaryReaderConfig{}, true, false, 0, @@ -1813,6 +1815,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), 10, mimir_tsdb.DefaultPostingOffsetInMemorySampling, + indexheader.BinaryReaderConfig{}, true, false, 0, @@ -1996,6 +1999,7 @@ func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Ser newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), 10, mimir_tsdb.DefaultPostingOffsetInMemorySampling, + indexheader.BinaryReaderConfig{}, true, false, 0, @@ -2296,7 +2300,7 @@ func prepareBucket(b *testing.B, resolutionLevel compactor.ResolutionLevel) (*bu partitioner := newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil) // Create an index header reader. 
- indexHeaderReader, err := indexheader.NewBinaryReader(ctx, logger, bkt, tmpDir, blockMeta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling) + indexHeaderReader, err := indexheader.NewBinaryReader(ctx, logger, bkt, tmpDir, blockMeta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling, indexheader.BinaryReaderConfig{}) assert.NoError(b, err) indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(logger, nil, indexcache.DefaultInMemoryIndexCacheConfig) assert.NoError(b, err) diff --git a/pkg/storegateway/indexheader/binary_reader.go b/pkg/storegateway/indexheader/binary_reader.go index 73d127f1e40..75168465e1a 100644 --- a/pkg/storegateway/indexheader/binary_reader.go +++ b/pkg/storegateway/indexheader/binary_reader.go @@ -9,6 +9,7 @@ import ( "bufio" "context" "encoding/binary" + "flag" "hash" "hash/crc32" "io" @@ -32,6 +33,8 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/runutil" + + mmap "github.com/grafana/mimir/pkg/storegateway/indexheader/fileutil" ) const ( @@ -459,10 +462,18 @@ type BinaryReader struct { postingOffsetsInMemSampling int } +type BinaryReaderConfig struct { + MapPopulateEnabled bool `yaml:"map_populate_enabled" category:"experimental"` +} + +func (cfg *BinaryReaderConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + f.BoolVar(&cfg.MapPopulateEnabled, prefix+"map-populate-enabled", false, "If enabled, the store-gateway will attempt to pre-populate the file system cache when memory-mapping index-header files.") +} + // NewBinaryReader loads or builds new index-header if not present on disk. 
-func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (*BinaryReader, error) { +func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, cfg BinaryReaderConfig) (*BinaryReader, error) { binfn := filepath.Join(dir, id.String(), block.IndexHeaderFilename) - br, err := newFileBinaryReader(binfn, postingOffsetsInMemSampling) + br, err := newFileBinaryReader(binfn, postingOffsetsInMemSampling, cfg) if err == nil { return br, nil } @@ -475,11 +486,11 @@ func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.Bucket } level.Debug(logger).Log("msg", "built index-header file", "path", binfn, "elapsed", time.Since(start)) - return newFileBinaryReader(binfn, postingOffsetsInMemSampling) + return newFileBinaryReader(binfn, postingOffsetsInMemSampling, cfg) } -func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *BinaryReader, err error) { - f, err := fileutil.OpenMmapFile(path) +func newFileBinaryReader(path string, postingOffsetsInMemSampling int, cfg BinaryReaderConfig) (bw *BinaryReader, err error) { + f, err := mmap.OpenMmapFile(path, cfg.MapPopulateEnabled) if err != nil { return nil, err } diff --git a/pkg/storegateway/indexheader/fileutil/mmap.go b/pkg/storegateway/indexheader/fileutil/mmap.go new file mode 100644 index 00000000000..494dbf1f24f --- /dev/null +++ b/pkg/storegateway/indexheader/fileutil/mmap.go @@ -0,0 +1,65 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/tsdb/fileutil/mmap.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors. 
+ +package fileutil + +import ( + "os" + + "github.com/pkg/errors" +) + +type MmapFile struct { + f *os.File + b []byte +} + +func OpenMmapFile(path string, populate bool) (*MmapFile, error) { + return OpenMmapFileWithSize(path, 0, populate) +} + +func OpenMmapFileWithSize(path string, size int, populate bool) (mf *MmapFile, retErr error) { + f, err := os.Open(path) + if err != nil { + return nil, errors.Wrap(err, "try lock file") + } + defer func() { + if retErr != nil { + f.Close() + } + }() + if size <= 0 { + info, err := f.Stat() + if err != nil { + return nil, errors.Wrap(err, "stat") + } + size = int(info.Size()) + } + + b, err := mmap(f, size, populate) + if err != nil { + return nil, errors.Wrapf(err, "mmap, size %d", size) + } + + return &MmapFile{f: f, b: b}, nil +} + +func (f *MmapFile) Close() error { + err0 := munmap(f.b) + err1 := f.f.Close() + + if err0 != nil { + return err0 + } + return err1 +} + +func (f *MmapFile) File() *os.File { + return f.f +} + +func (f *MmapFile) Bytes() []byte { + return f.b +} diff --git a/pkg/storegateway/indexheader/fileutil/mmap_unix.go b/pkg/storegateway/indexheader/fileutil/mmap_unix.go new file mode 100644 index 00000000000..74894e9da75 --- /dev/null +++ b/pkg/storegateway/indexheader/fileutil/mmap_unix.go @@ -0,0 +1,27 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/tsdb/fileutil/mmap_unix.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors. 
+ +//go:build !windows && !plan9 +// +build !windows,!plan9 + +package fileutil + +import ( + "os" + + "golang.org/x/sys/unix" +) + +func mmap(f *os.File, length int, populate bool) ([]byte, error) { + flags := unix.MAP_SHARED + if populate { + flags |= unix.MAP_POPULATE + } + return unix.Mmap(int(f.Fd()), 0, length, unix.PROT_READ, flags) +} + +func munmap(b []byte) (err error) { + return unix.Munmap(b) +} diff --git a/pkg/storegateway/indexheader/header_test.go b/pkg/storegateway/indexheader/header_test.go index 48f8174d407..a600691d78a 100644 --- a/pkg/storegateway/indexheader/header_test.go +++ b/pkg/storegateway/indexheader/header_test.go @@ -103,11 +103,11 @@ func TestReaders(t *testing.T) { b := realByteSlice(indexFile.Bytes()) - t.Run("binary reader", func(t *testing.T) { + testBinaryReader := func(t *testing.T, cfg BinaryReaderConfig) { fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) require.NoError(t, WriteBinary(ctx, bkt, id, fn)) - br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3) + br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, cfg) require.NoError(t, err) defer func() { require.NoError(t, br.Close()) }() @@ -174,13 +174,21 @@ func TestReaders(t *testing.T) { } compareIndexToHeader(t, b, br) + } + + t.Run("binary reader", func(t *testing.T) { + testBinaryReader(t, BinaryReaderConfig{}) + }) + + t.Run("binary reader with map populate", func(t *testing.T) { + testBinaryReader(t, BinaryReaderConfig{MapPopulateEnabled: true}) }) t.Run("lazy binary reader", func(t *testing.T) { fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) require.NoError(t, WriteBinary(ctx, bkt, id, fn)) - br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), nil) + br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, BinaryReaderConfig{}, NewLazyBinaryReaderMetrics(nil), nil) require.NoError(t, err) defer func() { 
require.NoError(t, br.Close()) }() @@ -369,7 +377,7 @@ func BenchmarkBinaryReader(t *testing.B) { t.ResetTimer() for i := 0; i < t.N; i++ { - br, err := newFileBinaryReader(fn, 32) + br, err := newFileBinaryReader(fn, 32, BinaryReaderConfig{}) require.NoError(t, err) require.NoError(t, br.Close()) } @@ -409,7 +417,7 @@ func benchmarkBinaryReaderLookupSymbol(b *testing.B, numSeries int) { require.NoError(b, block.Upload(ctx, logger, bkt, filepath.Join(tmpDir, id1.String()), metadata.NoneFunc)) // Create an index reader. - reader, err := NewBinaryReader(ctx, logger, bkt, tmpDir, id1, postingOffsetsInMemSampling) + reader, err := NewBinaryReader(ctx, logger, bkt, tmpDir, id1, postingOffsetsInMemSampling, BinaryReaderConfig{}) require.NoError(b, err) // Get the offset of each label value symbol. diff --git a/pkg/storegateway/indexheader/lazy_binary_reader.go b/pkg/storegateway/indexheader/lazy_binary_reader.go index 18c64725cb1..9402d9fd44a 100644 --- a/pkg/storegateway/indexheader/lazy_binary_reader.go +++ b/pkg/storegateway/indexheader/lazy_binary_reader.go @@ -76,6 +76,7 @@ type LazyBinaryReader struct { filepath string id ulid.ULID postingOffsetsInMemSampling int + cfg BinaryReaderConfig metrics *LazyBinaryReaderMetrics onClosed func(*LazyBinaryReader) @@ -98,6 +99,7 @@ func NewLazyBinaryReader( dir string, id ulid.ULID, postingOffsetsInMemSampling int, + cfg BinaryReaderConfig, metrics *LazyBinaryReaderMetrics, onClosed func(*LazyBinaryReader), ) (*LazyBinaryReader, error) { @@ -127,6 +129,7 @@ func NewLazyBinaryReader( filepath: filepath, id: id, postingOffsetsInMemSampling: postingOffsetsInMemSampling, + cfg: cfg, metrics: metrics, usedAt: atomic.NewInt64(time.Now().UnixNano()), onClosed: onClosed, @@ -247,7 +250,7 @@ func (r *LazyBinaryReader) load() (returnErr error) { r.metrics.loadCount.Inc() startTime := time.Now() - reader, err := NewBinaryReader(r.ctx, r.logger, r.bkt, r.dir, r.id, r.postingOffsetsInMemSampling) + reader, err := NewBinaryReader(r.ctx, 
r.logger, r.bkt, r.dir, r.id, r.postingOffsetsInMemSampling, r.cfg) if err != nil { r.metrics.loadFailedCount.Inc() r.readerErr = err diff --git a/pkg/storegateway/indexheader/lazy_binary_reader_test.go b/pkg/storegateway/indexheader/lazy_binary_reader_test.go index 8e501499258..1503dee2d79 100644 --- a/pkg/storegateway/indexheader/lazy_binary_reader_test.go +++ b/pkg/storegateway/indexheader/lazy_binary_reader_test.go @@ -38,7 +38,7 @@ func TestNewLazyBinaryReader_ShouldFailIfUnableToBuildIndexHeader(t *testing.T) require.NoError(t, err) defer func() { require.NoError(t, bkt.Close()) }() - _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), nil) + _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, BinaryReaderConfig{}, NewLazyBinaryReaderMetrics(nil), nil) require.Error(t, err) } @@ -62,7 +62,7 @@ func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) m := NewLazyBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, BinaryReaderConfig{}, m, nil) require.NoError(t, err) require.True(t, r.reader == nil) require.Equal(t, float64(0), promtestutil.ToFloat64(m.loadCount)) @@ -107,7 +107,7 @@ func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { require.NoError(t, ioutil.WriteFile(headerFilename, []byte("xxx"), os.ModePerm)) m := NewLazyBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, BinaryReaderConfig{}, m, nil) require.NoError(t, err) require.True(t, r.reader == nil) require.Equal(t, float64(0), 
promtestutil.ToFloat64(m.loadCount)) @@ -143,7 +143,7 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) m := NewLazyBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, BinaryReaderConfig{}, m, nil) require.NoError(t, err) require.True(t, r.reader == nil) @@ -195,7 +195,7 @@ func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) { require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) m := NewLazyBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, BinaryReaderConfig{}, m, nil) require.NoError(t, err) require.True(t, r.reader == nil) @@ -246,7 +246,7 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) m := NewLazyBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, BinaryReaderConfig{}, m, nil) require.NoError(t, err) require.True(t, r.reader == nil) t.Cleanup(func() { diff --git a/pkg/storegateway/indexheader/reader_pool.go b/pkg/storegateway/indexheader/reader_pool.go index 15ef9080255..6f5dbb89084 100644 --- a/pkg/storegateway/indexheader/reader_pool.go +++ b/pkg/storegateway/indexheader/reader_pool.go @@ -82,14 +82,14 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime // NewBinaryReader creates and returns a new binary reader. 
If the pool has been configured // with lazy reader enabled, this function will return a lazy reader. The returned lazy reader // is tracked by the pool and automatically closed once the idle timeout expires. -func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (Reader, error) { +func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, cfg BinaryReaderConfig) (Reader, error) { var reader Reader var err error if p.lazyReaderEnabled { - reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.onLazyReaderClosed) + reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, cfg, p.metrics.lazyReader, p.onLazyReaderClosed) } else { - reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling) + reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, cfg) } if err != nil { diff --git a/pkg/storegateway/indexheader/reader_pool_test.go b/pkg/storegateway/indexheader/reader_pool_test.go index e77d0113b81..19160bfe04b 100644 --- a/pkg/storegateway/indexheader/reader_pool_test.go +++ b/pkg/storegateway/indexheader/reader_pool_test.go @@ -66,7 +66,7 @@ func TestReaderPool_NewBinaryReader(t *testing.T) { pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, NewReaderPoolMetrics(nil)) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, BinaryReaderConfig{}) require.NoError(t, err) defer func() { require.NoError(t, r.Close()) }() @@ -103,7 +103,7 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { pool := 
NewReaderPool(log.NewNopLogger(), true, idleTimeout, metrics) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, BinaryReaderConfig{}) require.NoError(t, err) defer func() { require.NoError(t, r.Close()) }()