Commit

Store Gateway: Add experimental configuration to use MAP_POPULATE for indexheader reads. (#2019)

Introduces a new experimental configuration option (`-blocks-storage.bucket-store.index-header.map-populate-enabled`).

This enables the use of the `MAP_POPULATE` flag when `mmap`-ing index-header files in the store-gateway. The flag asks the kernel to pre-fault all pages in the mapped region synchronously, loading them into the file system cache.

Why is this a good idea?
- The initial read process of the index-header files has been shown to cause hangups in the store-gateway.
- By using this option, I/O is done in the mmap() syscall, which the Go scheduler can cope with.
- We reduce the likelihood of goroutines getting stalled in major page faults.
- The initial read process walks the entire file anyway, so we are not doing any more I/O.
- It's a very low risk change compared to re-writing the BinaryReader (work in progress).

Why is this not perfect?
- The kernel does not guarantee the pages will stay in memory, so we are only reducing the probability of major page faults.

Rationale about the implementation:
- I have copied the mmap utilities from Prometheus as a temporary measure, for the sake of evaluating this change.
stevesg authored Jun 7, 2022
1 parent 84e4901 commit 6b5cc6a
Showing 18 changed files with 189 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@
* [ENHANCEMENT] Memberlist KV: incoming messages are now processed on per-key goroutine. This may reduce loss of "maintanance" packets in busy memberlist installations, but use more CPU. New `memberlist_client_received_broadcasts_dropped_total` counter tracks number of dropped per-key messages. #1912
* [ENHANCEMENT] Blocks Storage, Alertmanager, Ruler: add support a prefix to the bucket store (`*_storage.storage_prefix`). This enables using the same bucket for the three components. #1686 #1951
* [ENHANCEMENT] Upgrade Docker base images to `alpine:3.16.0`. #2028
* [ENHANCEMENT] Store-gateway: Add experimental configuration option for the store-gateway to attempt to pre-populate the file system cache when memory-mapping index-header files. Enabled with `-blocks-storage.bucket-store.index-header.map-populate-enabled=true`. #2019
* [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883
* [BUGFIX] Ingester: fixed deceiving error log "failed to update cached shipped blocks after shipper initialisation", occurring for each new tenant in the ingester. #1893
* [BUGFIX] Ring: fix bug where instances may appear unhealthy in the hash ring web UI even though they are not. #1933
21 changes: 21 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -5131,6 +5131,27 @@
"fieldFlag": "blocks-storage.bucket-store.posting-offsets-in-mem-sampling",
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "block",
"name": "index_header",
"required": false,
"desc": "",
"blockEntries": [
{
"kind": "field",
"name": "map_populate_enabled",
"required": false,
"desc": "If enabled, the store-gateway will attempt to pre-populate the file system cache when memory-mapping index-header files.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "blocks-storage.bucket-store.index-header.map-populate-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
"fieldDefaultValue": null
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -334,6 +334,8 @@ Usage of ./cmd/mimir/mimir:
If enabled, store-gateway will lazy load an index-header only once required by a query. (default true)
-blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout duration
If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity. (default 1h0m0s)
-blocks-storage.bucket-store.index-header.map-populate-enabled
[experimental] If enabled, the store-gateway will attempt to pre-populate the file system cache when memory-mapping index-header files.
-blocks-storage.bucket-store.max-chunk-pool-bytes uint
Max size - in bytes - of a chunks pool, used to reduce memory allocations. The pool is shared across all tenants. 0 to disable the limit. (default 2147483648)
-blocks-storage.bucket-store.max-concurrent int
@@ -3397,6 +3397,12 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.posting-offsets-in-mem-sampling
[postings_offsets_in_mem_sampling: <int> | default = 32]
index_header:
# (experimental) If enabled, the store-gateway will attempt to pre-populate
# the file system cache when memory-mapping index-header files.
# CLI flag: -blocks-storage.bucket-store.index-header.map-populate-enabled
[map_populate_enabled: <boolean> | default = false]
tsdb:
# Directory to store TSDBs (including WAL) in the ingesters. This directory is
# required to be persisted between restarts.
2 changes: 1 addition & 1 deletion go.mod
@@ -58,6 +58,7 @@ require (
github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9
github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
+golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886
gopkg.in/alecthomas/kingpin.v2 v2.2.6
)

@@ -207,7 +208,6 @@ require (
go.uber.org/zap v1.19.1 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a // indirect
-golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
5 changes: 5 additions & 0 deletions pkg/storage/tsdb/config.go
@@ -17,6 +17,7 @@ import (
"github.com/prometheus/prometheus/tsdb/wal"

"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storegateway/indexheader"
)

const (
@@ -292,6 +293,9 @@ type BucketStoreConfig struct {
// On the contrary, smaller value will increase baseline memory usage, but improve latency slightly.
// 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance.
PostingOffsetsInMemSampling int `yaml:"postings_offsets_in_mem_sampling" category:"advanced"`

// Controls experimental options for index-header file reading.
IndexHeader indexheader.BinaryReaderConfig `yaml:"index_header" category:"experimental"`
}

// RegisterFlags registers the BucketStore flags
@@ -300,6 +304,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
cfg.ChunksCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.chunks-cache.")
cfg.MetadataCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.metadata-cache.")
cfg.BucketIndex.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.bucket-index.")
cfg.IndexHeader.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.index-header.")

f.StringVar(&cfg.SyncDir, "blocks-storage.bucket-store.sync-dir", "./tsdb-sync/", "Directory to store synchronized TSDB index headers. This directory is not required to be persisted between restarts, but it's highly recommended in order to improve the store-gateway startup time.")
f.DurationVar(&cfg.SyncInterval, "blocks-storage.bucket-store.sync-interval", 15*time.Minute, "How frequently to scan the bucket, or to refresh the bucket index (if enabled), in order to look for changes (new blocks shipped by ingesters and blocks deleted by retention or compaction).")
6 changes: 6 additions & 0 deletions pkg/storegateway/bucket.go
@@ -127,6 +127,9 @@ type BucketStore struct {
// Every how many posting offset entry we pool in heap memory. Default in Prometheus is 32.
postingOffsetsInMemSampling int

// Additional configuration for experimental indexheader.BinaryReader behaviour.
indexHeaderCfg indexheader.BinaryReaderConfig

// Enables hints in the Series() response.
enableSeriesResponseHints bool
}
@@ -218,6 +221,7 @@ func NewBucketStore(
partitioner Partitioner,
blockSyncConcurrency int,
postingOffsetsInMemSampling int,
indexHeaderCfg indexheader.BinaryReaderConfig,
enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility.
lazyIndexReaderEnabled bool,
lazyIndexReaderIdleTimeout time.Duration,
@@ -240,6 +244,7 @@ func NewBucketStore(
seriesLimiterFactory: seriesLimiterFactory,
partitioner: partitioner,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
indexHeaderCfg: indexHeaderCfg,
enableSeriesResponseHints: enableSeriesResponseHints,
seriesHashCache: seriesHashCache,
metrics: metrics,
@@ -403,6 +408,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
s.dir,
meta.ULID,
s.postingOffsetsInMemSampling,
s.indexHeaderCfg,
)
if err != nil {
return errors.Wrap(err, "create index header reader")
2 changes: 2 additions & 0 deletions pkg/storegateway/bucket_e2e_test.go
@@ -27,6 +27,7 @@ import (

mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storegateway/indexcache"
"github.com/grafana/mimir/pkg/storegateway/indexheader"
"github.com/grafana/mimir/pkg/storegateway/testhelper"

"github.com/thanos-io/thanos/pkg/block"
@@ -171,6 +172,7 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
20,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
indexheader.BinaryReaderConfig{},
true,
true,
time.Minute,
1 change: 1 addition & 0 deletions pkg/storegateway/bucket_stores.go
@@ -489,6 +489,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
u.partitioner,
u.cfg.BucketStore.BlockSyncConcurrency,
u.cfg.BucketStore.PostingOffsetsInMemSampling,
u.cfg.BucketStore.IndexHeader,
true, // Enable series hints.
u.cfg.BucketStore.IndexHeaderLazyLoadingEnabled,
u.cfg.BucketStore.IndexHeaderLazyLoadingIdleTimeout,
12 changes: 8 additions & 4 deletions pkg/storegateway/bucket_test.go
@@ -1096,7 +1096,7 @@ func prepareTestBlock(tb test.TB, series int) func() *bucketBlock {
})

id := uploadTestBlock(tb, tmpDir, bkt, series)
-r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, mimir_tsdb.DefaultPostingOffsetInMemorySampling)
+r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, mimir_tsdb.DefaultPostingOffsetInMemorySampling, indexheader.BinaryReaderConfig{})
require.NoError(tb, err)

return func() *bucketBlock {
@@ -1359,6 +1359,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
1,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
indexheader.BinaryReaderConfig{},
false,
false,
0,
@@ -1520,7 +1521,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
}
-b1.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b1.meta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling)
+b1.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b1.meta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling, indexheader.BinaryReaderConfig{})
assert.NoError(t, err)
}

@@ -1559,7 +1560,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
}
-b2.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b2.meta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling)
+b2.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b2.meta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling, indexheader.BinaryReaderConfig{})
assert.NoError(t, err)
}

@@ -1723,6 +1724,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
10,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
indexheader.BinaryReaderConfig{},
true,
false,
0,
@@ -1813,6 +1815,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
10,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
indexheader.BinaryReaderConfig{},
true,
false,
0,
@@ -1996,6 +1999,7 @@ func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Ser
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
10,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
indexheader.BinaryReaderConfig{},
true,
false,
0,
@@ -2296,7 +2300,7 @@ func prepareBucket(b *testing.B, resolutionLevel compactor.ResolutionLevel) (*bu
partitioner := newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil)

// Create an index header reader.
-indexHeaderReader, err := indexheader.NewBinaryReader(ctx, logger, bkt, tmpDir, blockMeta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling)
+indexHeaderReader, err := indexheader.NewBinaryReader(ctx, logger, bkt, tmpDir, blockMeta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling, indexheader.BinaryReaderConfig{})
assert.NoError(b, err)
indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(logger, nil, indexcache.DefaultInMemoryIndexCacheConfig)
assert.NoError(b, err)
21 changes: 16 additions & 5 deletions pkg/storegateway/indexheader/binary_reader.go
@@ -9,6 +9,7 @@ import (
"bufio"
"context"
"encoding/binary"
"flag"
"hash"
"hash/crc32"
"io"
@@ -32,6 +33,8 @@ import (
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"

mmap "github.com/grafana/mimir/pkg/storegateway/indexheader/fileutil"
)

const (
@@ -459,10 +462,18 @@ type BinaryReader struct {
postingOffsetsInMemSampling int
}

type BinaryReaderConfig struct {
MapPopulateEnabled bool `yaml:"map_populate_enabled" category:"experimental"`
}

func (cfg *BinaryReaderConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.BoolVar(&cfg.MapPopulateEnabled, prefix+"map-populate-enabled", false, "If enabled, the store-gateway will attempt to pre-populate the file system cache when memory-mapping index-header files.")
}

// NewBinaryReader loads or builds new index-header if not present on disk.
-func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (*BinaryReader, error) {
+func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, cfg BinaryReaderConfig) (*BinaryReader, error) {
binfn := filepath.Join(dir, id.String(), block.IndexHeaderFilename)
-br, err := newFileBinaryReader(binfn, postingOffsetsInMemSampling)
+br, err := newFileBinaryReader(binfn, postingOffsetsInMemSampling, cfg)
if err == nil {
return br, nil
}
@@ -475,11 +486,11 @@ func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.Bucket
}

level.Debug(logger).Log("msg", "built index-header file", "path", binfn, "elapsed", time.Since(start))
-return newFileBinaryReader(binfn, postingOffsetsInMemSampling)
+return newFileBinaryReader(binfn, postingOffsetsInMemSampling, cfg)
}

-func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *BinaryReader, err error) {
-f, err := fileutil.OpenMmapFile(path)
+func newFileBinaryReader(path string, postingOffsetsInMemSampling int, cfg BinaryReaderConfig) (bw *BinaryReader, err error) {
+f, err := mmap.OpenMmapFile(path, cfg.MapPopulateEnabled)
if err != nil {
return nil, err
}
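
The configuration wiring in this file can be sketched in isolation roughly as follows. `readerConfig` and `parseArgs` are hypothetical stand-ins and not part of Mimir; only the flag name, prefix, and default value are taken from this commit.

```go
package main

import (
	"flag"
	"fmt"
)

// readerConfig is a hypothetical stand-in for BinaryReaderConfig.
type readerConfig struct {
	MapPopulateEnabled bool
}

// RegisterFlagsWithPrefix registers the option under the given prefix,
// defaulting to false as in the commit.
func (cfg *readerConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
	f.BoolVar(&cfg.MapPopulateEnabled, prefix+"map-populate-enabled", false,
		"Pre-populate the file system cache when memory-mapping index-header files.")
}

// parseArgs registers the flags on a fresh FlagSet and parses args.
func parseArgs(args []string) (readerConfig, error) {
	var cfg readerConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix(fs, "blocks-storage.bucket-store.index-header.")
	err := fs.Parse(args)
	return cfg, err
}

func main() {
	cfg, err := parseArgs([]string{
		"-blocks-storage.bucket-store.index-header.map-populate-enabled=true",
	})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println("map populate enabled:", cfg.MapPopulateEnabled)
}
```

Passing the config struct by value down to the reader, as the diff does, keeps the option immutable once flags are parsed.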
65 changes: 65 additions & 0 deletions pkg/storegateway/indexheader/fileutil/mmap.go
@@ -0,0 +1,65 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/tsdb/fileutil/mmap.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors.

package fileutil

import (
"os"

"github.com/pkg/errors"
)

type MmapFile struct {
f *os.File
b []byte
}

func OpenMmapFile(path string, populate bool) (*MmapFile, error) {
return OpenMmapFileWithSize(path, 0, populate)
}

func OpenMmapFileWithSize(path string, size int, populate bool) (mf *MmapFile, retErr error) {
f, err := os.Open(path)
if err != nil {
return nil, errors.Wrap(err, "try lock file")
}
defer func() {
if retErr != nil {
f.Close()
}
}()
if size <= 0 {
info, err := f.Stat()
if err != nil {
return nil, errors.Wrap(err, "stat")
}
size = int(info.Size())
}

b, err := mmap(f, size, populate)
if err != nil {
return nil, errors.Wrapf(err, "mmap, size %d", size)
}

return &MmapFile{f: f, b: b}, nil
}

func (f *MmapFile) Close() error {
err0 := munmap(f.b)
err1 := f.f.Close()

if err0 != nil {
return err0
}
return err1
}

func (f *MmapFile) File() *os.File {
return f.f
}

func (f *MmapFile) Bytes() []byte {
return f.b
}
27 changes: 27 additions & 0 deletions pkg/storegateway/indexheader/fileutil/mmap_unix.go
@@ -0,0 +1,27 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/tsdb/fileutil/mmap_unix.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors.

//go:build !windows && !plan9
// +build !windows,!plan9

package fileutil

import (
"os"

"golang.org/x/sys/unix"
)

func mmap(f *os.File, length int, populate bool) ([]byte, error) {
flags := unix.MAP_SHARED
if populate {
flags |= unix.MAP_POPULATE
}
return unix.Mmap(int(f.Fd()), 0, length, unix.PROT_READ, flags)
}

func munmap(b []byte) (err error) {
return unix.Munmap(b)
}