diff --git a/CHANGELOG.md b/CHANGELOG.md index dd6cd80fcd..db7aab31dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#3277](https://github.com/thanos-io/thanos/pull/3277) Thanos Query: Introduce dynamic lookback interval. This allows queries with large step to make use of downsampled data. - [#3409](https://github.com/thanos-io/thanos/pull/3409) Compactor: Added support for no-compact-mark.json which excludes the block from compaction. - [#3245](https://github.com/thanos-io/thanos/pull/3245) Query Frontend: Add `query-frontend.org-id-header` flag to specify HTTP header(s) to populate slow query log (e.g. X-Grafana-User). +- [#3431](https://github.com/thanos-io/thanos/pull/3431) Store: Added experimental support to lazy load index-headers at query time. When enabled via `--store.enable-index-header-lazy-reader` flag, the store-gateway will load into memory an index-header only once it's required at query time. Index-header will be automatically released after `--store.index-header-lazy-reader-idle-timeout` of inactivity. + * This, generally, reduces baseline memory usage of store when inactive, as well as a total number of mapped files (which is limited to 64k in some systems. ### Fixed diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 17c1339182..352d660414 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -107,6 +107,12 @@ func registerStore(app *extkingpin.App) { "Default is 24h, half of the default value for --delete-delay on compactor."). Default("24h")) + lazyIndexReaderEnabled := cmd.Flag("store.enable-index-header-lazy-reader", "If true, Store Gateway will lazy memory map index-header only once the block is required by a query."). + Default("false").Bool() + + lazyIndexReaderIdleTimeout := cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity."). + Hidden().Default("5m").Duration() + webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String() webPrefixHeaderName := cmd.Flag("web.prefix-header", "Name of HTTP request header used for dynamic prefixing of UI links and redirects. This option is ignored if web.external-prefix argument is set. Security risk: enable this option only if a reverse proxy in front of thanos is resetting the header. The --web.prefix-header=X-Forwarded-Prefix option can be useful, for example, if Thanos UI is served via Traefik reverse proxy with PathPrefixStrip option enabled, which sends the stripped prefix value in X-Forwarded-Prefix header. This allows thanos UI to be served on a sub-path.").Default("").String() @@ -152,6 +158,8 @@ func registerStore(app *extkingpin.App) { *postingOffsetsInMemSampling, cachingBucketConfig, getFlagsMap(cmd.Flags()), + *lazyIndexReaderEnabled, + *lazyIndexReaderIdleTimeout, ) }) } @@ -184,6 +192,8 @@ func runStore( postingOffsetsInMemSampling int, cachingBucketConfig *extflag.PathOrContent, flagsMap map[string]string, + lazyIndexReaderEnabled bool, + lazyIndexReaderIdleTimeout time.Duration, ) error { grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() @@ -304,6 +314,8 @@ func runStore( enablePostingsCompression, postingOffsetsInMemSampling, false, + lazyIndexReaderEnabled, + lazyIndexReaderIdleTimeout, ) if err != nil { return errors.Wrap(err, "create object storage store") diff --git a/docs/components/store.md b/docs/components/store.md index 938ca623e6..2ff77a9225 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -171,6 +171,10 @@ Flags: before being deleted from bucket. Default is 24h, half of the default value for --delete-delay on compactor. + --store.enable-index-header-lazy-reader + If true, Store Gateway will lazy memory map + index-header only once the block is required by + a query. --web.external-prefix="" Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 42c33767f7..20ae1c5bc9 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -637,8 +637,8 @@ func newBinaryTOCFromByteSlice(bs index.ByteSlice) (*BinaryTOC, error) { }, nil } -func (r BinaryReader) IndexVersion() int { - return r.indexVersion +func (r BinaryReader) IndexVersion() (int, error) { + return r.indexVersion, nil } // TODO(bwplotka): Get advantage of multi value offset fetch. @@ -871,7 +871,7 @@ func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) } -func (r BinaryReader) LabelNames() []string { +func (r BinaryReader) LabelNames() ([]string, error) { allPostingsKeyName, _ := index.AllPostingsKey() labelNames := make([]string, 0, len(r.postings)) for name := range r.postings { @@ -882,7 +882,7 @@ func (r BinaryReader) LabelNames() []string { labelNames = append(labelNames, name) } sort.Strings(labelNames) - return labelNames + return labelNames, nil } func (r *BinaryReader) Close() error { return r.c.Close() } diff --git a/pkg/block/indexheader/header.go b/pkg/block/indexheader/header.go index dbbe335deb..657427bd6f 100644 --- a/pkg/block/indexheader/header.go +++ b/pkg/block/indexheader/header.go @@ -18,7 +18,7 @@ type Reader interface { io.Closer // IndexVersion returns version of index. - IndexVersion() int + IndexVersion() (int, error) // PostingsOffset returns start and end offsets of postings for given name and value. // The end offset might be bigger than the actual posting ending, but not larger than the whole index file. @@ -36,5 +36,5 @@ type Reader interface { LabelValues(name string) ([]string, error) // LabelNames returns all label names. - LabelNames() []string + LabelNames() ([]string, error) } diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 81fd0d970b..2614cc9d55 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -96,7 +96,7 @@ func TestReaders(t *testing.T) { b := realByteSlice(indexFile.Bytes()) - t.Run("binary", func(t *testing.T) { + t.Run("binary reader", func(t *testing.T) { fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) testutil.Ok(t, WriteBinary(ctx, bkt, id, fn)) @@ -168,6 +168,18 @@ func TestReaders(t *testing.T) { compareIndexToHeader(t, b, br) }) + + t.Run("lazy binary reader", func(t *testing.T) { + fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) + testutil.Ok(t, WriteBinary(ctx, bkt, id, fn)) + + br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), nil) + testutil.Ok(t, err) + + defer func() { testutil.Ok(t, br.Close()) }() + + compareIndexToHeader(t, b, br) + }) }) } @@ -178,7 +190,9 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe testutil.Ok(t, err) defer func() { _ = indexReader.Close() }() - testutil.Equals(t, indexReader.Version(), headerReader.IndexVersion()) + actVersion, err := headerReader.IndexVersion() + testutil.Ok(t, err) + testutil.Equals(t, indexReader.Version(), actVersion) if indexReader.Version() == index.FormatV2 { // For v2 symbols ref sequential integers 0, 1, 2 etc. @@ -211,7 +225,9 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe expLabelNames, err := indexReader.LabelNames() testutil.Ok(t, err) - testutil.Equals(t, expLabelNames, headerReader.LabelNames()) + actualLabelNames, err := headerReader.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, expLabelNames, actualLabelNames) expRanges, err := indexReader.PostingsRanges() testutil.Ok(t, err) diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go new file mode 100644 index 0000000000..e9b9dc20bd --- /dev/null +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -0,0 +1,273 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package indexheader + +import ( + "context" + "os" + "path/filepath" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/tsdb/index" + "go.uber.org/atomic" + + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/objstore" +) + +// LazyBinaryReaderMetrics holds metrics tracked by LazyBinaryReader. +type LazyBinaryReaderMetrics struct { + loadCount prometheus.Counter + loadFailedCount prometheus.Counter + unloadCount prometheus.Counter + unloadFailedCount prometheus.Counter + loadDuration prometheus.Histogram +} + +// NewLazyBinaryReaderMetrics makes new LazyBinaryReaderMetrics. +func NewLazyBinaryReaderMetrics(reg prometheus.Registerer) *LazyBinaryReaderMetrics { + return &LazyBinaryReaderMetrics{ + loadCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "indexheader_lazy_load_total", + Help: "Total number of index-header lazy load operations.", + }), + loadFailedCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "indexheader_lazy_load_failed_total", + Help: "Total number of failed index-header lazy load operations.", + }), + unloadCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "indexheader_lazy_unload_total", + Help: "Total number of index-header lazy unload operations.", + }), + unloadFailedCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "indexheader_lazy_unload_failed_total", + Help: "Total number of failed index-header lazy unload operations.", + }), + loadDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "indexheader_lazy_load_duration_seconds", + Help: "Duration of the index-header lazy loading in seconds.", + Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5}, + }), + } +} + +// LazyBinaryReader wraps BinaryReader and loads (mmap) the index-header only upon +// the first Reader function is called. +type LazyBinaryReader struct { + ctx context.Context + logger log.Logger + bkt objstore.BucketReader + dir string + filepath string + id ulid.ULID + postingOffsetsInMemSampling int + metrics *LazyBinaryReaderMetrics + onClosed func(*LazyBinaryReader) + + readerMx sync.RWMutex + reader *BinaryReader + readerErr error + + // Keep track of the last time it was used. + usedAt *atomic.Int64 +} + +// NewLazyBinaryReader makes a new LazyBinaryReader. If the index-header does not exist +// on the local disk at dir location, this function will build it downloading required +// sections from the full index stored in the bucket. However, this function doesn't load +// (mmap) the index-header; it will be loaded at first Reader function call. +func NewLazyBinaryReader( + ctx context.Context, + logger log.Logger, + bkt objstore.BucketReader, + dir string, + id ulid.ULID, + postingOffsetsInMemSampling int, + metrics *LazyBinaryReaderMetrics, + onClosed func(*LazyBinaryReader), +) (*LazyBinaryReader, error) { + filepath := filepath.Join(dir, id.String(), block.IndexHeaderFilename) + + // If the index-header doesn't exist we should download it. + if _, err := os.Stat(filepath); err != nil { + if !os.IsNotExist(err) { + return nil, errors.Wrap(err, "read index header") + } + + level.Debug(logger).Log("msg", "the index-header doesn't exist on disk; recreating", "path", filepath) + + start := time.Now() + if err := WriteBinary(ctx, bkt, id, filepath); err != nil { + return nil, errors.Wrap(err, "write index header") + } + + level.Debug(logger).Log("msg", "built index-header file", "path", filepath, "elapsed", time.Since(start)) + } + + return &LazyBinaryReader{ + ctx: ctx, + logger: logger, + bkt: bkt, + dir: dir, + filepath: filepath, + id: id, + postingOffsetsInMemSampling: postingOffsetsInMemSampling, + metrics: metrics, + usedAt: atomic.NewInt64(time.Now().UnixNano()), + onClosed: onClosed, + }, nil +} + +// Close implements Reader. It unloads the index-header from memory (releasing the mmap +// area), but a subsequent call to any other Reader function will automatically reload it. +func (r *LazyBinaryReader) Close() error { + if r.onClosed != nil { + defer r.onClosed(r) + } + + return r.unload() +} + +// IndexVersion implements Reader. +func (r *LazyBinaryReader) IndexVersion() (int, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.load(); err != nil { + return 0, err + } + + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.IndexVersion() +} + +// PostingsOffset implements Reader. +func (r *LazyBinaryReader) PostingsOffset(name string, value string) (index.Range, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.load(); err != nil { + return index.Range{}, err + } + + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.PostingsOffset(name, value) +} + +// LookupSymbol implements Reader. +func (r *LazyBinaryReader) LookupSymbol(o uint32) (string, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.load(); err != nil { + return "", err + } + + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.LookupSymbol(o) +} + +// LabelValues implements Reader. +func (r *LazyBinaryReader) LabelValues(name string) ([]string, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.load(); err != nil { + return nil, err + } + + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.LabelValues(name) +} + +// LabelNames implements Reader. +func (r *LazyBinaryReader) LabelNames() ([]string, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.load(); err != nil { + return nil, err + } + + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.LabelNames() +} + +// load ensures the underlying binary index-header reader has been successfully loaded. Returns +// an error on failure. This function MUST be called with the read lock already acquired. +func (r *LazyBinaryReader) load() error { + // Nothing to do if we already tried loading it. + if r.reader != nil { + return nil + } + if r.readerErr != nil { + return r.readerErr + } + + // Take the write lock to ensure we'll try to load it only once. Take again + // the read lock once done. + r.readerMx.RUnlock() + r.readerMx.Lock() + defer r.readerMx.RLock() + defer r.readerMx.Unlock() + + // Ensure none else tried to load it in the meanwhile. + if r.reader != nil { + return nil + } + if r.readerErr != nil { + return r.readerErr + } + + level.Debug(r.logger).Log("msg", "lazy loading index-header file", "path", r.filepath) + r.metrics.loadCount.Inc() + startTime := time.Now() + + reader, err := NewBinaryReader(r.ctx, r.logger, r.bkt, r.dir, r.id, r.postingOffsetsInMemSampling) + if err != nil { + r.metrics.loadFailedCount.Inc() + r.readerErr = err + return errors.Wrapf(err, "lazy load index-header file at %s", r.filepath) + } + + r.reader = reader + level.Debug(r.logger).Log("msg", "lazy loaded index-header file", "path", r.filepath, "elapsed", time.Since(startTime)) + r.metrics.loadDuration.Observe(time.Since(startTime).Seconds()) + + return nil +} + +// unload closes underlying BinaryReader. Calling this function on a already unloaded reader is a no-op. +func (r *LazyBinaryReader) unload() error { + // Always update the used timestamp so that the pool will not call unload() again until the next + // idle timeout is hit. + r.usedAt.Store(time.Now().UnixNano()) + + r.readerMx.Lock() + defer r.readerMx.Unlock() + + if r.reader == nil { + return nil + } + + r.metrics.unloadCount.Inc() + if err := r.reader.Close(); err != nil { + r.metrics.unloadFailedCount.Inc() + return err + } + + r.reader = nil + return nil +} + +func (r *LazyBinaryReader) lastUsedAt() int64 { + return r.usedAt.Load() +} diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go new file mode 100644 index 0000000000..07b8883cfd --- /dev/null +++ b/pkg/block/indexheader/lazy_binary_reader_test.go @@ -0,0 +1,170 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package indexheader + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + promtestutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/pkg/labels" + + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/objstore/filesystem" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/pkg/testutil/e2eutil" +) + +func TestNewLazyBinaryReader_ShouldFailIfUnableToBuildIndexHeader(t *testing.T) { + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), nil) + testutil.NotOk(t, err) +} + +func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + // Create block. + blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + + m := NewLazyBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + // Should lazy load the index upon first usage. + v, err := r.IndexVersion() + testutil.Ok(t, err) + testutil.Equals(t, 2, v) + testutil.Assert(t, r.reader != nil) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) +} + +func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + // Create block. + blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + + // Write a corrupted index-header for the block. + headerFilename := filepath.Join(tmpDir, blockID.String(), block.IndexHeaderFilename) + testutil.Ok(t, ioutil.WriteFile(headerFilename, []byte("xxx"), os.ModePerm)) + + m := NewLazyBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + // Ensure it can read data. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) +} + +func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + // Create block. + blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + + m := NewLazyBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + + // Should lazy load the index upon first usage. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + + // Close it. + testutil.Ok(t, r.Close()) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + + // Should lazy load again upon next usage. + labelNames, err = r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + + // Closing an already closed lazy reader should be a no-op. + for i := 0; i < 2; i++ { + testutil.Ok(t, r.Close()) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + } +} diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go new file mode 100644 index 0000000000..660ae4853a --- /dev/null +++ b/pkg/block/indexheader/reader_pool.go @@ -0,0 +1,147 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package indexheader + +import ( + "context" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + + "github.com/thanos-io/thanos/pkg/objstore" +) + +// ReaderPool is used to istantiate new index-header readers and keep track of them. +// When the lazy reader is enabled, the pool keeps track of all instantiated readers +// and automatically close them once the idle timeout is reached. A closed lazy reader +// will be automatically re-opened upon next usage. +type ReaderPool struct { + lazyReaderEnabled bool + lazyReaderIdleTimeout time.Duration + lazyReaderMetrics *LazyBinaryReaderMetrics + logger log.Logger + + // Channel used to signal once the pool is closing. + close chan struct{} + + // Keep track of all readers managed by the pool. + lazyReadersMx sync.Mutex + lazyReaders map[*LazyBinaryReader]struct{} +} + +// NewReaderPool makes a new ReaderPool. +func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, reg prometheus.Registerer) *ReaderPool { + p := &ReaderPool{ + logger: logger, + lazyReaderEnabled: lazyReaderEnabled, + lazyReaderIdleTimeout: lazyReaderIdleTimeout, + lazyReaderMetrics: NewLazyBinaryReaderMetrics(reg), + lazyReaders: make(map[*LazyBinaryReader]struct{}), + close: make(chan struct{}), + } + + // Start a goroutine to close idle readers (only if required). + if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 { + checkFreq := p.lazyReaderIdleTimeout / 10 + + go func() { + for { + select { + case <-p.close: + return + case <-time.After(checkFreq): + p.closeIdleReaders() + } + } + }() + } + + return p +} + +// NewBinaryReader creates and returns a new binary reader. If the pool has been configured +// with lazy reader enabled, this function will return a lazy reader. The returned lazy reader +// is tracked by the pool and automatically closed once the idle timeout expires. +func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (Reader, error) { + var reader Reader + var err error + + if p.lazyReaderEnabled { + reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.lazyReaderMetrics, p.onLazyReaderClosed) + } else { + reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling) + } + + if err != nil { + return nil, err + } + + // Keep track of lazy readers only if required. + if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 { + p.lazyReadersMx.Lock() + p.lazyReaders[reader.(*LazyBinaryReader)] = struct{}{} + p.lazyReadersMx.Unlock() + } + + return reader, err +} + +// Close the pool and stop checking for idle readers. No reader tracked by this pool +// will be closed. It's the caller responsibility to close readers. +func (p *ReaderPool) Close() { + close(p.close) +} + +func (p *ReaderPool) closeIdleReaders() { + for _, r := range p.getIdleReaders() { + // Closing an already closed reader is a no-op, so we close it and just update + // the last timestamp on success. If it will be still be idle the next time this + // function is called, we'll try to close it again and will just be a no-op. + // + // Due to concurrency, the current implementation may close a reader which was + // use between when the list of idle readers has been computed and now. This is + // an edge case we're willing to accept, to not further complicate the logic. + if err := r.unload(); err != nil { + level.Warn(p.logger).Log("msg", "failed to close idle index-header reader", "err", err) + } + } +} + +func (p *ReaderPool) getIdleReaders() []*LazyBinaryReader { + p.lazyReadersMx.Lock() + defer p.lazyReadersMx.Unlock() + + var idle []*LazyBinaryReader + threshold := time.Now().Add(-p.lazyReaderIdleTimeout).UnixNano() + + for r := range p.lazyReaders { + if r.lastUsedAt() < threshold { + idle = append(idle, r) + } + } + + return idle +} + +func (p *ReaderPool) isTracking(r *LazyBinaryReader) bool { + p.lazyReadersMx.Lock() + defer p.lazyReadersMx.Unlock() + + _, ok := p.lazyReaders[r] + return ok +} + +func (p *ReaderPool) onLazyReaderClosed(r *LazyBinaryReader) { + p.lazyReadersMx.Lock() + defer p.lazyReadersMx.Unlock() + + // When this function is called, it means the reader has been closed NOT because was idle + // but because the consumer closed it. By contract, a reader closed by the consumer can't + // be used anymore, so we can automatically remove it from the pool. + delete(p.lazyReaders, r) +} diff --git a/pkg/block/indexheader/reader_pool_test.go b/pkg/block/indexheader/reader_pool_test.go new file mode 100644 index 0000000000..74bd319bd9 --- /dev/null +++ b/pkg/block/indexheader/reader_pool_test.go @@ -0,0 +1,133 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package indexheader + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/go-kit/kit/log" + promtestutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/pkg/labels" + + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/objstore/filesystem" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/pkg/testutil/e2eutil" +) + +func TestReaderPool_NewBinaryReader(t *testing.T) { + tests := map[string]struct { + lazyReaderEnabled bool + lazyReaderIdleTimeout time.Duration + }{ + "lazy reader is disabled": { + lazyReaderEnabled: false, + }, + "lazy reader is enabled but close on idle timeout is disabled": { + lazyReaderEnabled: true, + lazyReaderIdleTimeout: 0, + }, + "lazy reader and close on idle timeout are both enabled": { + lazyReaderEnabled: true, + lazyReaderIdleTimeout: time.Minute, + }, + } + + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + // Create block. + blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, nil) + defer pool.Close() + + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, r.Close()) }() + + // Ensure it can read data. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + }) + } +} + +func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { + const idleTimeout = time.Second + + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + // Create block. + blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + + pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, nil) + defer pool.Close() + + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, r.Close()) }() + + // Ensure it can read data. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) + + // Wait enough time before checking it. + time.Sleep(idleTimeout * 2) + + // We expect the reader has been closed, but not released from the pool. + testutil.Assert(t, pool.isTracking(r.(*LazyBinaryReader))) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) + + // Ensure it can still read data (will be re-opened). + labelNames, err = r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Assert(t, pool.isTracking(r.(*LazyBinaryReader))) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) + + // We expect an explicit call to Close() to close the reader and release it from the pool too. + testutil.Ok(t, r.Close()) + testutil.Assert(t, !pool.isTracking(r.(*LazyBinaryReader))) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) +} diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 68d929e7f0..6bcead6b9b 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -40,6 +40,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" @@ -247,13 +248,14 @@ type FilterConfig struct { // BucketStore implements the store API backed by a bucket. It loads all index // files to local disk. type BucketStore struct { - logger log.Logger - metrics *bucketStoreMetrics - bkt objstore.InstrumentedBucketReader - fetcher block.MetadataFetcher - dir string - indexCache storecache.IndexCache - chunkPool pool.BytesPool + logger log.Logger + metrics *bucketStoreMetrics + bkt objstore.InstrumentedBucketReader + fetcher block.MetadataFetcher + dir string + indexCache storecache.IndexCache + indexReaderPool *indexheader.ReaderPool + chunkPool pool.BytesPool // Sets of blocks that have the same labels. They are indexed by a hash over their label set. mtx sync.RWMutex @@ -305,6 +307,8 @@ func NewBucketStore( enablePostingsCompression bool, postingOffsetsInMemSampling int, enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility. + lazyIndexReaderEnabled bool, + lazyIndexReaderIdleTimeout time.Duration, ) (*BucketStore, error) { if logger == nil { logger = log.NewNopLogger() @@ -321,6 +325,7 @@ func NewBucketStore( fetcher: fetcher, dir: dir, indexCache: indexCache, + indexReaderPool: indexheader.NewReaderPool(logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, extprom.WrapRegistererWithPrefix("thanos_bucket_store_", reg)), chunkPool: chunkPool, blocks: map[ulid.ULID]*bucketBlock{}, blockSets: map[uint64]*bucketBlockSet{}, @@ -352,6 +357,8 @@ func (s *BucketStore) Close() (err error) { for _, b := range s.blocks { runutil.CloseWithErrCapture(&err, b, "closing Bucket Block") } + + s.indexReaderPool.Close() return err } @@ -484,7 +491,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er lset := labels.FromMap(meta.Thanos.Labels) h := lset.Hash() - indexHeaderReader, err := indexheader.NewBinaryReader( + indexHeaderReader, err := s.indexReaderPool.NewBinaryReader( ctx, s.logger, s.bkt, @@ -1075,7 +1082,11 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names") // Do it via index reader to have pending reader registered correctly. - res := indexr.block.indexHeaderReader.LabelNames() + res, err := indexr.block.indexHeaderReader.LabelNames() + if err != nil { + return errors.Wrap(err, "label names") + } + sort.Strings(res) mtx.Lock() @@ -1555,7 +1566,11 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er // As of version two all series entries are 16 byte padded. All references // we get have to account for that to get the correct offset. - if r.block.indexHeaderReader.IndexVersion() >= 2 { + version, err := r.block.indexHeaderReader.IndexVersion() + if err != nil { + return nil, errors.Wrap(err, "get index version") + } + if version >= 2 { for i, id := range ps { ps[i] = id * 16 } diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 6f01f89635..afe359eed4 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -171,8 +171,12 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m true, DefaultPostingOffsetInMemorySampling, true, + true, + time.Minute, ) testutil.Ok(t, err) + defer func() { testutil.Ok(t, store.Close()) }() + s.store = store if manyParts { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index b692e6067b..30f7d5dd85 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -581,8 +581,11 @@ func TestBucketStore_Info(t *testing.T) { true, DefaultPostingOffsetInMemorySampling, false, + false, + 0, ) testutil.Ok(t, err) + defer func() { testutil.Ok(t, bucketStore.Close()) }() resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{}) testutil.Ok(t, err) @@ -830,8 +833,11 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul true, DefaultPostingOffsetInMemorySampling, false, + false, + 0, ) testutil.Ok(t, err) + defer func() { testutil.Ok(t, bucketStore.Close()) }() testutil.Ok(t, bucketStore.InitialSync(context.Background())) @@ -1257,10 +1263,11 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer } store := &BucketStore{ - bkt: objstore.WithNoopInstr(bkt), - logger: logger, - indexCache: noopCache{}, - metrics: newBucketStoreMetrics(nil), + bkt: objstore.WithNoopInstr(bkt), + logger: logger, + indexCache: noopCache{}, + indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, nil), + metrics: newBucketStoreMetrics(nil), blockSets: map[uint64]*bucketBlockSet{ labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{blocks}}, }, @@ -1468,10 +1475,11 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { } store := &BucketStore{ - bkt: objstore.WithNoopInstr(bkt), - logger: logger, - indexCache: indexCache, - metrics: newBucketStoreMetrics(nil), + bkt: objstore.WithNoopInstr(bkt), + logger: logger, + indexCache: indexCache, + indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, nil), + metrics: newBucketStoreMetrics(nil), blockSets: map[uint64]*bucketBlockSet{ labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{{b1, b2}}}, }, @@ -1607,8 +1615,12 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { true, DefaultPostingOffsetInMemorySampling, true, + false, + 0, ) testutil.Ok(tb, err) + defer func() { testutil.Ok(t, store.Close()) }() + testutil.Ok(tb, store.SyncBlocks(context.Background())) testCases := []*storetestutil.SeriesCase{ @@ -1716,8 +1728,12 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { true, DefaultPostingOffsetInMemorySampling, true, + false, + 0, ) testutil.Ok(tb, err) + defer func() { testutil.Ok(t, store.Close()) }() + testutil.Ok(tb, store.SyncBlocks(context.Background())) // Create a request with invalid hints (uses response hints instead of request hints). @@ -1806,6 +1822,8 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { true, DefaultPostingOffsetInMemorySampling, true, + false, + 0, ) testutil.Ok(tb, err) testutil.Ok(tb, store.SyncBlocks(context.Background())) @@ -1949,8 +1967,12 @@ func TestBlockWithLargeChunks(t *testing.T) { true, DefaultPostingOffsetInMemorySampling, true, + false, + 0, ) testutil.Ok(t, err) + defer func() { testutil.Ok(t, store.Close()) }() + testutil.Ok(t, store.SyncBlocks(context.Background())) req := &storepb.SeriesRequest{