From c6f8c5729b1f0cfe3f533ea17a8ab6571c8b53e1 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 11 Nov 2020 13:10:39 +0100 Subject: [PATCH 01/12] Experimental lazy index-header reader Signed-off-by: Marco Pracucci --- pkg/block/indexheader/binary_reader.go | 8 +- pkg/block/indexheader/header.go | 4 +- pkg/block/indexheader/header_test.go | 8 +- pkg/block/indexheader/lazy_binary_reader.go | 177 ++++++++++++++++++++ pkg/store/bucket.go | 15 +- 5 files changed, 201 insertions(+), 11 deletions(-) create mode 100644 pkg/block/indexheader/lazy_binary_reader.go diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 42c33767f7..20ae1c5bc9 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -637,8 +637,8 @@ func newBinaryTOCFromByteSlice(bs index.ByteSlice) (*BinaryTOC, error) { }, nil } -func (r BinaryReader) IndexVersion() int { - return r.indexVersion +func (r BinaryReader) IndexVersion() (int, error) { + return r.indexVersion, nil } // TODO(bwplotka): Get advantage of multi value offset fetch. @@ -871,7 +871,7 @@ func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) } -func (r BinaryReader) LabelNames() []string { +func (r BinaryReader) LabelNames() ([]string, error) { allPostingsKeyName, _ := index.AllPostingsKey() labelNames := make([]string, 0, len(r.postings)) for name := range r.postings { @@ -882,7 +882,7 @@ func (r BinaryReader) LabelNames() []string { labelNames = append(labelNames, name) } sort.Strings(labelNames) - return labelNames + return labelNames, nil } func (r *BinaryReader) Close() error { return r.c.Close() } diff --git a/pkg/block/indexheader/header.go b/pkg/block/indexheader/header.go index dbbe335deb..657427bd6f 100644 --- a/pkg/block/indexheader/header.go +++ b/pkg/block/indexheader/header.go @@ -18,7 +18,7 @@ type Reader interface { io.Closer // IndexVersion returns version of index. - IndexVersion() int + IndexVersion() (int, error) // PostingsOffset returns start and end offsets of postings for given name and value. // The end offset might be bigger than the actual posting ending, but not larger than the whole index file. @@ -36,5 +36,5 @@ type Reader interface { LabelValues(name string) ([]string, error) // LabelNames returns all label names. - LabelNames() []string + LabelNames() ([]string, error) } diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 81fd0d970b..3e406a4b61 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -178,7 +178,9 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe testutil.Ok(t, err) defer func() { _ = indexReader.Close() }() - testutil.Equals(t, indexReader.Version(), headerReader.IndexVersion()) + actVersion, err := headerReader.IndexVersion() + testutil.Ok(t, err) + testutil.Equals(t, indexReader.Version(), actVersion) if indexReader.Version() == index.FormatV2 { // For v2 symbols ref sequential integers 0, 1, 2 etc. 
@@ -211,7 +213,9 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe expLabelNames, err := indexReader.LabelNames() testutil.Ok(t, err) - testutil.Equals(t, expLabelNames, headerReader.LabelNames()) + actualLabelNames, err := headerReader.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, expLabelNames, actualLabelNames) expRanges, err := indexReader.PostingsRanges() testutil.Ok(t, err) diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go new file mode 100644 index 0000000000..5a7b84af9f --- /dev/null +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -0,0 +1,177 @@ +package indexheader + +import ( + "context" + "os" + "path/filepath" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/tsdb/index" + + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/objstore" +) + +type LazyBinaryReader struct { + ctx context.Context + logger log.Logger + bkt objstore.BucketReader + dir string + filepath string + id ulid.ULID + postingOffsetsInMemSampling int + + readerMx sync.RWMutex + reader *BinaryReader + readerErr error +} + +func NewLazyBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (*LazyBinaryReader, error) { + filepath := filepath.Join(dir, id.String(), block.IndexHeaderFilename) + + // If the index-header doesn't exist we should download it. + if _, err := os.Stat(filepath); err != nil { + if !os.IsNotExist(err) { + return nil, errors.Wrap(err, "read index header") + } + + level.Debug(logger).Log("msg", "the index-header doesn't exist on disk; recreating", "path", filepath) + + start := time.Now() + if err := WriteBinary(ctx, bkt, id, filepath); err != nil { + return nil, errors.Wrap(err, "write index header") + } + + level.Debug(logger).Log("msg", "built index-header file", "path", filepath, "elapsed", time.Since(start)) + } + + return &LazyBinaryReader{ + ctx: ctx, + logger: logger, + bkt: bkt, + dir: dir, + filepath: filepath, + id: id, + postingOffsetsInMemSampling: postingOffsetsInMemSampling, + }, nil +} + +func (r *LazyBinaryReader) Close() error { + r.readerMx.Lock() + defer r.readerMx.Unlock() + + if r.reader == nil { + return nil + } + + if err := r.reader.Close(); err != nil { + return err + } + + r.reader = nil + return nil +} + +func (r *LazyBinaryReader) IndexVersion() (int, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.open(); err != nil { + return 0, err + } + + return r.reader.IndexVersion() +} + +func (r *LazyBinaryReader) PostingsOffset(name string, value string) (index.Range, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.open(); err != nil { + return index.Range{}, err + } + + return r.reader.PostingsOffset(name, value) +} + +func (r *LazyBinaryReader) LookupSymbol(o uint32) (string, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.open(); err != nil { + return "", err + } + + return r.reader.LookupSymbol(o) +} + +func (r *LazyBinaryReader) LabelValues(name string) ([]string, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.open(); err != nil { + return nil, err + } + + return r.reader.LabelValues(name) +} + +func (r *LazyBinaryReader) LabelNames() ([]string, error) { + r.readerMx.RLock() + defer r.readerMx.RUnlock() + + if err := r.open(); err 
!= nil { + return nil, err + } + + return r.reader.LabelNames() +} + +// open ensure the underlying binary index-header reader has been successfully opened. Returns +// an error on failure. This function MUST be called with the read lock already acquired. +func (r *LazyBinaryReader) open() error { + // Nothing to do if it's already opened. + if r.reader != nil { + return nil + } + + // If we already tried to open it and it failed, we don't retry again + // and we always return the same error. + if r.readerErr != nil { + return r.readerErr + } + + // Take the write lock to ensure we'll try to open it only once. Take again + // the read lock once done. + r.readerMx.RUnlock() + r.readerMx.Lock() + defer r.readerMx.RLock() + defer r.readerMx.Unlock() + + // Ensure none else tried to open it in the meanwhile. + if r.reader != nil { + return nil + } else if r.readerErr != nil { + return r.readerErr + } + + level.Debug(r.logger).Log("msg", "lazy loading index-header file", "path", r.filepath) + startTime := time.Now() + + reader, err := NewBinaryReader(r.ctx, r.logger, r.bkt, r.dir, r.id, r.postingOffsetsInMemSampling) + if err != nil { + level.Error(r.logger).Log("msg", "failed to lazy load index-header file", "path", r.filepath, "err", err) + r.readerErr = err + return err + } + + r.reader = reader + level.Info(r.logger).Log("msg", "lazy loaded index-header file", "path", r.filepath, "elapsed", time.Since(startTime)) + + return nil +} diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 68d929e7f0..414f23693f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -484,7 +484,8 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er lset := labels.FromMap(meta.Thanos.Labels) h := lset.Hash() - indexHeaderReader, err := indexheader.NewBinaryReader( + // TODO enable it conditionally (via hidden CLI flag) + indexHeaderReader, err := indexheader.NewLazyBinaryReader( ctx, s.logger, s.bkt, @@ -1075,7 +1076,11 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names") // Do it via index reader to have pending reader registered correctly. - res := indexr.block.indexHeaderReader.LabelNames() + res, err := indexr.block.indexHeaderReader.LabelNames() + if err != nil { + return errors.Wrap(err, "label names") + } + sort.Strings(res) mtx.Lock() @@ -1555,7 +1560,11 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er // As of version two all series entries are 16 byte padded. All references // we get have to account for that to get the correct offset. 
- if r.block.indexHeaderReader.IndexVersion() >= 2 { + version, err := r.block.indexHeaderReader.IndexVersion() + if err != nil { + return nil, errors.Wrap(err, "get index version") + } + if version >= 2 { for i, id := range ps { ps[i] = id * 16 } From 611ccbc29064ed6b395f3c34b0d1e99ad90ea45a Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 11 Nov 2020 17:35:17 +0100 Subject: [PATCH 02/12] Implemented lazy index-header reader Signed-off-by: Marco Pracucci --- cmd/thanos/store.go | 12 ++ pkg/block/indexheader/header_test.go | 14 +- pkg/block/indexheader/lazy_binary_reader.go | 8 +- .../indexheader/lazy_binary_reader_test.go | 143 ++++++++++++++ pkg/block/indexheader/reader_pool.go | 181 ++++++++++++++++++ pkg/block/indexheader/reader_pool_test.go | 125 ++++++++++++ pkg/store/bucket.go | 23 ++- pkg/store/bucket_e2e_test.go | 2 + pkg/store/bucket_test.go | 28 ++- 9 files changed, 512 insertions(+), 24 deletions(-) create mode 100644 pkg/block/indexheader/lazy_binary_reader_test.go create mode 100644 pkg/block/indexheader/reader_pool.go create mode 100644 pkg/block/indexheader/reader_pool_test.go diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 17c1339182..408d815361 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -107,6 +107,12 @@ func registerStore(app *extkingpin.App) { "Default is 24h, half of the default value for --delete-delay on compactor."). Default("24h")) + lazyIndexReaderEnabled := cmd.Flag("store.enable-index-header-lazy-loading", "If true, Store Gateway will lazy memory map index-header only once required by a query."). + Hidden().Default("false").Bool() + + lazyIndexReaderIdleTimeout := cmd.Flag("store.lazy-index-header-idle-timeout", "If index-header lazy loading is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity."). + Hidden().Default("5m").Duration() + webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String() webPrefixHeaderName := cmd.Flag("web.prefix-header", "Name of HTTP request header used for dynamic prefixing of UI links and redirects. This option is ignored if web.external-prefix argument is set. Security risk: enable this option only if a reverse proxy in front of thanos is resetting the header. The --web.prefix-header=X-Forwarded-Prefix option can be useful, for example, if Thanos UI is served via Traefik reverse proxy with PathPrefixStrip option enabled, which sends the stripped prefix value in X-Forwarded-Prefix header. 
This allows thanos UI to be served on a sub-path.").Default("").String() @@ -152,6 +158,8 @@ func registerStore(app *extkingpin.App) { *postingOffsetsInMemSampling, cachingBucketConfig, getFlagsMap(cmd.Flags()), + *lazyIndexReaderEnabled, + *lazyIndexReaderIdleTimeout, ) }) } @@ -184,6 +192,8 @@ func runStore( postingOffsetsInMemSampling int, cachingBucketConfig *extflag.PathOrContent, flagsMap map[string]string, + lazyIndexReaderEnabled bool, + lazyIndexReaderIdleTimeout time.Duration, ) error { grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() @@ -304,6 +314,8 @@ func runStore( enablePostingsCompression, postingOffsetsInMemSampling, false, + lazyIndexReaderEnabled, + lazyIndexReaderIdleTimeout, ) if err != nil { return errors.Wrap(err, "create object storage store") diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 3e406a4b61..e6f1d4fd5c 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -96,7 +96,7 @@ func TestReaders(t *testing.T) { b := realByteSlice(indexFile.Bytes()) - t.Run("binary", func(t *testing.T) { + t.Run("binary reader", func(t *testing.T) { fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) testutil.Ok(t, WriteBinary(ctx, bkt, id, fn)) @@ -168,6 +168,18 @@ func TestReaders(t *testing.T) { compareIndexToHeader(t, b, br) }) + + t.Run("lazy binary reader", func(t *testing.T) { + fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) + testutil.Ok(t, WriteBinary(ctx, bkt, id, fn)) + + br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3) + testutil.Ok(t, err) + + defer func() { testutil.Ok(t, br.Close()) }() + + compareIndexToHeader(t, b, br) + }) }) } diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index 5a7b84af9f..cf26c78f1c 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -135,14 +135,10 @@ func (r *LazyBinaryReader) LabelNames() ([]string, error) { // open ensure the underlying binary index-header reader has been successfully opened. Returns // an error on failure. This function MUST be called with the read lock already acquired. func (r *LazyBinaryReader) open() error { - // Nothing to do if it's already opened. + // Nothing to do if we already tried opening it. if r.reader != nil { return nil - } - - // If we already tried to open it and it failed, we don't retry again - // and we always return the same error. 
- if r.readerErr != nil { + } else if r.readerErr != nil { return r.readerErr } diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go new file mode 100644 index 0000000000..5391733085 --- /dev/null +++ b/pkg/block/indexheader/lazy_binary_reader_test.go @@ -0,0 +1,143 @@ +package indexheader + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/pkg/labels" + + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/objstore/filesystem" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/pkg/testutil/e2eutil" +) + +func TestNewLazyBinaryReader_ShouldFailIfUnableToBuildIndexHeader(t *testing.T) { + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3) + testutil.NotOk(t, err) +} + +func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + // Create block. + blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + + // Should lazy load the index upon first usage. + v, err := r.IndexVersion() + testutil.Ok(t, err) + testutil.Equals(t, 2, v) + testutil.Assert(t, r.reader != nil) + + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) +} + +func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + // Create block. + blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + + // Write a corrupted index-header for the block. + headerFilename := filepath.Join(tmpDir, blockID.String(), block.IndexHeaderFilename) + ioutil.WriteFile(headerFilename, []byte("xxx"), os.ModePerm) + + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + + // Ensure it can read data. 
+ labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) +} + +func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + // Create block. + blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + + // Should lazy load the index upon first usage. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + + // Close it. + testutil.Ok(t, r.Close()) + testutil.Assert(t, r.reader == nil) + + // Should lazy load again upon next usage. + labelNames, err = r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + + // Closing an already closed lazy reader should be a no-op. + for i := 0; i < 2; i++ { + testutil.Ok(t, r.Close()) + } +} diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go new file mode 100644 index 0000000000..89e9cd58f8 --- /dev/null +++ b/pkg/block/indexheader/reader_pool.go @@ -0,0 +1,181 @@ +package indexheader + +import ( + "context" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/tsdb/index" + "go.uber.org/atomic" + + "github.com/thanos-io/thanos/pkg/objstore" +) + +type ReaderPool struct { + lazyReaderEnabled bool + lazyReaderIdleTimeout time.Duration + logger log.Logger + + // Channel used to signal once the pool is closing. + close chan struct{} + + // Keep track of all readers managed by the pool. + readersMx sync.Mutex + readers map[*readerTracker]struct{} +} + +func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration) *ReaderPool { + p := &ReaderPool{ + logger: logger, + lazyReaderEnabled: lazyReaderEnabled, + lazyReaderIdleTimeout: lazyReaderIdleTimeout, + readers: make(map[*readerTracker]struct{}), + close: make(chan struct{}), + } + + // Start a goroutine to close idle readers (only if required). + if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 { + checkFreq := p.lazyReaderIdleTimeout / 10 + + go func() { + for { + select { + case <-p.close: + return + case <-time.After(checkFreq): + p.closeIdleReaders() + } + } + }() + } + + return p +} + +func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (Reader, error) { + var reader Reader + var err error + + if p.lazyReaderEnabled { + reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling) + } else { + reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling) + } + + if err != nil { + return nil, err + } + + // Keep track of lazy readers only if required. 
+ if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 { + reader = &readerTracker{ + reader: reader, + pool: p, + usedAt: atomic.NewInt64(time.Now().UnixNano()), + } + + p.readersMx.Lock() + p.readers[reader.(*readerTracker)] = struct{}{} + p.readersMx.Unlock() + } + + return reader, err +} + +// Close the pool and stop checking for idle readers. No reader tracked by this pool +// will be closed. It's the caller responsability to close readers. +func (p *ReaderPool) Close() { + close(p.close) +} + +func (p *ReaderPool) closeIdleReaders() { + for _, r := range p.getIdleReaders() { + // Closing an already closed reader is a no-op, so we close it and just update + // the last timestamp on success. If it's still idle the next time this + // function is called, we'll try to close it again and it will just be a no-op. + // + // Due to concurrency, the current implementation may close a reader which was + // used between when the list of idle readers has been computed and now. This is + // an edge case we're willing to accept, to not further complicate the logic. + if err := r.reader.Close(); err != nil { + level.Warn(p.logger).Log("msg", "failed to close idle index-header reader", "err", err) + } + + // Update the used timestamp so that we'll not call Close() again until the next + // idle timeout is hit. + r.usedAt.Store(time.Now().UnixNano()) + } +} + +func (p *ReaderPool) getIdleReaders() []*readerTracker { + p.readersMx.Lock() + defer p.readersMx.Unlock() + + var idle []*readerTracker + threshold := time.Now().Add(-p.lazyReaderIdleTimeout).UnixNano() + + for r := range p.readers { + if r.usedAt.Load() < threshold { + idle = append(idle, r) + } + } + + return idle +} + +func (p *ReaderPool) isTracking(r *readerTracker) bool { + p.readersMx.Lock() + defer p.readersMx.Unlock() + + _, ok := p.readers[r] + return ok +} + +func (p *ReaderPool) onReaderClosed(r *readerTracker) { + p.readersMx.Lock() + defer p.readersMx.Unlock() + + // When this function is called, it means the reader has been closed NOT because it was idle + // but because the consumer closed it. By contract, a reader closed by the consumer can't + // be used anymore, so we can automatically remove it from the pool.
+ delete(p.readers, r) +} + +type readerTracker struct { + reader Reader + pool *ReaderPool + usedAt *atomic.Int64 +} + +func (r *readerTracker) Close() error { + r.pool.onReaderClosed(r) + return r.reader.Close() +} + +func (r *readerTracker) IndexVersion() (int, error) { + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.IndexVersion() +} + +func (r *readerTracker) PostingsOffset(name string, value string) (index.Range, error) { + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.PostingsOffset(name, value) +} + +func (r *readerTracker) LookupSymbol(o uint32) (string, error) { + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.LookupSymbol(o) +} + +func (r *readerTracker) LabelValues(name string) ([]string, error) { + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.LabelValues(name) +} + +func (r *readerTracker) LabelNames() ([]string, error) { + r.usedAt.Store(time.Now().UnixNano()) + return r.reader.LabelNames() +} diff --git a/pkg/block/indexheader/reader_pool_test.go b/pkg/block/indexheader/reader_pool_test.go new file mode 100644 index 0000000000..c3d8962a2c --- /dev/null +++ b/pkg/block/indexheader/reader_pool_test.go @@ -0,0 +1,125 @@ +package indexheader + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/prometheus/prometheus/pkg/labels" + + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/objstore/filesystem" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/pkg/testutil/e2eutil" +) + +func TestReaderPool_NewBinaryReader(t *testing.T) { + tests := map[string]struct { + lazyReaderEnabled bool + lazyReaderIdleTimeout time.Duration + }{ + "lazy reader is disabled": { + lazyReaderEnabled: false, + }, + "lazy reader is enabled but close on idle timeout is disabled": { + lazyReaderEnabled: true, + lazyReaderIdleTimeout: 0, + }, + "lazy reader and close on idle timeout are both enabled": { + lazyReaderEnabled: true, + lazyReaderIdleTimeout: time.Minute, + }, + } + + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + // Create block. + blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout) + defer pool.Close() + + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, r.Close()) }() + + // Ensure it can read data. 
+ labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + }) + } +} + +func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { + const idleTimeout = time.Second + + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + // Create block. + blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + + pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout) + defer pool.Close() + + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, r.Close()) }() + + // Ensure it can read data. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Assert(t, r.(*readerTracker).reader.(*LazyBinaryReader).reader != nil) + + // Wait enough time before checking it. + time.Sleep(idleTimeout * 2) + + // We expect the reader has been closed, but not released from the pool. + testutil.Assert(t, r.(*readerTracker).reader.(*LazyBinaryReader).reader == nil) + testutil.Assert(t, pool.isTracking(r.(*readerTracker))) + + // Ensure it can still read data (will be re-opened). + labelNames, err = r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Assert(t, r.(*readerTracker).reader.(*LazyBinaryReader).reader != nil) + testutil.Assert(t, pool.isTracking(r.(*readerTracker))) + + // We expect an explicit call to Close() to close the reader and release it from the pool too. + testutil.Ok(t, r.Close()) + testutil.Assert(t, r.(*readerTracker).reader.(*LazyBinaryReader).reader == nil) + testutil.Assert(t, !pool.isTracking(r.(*readerTracker))) +} diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 414f23693f..8e2f06f604 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -247,13 +247,14 @@ type FilterConfig struct { // BucketStore implements the store API backed by a bucket. It loads all index // files to local disk. type BucketStore struct { - logger log.Logger - metrics *bucketStoreMetrics - bkt objstore.InstrumentedBucketReader - fetcher block.MetadataFetcher - dir string - indexCache storecache.IndexCache - chunkPool pool.BytesPool + logger log.Logger + metrics *bucketStoreMetrics + bkt objstore.InstrumentedBucketReader + fetcher block.MetadataFetcher + dir string + indexCache storecache.IndexCache + indexReaderPool *indexheader.ReaderPool + chunkPool pool.BytesPool // Sets of blocks that have the same labels. They are indexed by a hash over their label set. mtx sync.RWMutex @@ -305,6 +306,8 @@ func NewBucketStore( enablePostingsCompression bool, postingOffsetsInMemSampling int, enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility. 
+ lazyIndexReaderEnabled bool, + lazyIndexReaderIdleTimeout time.Duration, ) (*BucketStore, error) { if logger == nil { logger = log.NewNopLogger() @@ -321,6 +324,7 @@ func NewBucketStore( fetcher: fetcher, dir: dir, indexCache: indexCache, + indexReaderPool: indexheader.NewReaderPool(logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout), chunkPool: chunkPool, blocks: map[ulid.ULID]*bucketBlock{}, blockSets: map[uint64]*bucketBlockSet{}, @@ -352,6 +356,8 @@ func (s *BucketStore) Close() (err error) { for _, b := range s.blocks { runutil.CloseWithErrCapture(&err, b, "closing Bucket Block") } + + s.indexReaderPool.Close() return err } @@ -484,8 +490,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er lset := labels.FromMap(meta.Thanos.Labels) h := lset.Hash() - // TODO enable it conditionally (via hidden CLI flag) - indexHeaderReader, err := indexheader.NewLazyBinaryReader( + indexHeaderReader, err := s.indexReaderPool.NewBinaryReader( ctx, s.logger, s.bkt, diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 6f01f89635..a92cfbdd5c 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -171,6 +171,8 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m true, DefaultPostingOffsetInMemorySampling, true, + true, + time.Minute, ) testutil.Ok(t, err) s.store = store diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index b692e6067b..ffad133bbd 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -581,6 +581,8 @@ func TestBucketStore_Info(t *testing.T) { true, DefaultPostingOffsetInMemorySampling, false, + false, + 0, ) testutil.Ok(t, err) @@ -830,6 +832,8 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul true, DefaultPostingOffsetInMemorySampling, false, + false, + 0, ) testutil.Ok(t, err) @@ -1257,10 +1261,11 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer } store := &BucketStore{ - bkt: objstore.WithNoopInstr(bkt), - logger: logger, - indexCache: noopCache{}, - metrics: newBucketStoreMetrics(nil), + bkt: objstore.WithNoopInstr(bkt), + logger: logger, + indexCache: noopCache{}, + indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0), + metrics: newBucketStoreMetrics(nil), blockSets: map[uint64]*bucketBlockSet{ labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{blocks}}, }, @@ -1468,10 +1473,11 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { } store := &BucketStore{ - bkt: objstore.WithNoopInstr(bkt), - logger: logger, - indexCache: indexCache, - metrics: newBucketStoreMetrics(nil), + bkt: objstore.WithNoopInstr(bkt), + logger: logger, + indexCache: indexCache, + indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0), + metrics: newBucketStoreMetrics(nil), blockSets: map[uint64]*bucketBlockSet{ labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{{b1, b2}}}, }, @@ -1607,6 +1613,8 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { true, DefaultPostingOffsetInMemorySampling, true, + false, + 0, ) testutil.Ok(tb, err) testutil.Ok(tb, store.SyncBlocks(context.Background())) @@ -1716,6 +1724,8 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { true, DefaultPostingOffsetInMemorySampling, true, + false, + 0, ) testutil.Ok(tb, err) testutil.Ok(tb, store.SyncBlocks(context.Background())) @@ -1949,6 +1959,8 @@ func TestBlockWithLargeChunks(t *testing.T) 
{ true, DefaultPostingOffsetInMemorySampling, true, + false, + 0, ) testutil.Ok(t, err) testutil.Ok(t, store.SyncBlocks(context.Background())) From dae657045f3b80296b04bc6279e4f0fd9eef86e2 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 11 Nov 2020 17:36:18 +0100 Subject: [PATCH 03/12] Renamed CLI flags Signed-off-by: Marco Pracucci --- cmd/thanos/store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 408d815361..524e9b60c4 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -107,10 +107,10 @@ func registerStore(app *extkingpin.App) { "Default is 24h, half of the default value for --delete-delay on compactor."). Default("24h")) - lazyIndexReaderEnabled := cmd.Flag("store.enable-index-header-lazy-loading", "If true, Store Gateway will lazy memory map index-header only once required by a query."). + lazyIndexReaderEnabled := cmd.Flag("store.enable-index-header-lazy-reader", "If true, Store Gateway will lazy memory map index-header only once required by a query."). Hidden().Default("false").Bool() - lazyIndexReaderIdleTimeout := cmd.Flag("store.lazy-index-header-idle-timeout", "If index-header lazy loading is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity."). + lazyIndexReaderIdleTimeout := cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity."). Hidden().Default("5m").Duration() webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String() From ad2d3bb7b95ce7dbc553bccb45b8ac7136bdbd21 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 11 Nov 2020 17:37:32 +0100 Subject: [PATCH 04/12] Added copyright to new files Signed-off-by: Marco Pracucci --- pkg/block/indexheader/lazy_binary_reader.go | 3 +++ pkg/block/indexheader/lazy_binary_reader_test.go | 3 +++ pkg/block/indexheader/reader_pool.go | 3 +++ pkg/block/indexheader/reader_pool_test.go | 3 +++ 4 files changed, 12 insertions(+) diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index cf26c78f1c..0cceb1b2bf 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package indexheader import ( diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go index 5391733085..152d3e0798 100644 --- a/pkg/block/indexheader/lazy_binary_reader_test.go +++ b/pkg/block/indexheader/lazy_binary_reader_test.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package indexheader import ( diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go index 89e9cd58f8..03127787f8 100644 --- a/pkg/block/indexheader/reader_pool.go +++ b/pkg/block/indexheader/reader_pool.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ package indexheader import ( diff --git a/pkg/block/indexheader/reader_pool_test.go b/pkg/block/indexheader/reader_pool_test.go index c3d8962a2c..83e6fdc4f7 100644 --- a/pkg/block/indexheader/reader_pool_test.go +++ b/pkg/block/indexheader/reader_pool_test.go @@ -1,3 +1,6 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + package indexheader import ( From f4f51b1c24c6dd149e91b5c579b48c6bca08c905 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 12 Nov 2020 10:26:59 +0100 Subject: [PATCH 05/12] Track metrics in the lazy index-header reader Signed-off-by: Marco Pracucci --- pkg/block/indexheader/header_test.go | 2 +- pkg/block/indexheader/lazy_binary_reader.go | 54 ++++++++++++++++++- .../indexheader/lazy_binary_reader_test.go | 34 ++++++++++-- pkg/block/indexheader/reader_pool.go | 10 ++-- pkg/block/indexheader/reader_pool_test.go | 17 +++--- pkg/store/bucket.go | 3 +- pkg/store/bucket_test.go | 4 +- 7 files changed, 105 insertions(+), 19 deletions(-) diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index e6f1d4fd5c..1d6197f4be 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -173,7 +173,7 @@ func TestReaders(t *testing.T) { fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) testutil.Ok(t, WriteBinary(ctx, bkt, id, fn)) - br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3) + br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil)) testutil.Ok(t, err) defer func() { testutil.Ok(t, br.Close()) }() diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index 0cceb1b2bf..649dccc46a 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -14,12 +14,48 @@ import ( "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/tsdb/index" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/objstore" ) +type LazyBinaryReaderMetrics struct { + loadCount prometheus.Counter + loadFailedCount prometheus.Counter + unloadCount prometheus.Counter + unloadFailedCount prometheus.Counter + loadDuration prometheus.Histogram +} + +func NewLazyBinaryReaderMetrics(reg prometheus.Registerer) *LazyBinaryReaderMetrics { + return &LazyBinaryReaderMetrics{ + loadCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "binary_reader_lazy_load_total", + Help: "Total number of index-header lazy load operations.", + }), + loadFailedCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "binary_reader_lazy_load_failed_total", + Help: "Total number of failed index-header lazy load operations.", + }), + unloadCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "binary_reader_lazy_unload_total", + Help: "Total number of index-header lazy unload operations.", + }), + unloadFailedCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "binary_reader_lazy_unload_failed_total", + Help: "Total number of failed index-header lazy unload operations.", + }), + loadDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "binary_reader_lazy_load_duration_seconds", + Help: "Duration of the index-header lazy loading in seconds.", + Buckets: []float64{0.01, 
0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5}, + }), + } +} + type LazyBinaryReader struct { ctx context.Context logger log.Logger @@ -28,13 +64,22 @@ type LazyBinaryReader struct { filepath string id ulid.ULID postingOffsetsInMemSampling int + metrics *LazyBinaryReaderMetrics readerMx sync.RWMutex reader *BinaryReader readerErr error } -func NewLazyBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (*LazyBinaryReader, error) { +func NewLazyBinaryReader( + ctx context.Context, + logger log.Logger, + bkt objstore.BucketReader, + dir string, + id ulid.ULID, + postingOffsetsInMemSampling int, + metrics *LazyBinaryReaderMetrics, +) (*LazyBinaryReader, error) { filepath := filepath.Join(dir, id.String(), block.IndexHeaderFilename) // If the index-header doesn't exist we should download it. @@ -61,6 +106,7 @@ func NewLazyBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.Bu filepath: filepath, id: id, postingOffsetsInMemSampling: postingOffsetsInMemSampling, + metrics: metrics, }, nil } @@ -72,11 +118,14 @@ func (r *LazyBinaryReader) Close() error { return nil } + r.metrics.unloadCount.Inc() if err := r.reader.Close(); err != nil { + r.metrics.unloadFailedCount.Inc() return err } r.reader = nil + return nil } @@ -160,17 +209,20 @@ func (r *LazyBinaryReader) open() error { } level.Debug(r.logger).Log("msg", "lazy loading index-header file", "path", r.filepath) + r.metrics.loadCount.Inc() startTime := time.Now() reader, err := NewBinaryReader(r.ctx, r.logger, r.bkt, r.dir, r.id, r.postingOffsetsInMemSampling) if err != nil { level.Error(r.logger).Log("msg", "failed to lazy load index-header file", "path", r.filepath, "err", err) + r.metrics.loadFailedCount.Inc() r.readerErr = err return err } r.reader = reader level.Info(r.logger).Log("msg", "lazy loaded index-header file", "path", r.filepath, "elapsed", time.Since(startTime)) + r.metrics.loadDuration.Observe(time.Since(startTime).Seconds()) return nil } diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go index 152d3e0798..2ceb0c4a1e 100644 --- a/pkg/block/indexheader/lazy_binary_reader_test.go +++ b/pkg/block/indexheader/lazy_binary_reader_test.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/kit/log" "github.com/oklog/ulid" + promtestutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/block" @@ -31,7 +32,7 @@ func TestNewLazyBinaryReader_ShouldFailIfUnableToBuildIndexHeader(t *testing.T) testutil.Ok(t, err) defer func() { testutil.Ok(t, bkt.Close()) }() - _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3) + _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil)) testutil.NotOk(t, err) } @@ -54,19 +55,26 @@ func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + m := NewLazyBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m) testutil.Ok(t, err) testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) 
// Should lazy load the index upon first usage. v, err := r.IndexVersion() testutil.Ok(t, err) testutil.Equals(t, 2, v) testutil.Assert(t, r.reader != nil) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) labelNames, err := r.LabelNames() testutil.Ok(t, err) testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) } func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { @@ -90,16 +98,23 @@ func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { // Write a corrupted index-header for the block. headerFilename := filepath.Join(tmpDir, blockID.String(), block.IndexHeaderFilename) - ioutil.WriteFile(headerFilename, []byte("xxx"), os.ModePerm) + testutil.Ok(t, ioutil.WriteFile(headerFilename, []byte("xxx"), os.ModePerm)) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + m := NewLazyBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m) testutil.Ok(t, err) testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) // Ensure it can read data. labelNames, err := r.LabelNames() testutil.Ok(t, err) testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) } func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { @@ -121,7 +136,8 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + m := NewLazyBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m) testutil.Ok(t, err) testutil.Assert(t, r.reader == nil) @@ -129,18 +145,26 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { labelNames, err := r.LabelNames() testutil.Ok(t, err) testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) // Close it. testutil.Ok(t, r.Close()) testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) // Should lazy load again upon next usage. labelNames, err = r.LabelNames() testutil.Ok(t, err) testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) // Closing an already closed lazy reader should be a no-op. 
for i := 0; i < 2; i++ { testutil.Ok(t, r.Close()) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) } } diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go index 03127787f8..6e6c881293 100644 --- a/pkg/block/indexheader/reader_pool.go +++ b/pkg/block/indexheader/reader_pool.go @@ -11,15 +11,18 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/tsdb/index" "go.uber.org/atomic" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/objstore" ) type ReaderPool struct { lazyReaderEnabled bool lazyReaderIdleTimeout time.Duration + lazyReaderMetrics *LazyBinaryReaderMetrics logger log.Logger // Channel used to signal once the pool is closing. @@ -30,11 +33,12 @@ type ReaderPool struct { readers map[*readerTracker]struct{} } -func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration) *ReaderPool { +func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, reg prometheus.Registerer) *ReaderPool { p := &ReaderPool{ logger: logger, lazyReaderEnabled: lazyReaderEnabled, lazyReaderIdleTimeout: lazyReaderIdleTimeout, + lazyReaderMetrics: NewLazyBinaryReaderMetrics(extprom.WrapRegistererWithPrefix("indexheader_pool_", reg)), readers: make(map[*readerTracker]struct{}), close: make(chan struct{}), } @@ -63,7 +67,7 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt var err error if p.lazyReaderEnabled { - reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling) + reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.lazyReaderMetrics) } else { reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling) } @@ -89,7 +93,7 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt } // Close the pool and stop checking for idle readers. No reader tracked by this pool -// will be closed. It's the caller responsability to close readers. +// will be closed. It's the caller responsibility to close readers. 
func (p *ReaderPool) Close() { close(p.close) } diff --git a/pkg/block/indexheader/reader_pool_test.go b/pkg/block/indexheader/reader_pool_test.go index 83e6fdc4f7..cd47906d7a 100644 --- a/pkg/block/indexheader/reader_pool_test.go +++ b/pkg/block/indexheader/reader_pool_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/go-kit/kit/log" + promtestutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/block" @@ -58,7 +59,7 @@ func TestReaderPool_NewBinaryReader(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout) + pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, nil) defer pool.Close() r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) @@ -94,7 +95,7 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) - pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout) + pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, nil) defer pool.Close() r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) @@ -105,24 +106,28 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { labelNames, err := r.LabelNames() testutil.Ok(t, err) testutil.Equals(t, []string{"a"}, labelNames) - testutil.Assert(t, r.(*readerTracker).reader.(*LazyBinaryReader).reader != nil) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) // Wait enough time before checking it. time.Sleep(idleTimeout * 2) // We expect the reader has been closed, but not released from the pool. - testutil.Assert(t, r.(*readerTracker).reader.(*LazyBinaryReader).reader == nil) testutil.Assert(t, pool.isTracking(r.(*readerTracker))) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) // Ensure it can still read data (will be re-opened). labelNames, err = r.LabelNames() testutil.Ok(t, err) testutil.Equals(t, []string{"a"}, labelNames) - testutil.Assert(t, r.(*readerTracker).reader.(*LazyBinaryReader).reader != nil) testutil.Assert(t, pool.isTracking(r.(*readerTracker))) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) // We expect an explicit call to Close() to close the reader and release it from the pool too. 
testutil.Ok(t, r.Close()) - testutil.Assert(t, r.(*readerTracker).reader.(*LazyBinaryReader).reader == nil) testutil.Assert(t, !pool.isTracking(r.(*readerTracker))) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 8e2f06f604..6bcead6b9b 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -40,6 +40,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" @@ -324,7 +325,7 @@ func NewBucketStore( fetcher: fetcher, dir: dir, indexCache: indexCache, - indexReaderPool: indexheader.NewReaderPool(logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout), + indexReaderPool: indexheader.NewReaderPool(logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, extprom.WrapRegistererWithPrefix("thanos_bucket_store_", reg)), chunkPool: chunkPool, blocks: map[ulid.ULID]*bucketBlock{}, blockSets: map[uint64]*bucketBlockSet{}, diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index ffad133bbd..989e0ea596 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1264,7 +1264,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer bkt: objstore.WithNoopInstr(bkt), logger: logger, indexCache: noopCache{}, - indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0), + indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, nil), metrics: newBucketStoreMetrics(nil), blockSets: map[uint64]*bucketBlockSet{ labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{blocks}}, @@ -1476,7 +1476,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { bkt: objstore.WithNoopInstr(bkt), logger: logger, indexCache: indexCache, - indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0), + indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, nil), metrics: newBucketStoreMetrics(nil), blockSets: map[uint64]*bucketBlockSet{ labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{{b1, b2}}}, From b4be3548504da1063257db76d1a027f3379a53eb Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 12 Nov 2020 10:34:08 +0100 Subject: [PATCH 06/12] Ensure BucketStore.Close() is called in tests Signed-off-by: Marco Pracucci --- pkg/store/bucket_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 989e0ea596..cdbf88d916 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -585,6 +585,7 @@ func TestBucketStore_Info(t *testing.T) { 0, ) testutil.Ok(t, err) + defer func() { testutil.Ok(t, bucketStore.Close()) }() resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{}) testutil.Ok(t, err) @@ -836,6 +837,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul 0, ) testutil.Ok(t, err) + defer func() { testutil.Ok(t, bucketStore.Close()) }() testutil.Ok(t, bucketStore.InitialSync(context.Background())) @@ -1617,6 +1619,8 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { 0, ) testutil.Ok(tb, err) + defer func() { testutil.Ok(t, store.Close()) }() + 
testutil.Ok(tb, store.SyncBlocks(context.Background())) testCases := []*storetestutil.SeriesCase{ @@ -1728,6 +1732,8 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { 0, ) testutil.Ok(tb, err) + defer func() { testutil.Ok(t, store.Close()) }() + testutil.Ok(tb, store.SyncBlocks(context.Background())) // Create a request with invalid hints (uses response hints instead of request hints). @@ -1963,6 +1969,8 @@ func TestBlockWithLargeChunks(t *testing.T) { 0, ) testutil.Ok(t, err) + defer func() { testutil.Ok(t, store.Close()) }() + testutil.Ok(t, store.SyncBlocks(context.Background())) req := &storepb.SeriesRequest{ From 2e7658addd87034c6d17fc42dc49a6659902b195 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 12 Nov 2020 10:34:36 +0100 Subject: [PATCH 07/12] Ensure BucketStore.Close() is called in e2e tests too Signed-off-by: Marco Pracucci --- pkg/store/bucket_e2e_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index a92cfbdd5c..afe359eed4 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -175,6 +175,8 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m time.Minute, ) testutil.Ok(t, err) + defer func() { testutil.Ok(t, store.Close()) }() + s.store = store if manyParts { From 79101736eeb3f6c9a8ead80c3a4d2467cf8c7b7c Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 12 Nov 2020 12:52:02 +0100 Subject: [PATCH 08/12] Shorten metric names Signed-off-by: Marco Pracucci --- pkg/block/indexheader/lazy_binary_reader.go | 10 +++++----- pkg/block/indexheader/reader_pool.go | 3 +-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index 649dccc46a..e2aa9357d3 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -33,23 +33,23 @@ type LazyBinaryReaderMetrics struct { func NewLazyBinaryReaderMetrics(reg prometheus.Registerer) *LazyBinaryReaderMetrics { return &LazyBinaryReaderMetrics{ loadCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "binary_reader_lazy_load_total", + Name: "indexheader_lazy_load_total", Help: "Total number of index-header lazy load operations.", }), loadFailedCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "binary_reader_lazy_load_failed_total", + Name: "indexheader_lazy_load_failed_total", Help: "Total number of failed index-header lazy load operations.", }), unloadCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "binary_reader_lazy_unload_total", + Name: "indexheader_lazy_unload_total", Help: "Total number of index-header lazy unload operations.", }), unloadFailedCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "binary_reader_lazy_unload_failed_total", + Name: "indexheader_lazy_unload_failed_total", Help: "Total number of failed index-header lazy unload operations.", }), loadDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Name: "binary_reader_lazy_load_duration_seconds", + Name: "indexheader_lazy_load_duration_seconds", Help: "Duration of the index-header lazy loading in seconds.", Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5}, }), diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go index 6e6c881293..35d7437f9b 100644 --- a/pkg/block/indexheader/reader_pool.go +++ b/pkg/block/indexheader/reader_pool.go @@ -15,7 
+15,6 @@ import ( "github.com/prometheus/prometheus/tsdb/index" "go.uber.org/atomic" - "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/objstore" ) @@ -38,7 +37,7 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime logger: logger, lazyReaderEnabled: lazyReaderEnabled, lazyReaderIdleTimeout: lazyReaderIdleTimeout, - lazyReaderMetrics: NewLazyBinaryReaderMetrics(extprom.WrapRegistererWithPrefix("indexheader_pool_", reg)), + lazyReaderMetrics: NewLazyBinaryReaderMetrics(reg), readers: make(map[*readerTracker]struct{}), close: make(chan struct{}), } From f3f0b42dfecf3f74e37f46c7c3ac169612efb12b Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 12 Nov 2020 13:08:04 +0100 Subject: [PATCH 09/12] Added CHANGELOG entry Signed-off-by: Marco Pracucci --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index dd6cd80fcd..fae234fb92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#3277](https://github.com/thanos-io/thanos/pull/3277) Thanos Query: Introduce dynamic lookback interval. This allows queries with large step to make use of downsampled data. - [#3409](https://github.com/thanos-io/thanos/pull/3409) Compactor: Added support for no-compact-mark.json which excludes the block from compaction. - [#3245](https://github.com/thanos-io/thanos/pull/3245) Query Frontend: Add `query-frontend.org-id-header` flag to specify HTTP header(s) to populate slow query log (e.g. X-Grafana-User). +- [#3431](https://github.com/thanos-io/thanos/pull/3431) Store: Added experimental support to lazy load index-headers at query time. When enabled via `--store.enable-index-header-lazy-reader=true` flag, the store-gateway will load into memory an index-header only once required at query time and will be automatically released after `--store.index-header-lazy-reader-idle-timeout` of inactivity. ### Fixed From bc167bd4b6189101828e95cf6c981eda4ae2ba27 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 12 Nov 2020 17:26:34 +0100 Subject: [PATCH 10/12] Addressed review comments Signed-off-by: Marco Pracucci --- CHANGELOG.md | 3 ++- cmd/thanos/store.go | 4 ++-- docs/components/store.md | 4 ++++ pkg/block/indexheader/lazy_binary_reader.go | 26 +++++++++++++++++---- pkg/block/indexheader/reader_pool.go | 14 +++++++++++ 5 files changed, 43 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fae234fb92..db7aab31dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#3277](https://github.com/thanos-io/thanos/pull/3277) Thanos Query: Introduce dynamic lookback interval. This allows queries with large step to make use of downsampled data. - [#3409](https://github.com/thanos-io/thanos/pull/3409) Compactor: Added support for no-compact-mark.json which excludes the block from compaction. - [#3245](https://github.com/thanos-io/thanos/pull/3245) Query Frontend: Add `query-frontend.org-id-header` flag to specify HTTP header(s) to populate slow query log (e.g. X-Grafana-User). -- [#3431](https://github.com/thanos-io/thanos/pull/3431) Store: Added experimental support to lazy load index-headers at query time. 
When enabled via `--store.enable-index-header-lazy-reader=true` flag, the store-gateway will load into memory an index-header only once required at query time and will be automatically released after `--store.index-header-lazy-reader-idle-timeout` of inactivity. +- [#3431](https://github.com/thanos-io/thanos/pull/3431) Store: Added experimental support to lazy load index-headers at query time. When enabled via `--store.enable-index-header-lazy-reader` flag, the store-gateway will load into memory an index-header only once it's required at query time. The index-header will be automatically released after `--store.index-header-lazy-reader-idle-timeout` of inactivity. + * This generally reduces the baseline memory usage of the store when inactive, as well as the total number of mapped files (which is limited to 64k on some systems). ### Fixed diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 524e9b60c4..352d660414 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -107,8 +107,8 @@ func registerStore(app *extkingpin.App) { "Default is 24h, half of the default value for --delete-delay on compactor."). Default("24h")) - lazyIndexReaderEnabled := cmd.Flag("store.enable-index-header-lazy-reader", "If true, Store Gateway will lazy memory map index-header only once required by a query."). - Hidden().Default("false").Bool() + lazyIndexReaderEnabled := cmd.Flag("store.enable-index-header-lazy-reader", "If true, Store Gateway will lazy memory map index-header only once the block is required by a query."). + Default("false").Bool() lazyIndexReaderIdleTimeout := cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity."). Hidden().Default("5m").Duration() diff --git a/docs/components/store.md b/docs/components/store.md index 938ca623e6..2ff77a9225 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -171,6 +171,10 @@ Flags: before being deleted from bucket. Default is 24h, half of the default value for --delete-delay on compactor. + --store.enable-index-header-lazy-reader + If true, Store Gateway will lazy memory map + index-header only once the block is required by + a query. --web.external-prefix="" Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index e2aa9357d3..12b5ea7e5f 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -22,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" ) +// LazyBinaryReaderMetrics holds metrics tracked by LazyBinaryReader. type LazyBinaryReaderMetrics struct { loadCount prometheus.Counter loadFailedCount prometheus.Counter @@ -30,6 +31,7 @@ type LazyBinaryReaderMetrics struct { loadDuration prometheus.Histogram } +// NewLazyBinaryReaderMetrics makes a new LazyBinaryReaderMetrics. func NewLazyBinaryReaderMetrics(reg prometheus.Registerer) *LazyBinaryReaderMetrics { return &LazyBinaryReaderMetrics{ loadCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ @@ -56,6 +58,8 @@ func NewLazyBinaryReaderMetrics(reg prometheus.Registerer) *LazyBinaryReaderMetr } } +// LazyBinaryReader wraps BinaryReader and loads (mmap) the index-header only upon +// the first call to any Reader function.
type LazyBinaryReader struct { ctx context.Context logger log.Logger @@ -71,6 +75,10 @@ type LazyBinaryReader struct { readerErr error } +// NewLazyBinaryReader makes a new LazyBinaryReader. If the index-header does not exist +// on the local disk at dir location, this function will build it by downloading the required +// sections from the full index stored in the bucket. However, this function doesn't load +// (mmap) the index-header; it will be loaded on the first Reader function call. func NewLazyBinaryReader( ctx context.Context, logger log.Logger, @@ -110,6 +118,8 @@ func NewLazyBinaryReader( }, nil } +// Close implements Reader. It unloads the index-header from memory (releasing the mmap +// area), but a subsequent call to any other Reader function will automatically reload it. func (r *LazyBinaryReader) Close() error { r.readerMx.Lock() defer r.readerMx.Unlock() @@ -129,6 +139,7 @@ func (r *LazyBinaryReader) Close() error { return nil } +// IndexVersion implements Reader. func (r *LazyBinaryReader) IndexVersion() (int, error) { r.readerMx.RLock() defer r.readerMx.RUnlock() @@ -140,6 +151,7 @@ func (r *LazyBinaryReader) IndexVersion() (int, error) { return r.reader.IndexVersion() } +// PostingsOffset implements Reader. func (r *LazyBinaryReader) PostingsOffset(name string, value string) (index.Range, error) { r.readerMx.RLock() defer r.readerMx.RUnlock() @@ -151,6 +163,7 @@ func (r *LazyBinaryReader) PostingsOffset(name string, value string) (index.Rang return r.reader.PostingsOffset(name, value) } +// LookupSymbol implements Reader. func (r *LazyBinaryReader) LookupSymbol(o uint32) (string, error) { r.readerMx.RLock() defer r.readerMx.RUnlock() @@ -162,6 +175,7 @@ func (r *LazyBinaryReader) LookupSymbol(o uint32) (string, error) { return r.reader.LookupSymbol(o) } +// LabelValues implements Reader. func (r *LazyBinaryReader) LabelValues(name string) ([]string, error) { r.readerMx.RLock() defer r.readerMx.RUnlock() @@ -173,6 +187,7 @@ func (r *LazyBinaryReader) LabelValues(name string) ([]string, error) { return r.reader.LabelValues(name) } +// LabelNames implements Reader. func (r *LazyBinaryReader) LabelNames() ([]string, error) { r.readerMx.RLock() defer r.readerMx.RUnlock() @@ -190,7 +205,8 @@ func (r *LazyBinaryReader) open() error { // Nothing to do if we already tried opening it. if r.reader != nil { return nil - } else if r.readerErr != nil { + } + if r.readerErr != nil { return r.readerErr } @@ -204,7 +220,8 @@ func (r *LazyBinaryReader) open() error { // Ensure none else tried to open it in the meanwhile.
if r.reader != nil { return nil - } else if r.readerErr != nil { + } + if r.readerErr != nil { return r.readerErr } @@ -214,14 +231,13 @@ func (r *LazyBinaryReader) open() error { reader, err := NewBinaryReader(r.ctx, r.logger, r.bkt, r.dir, r.id, r.postingOffsetsInMemSampling) if err != nil { - level.Error(r.logger).Log("msg", "failed to lazy load index-header file", "path", r.filepath, "err", err) r.metrics.loadFailedCount.Inc() r.readerErr = err - return err + return errors.Wrapf(err, "lazy load index-header file at %s", r.filepath) } r.reader = reader - level.Info(r.logger).Log("msg", "lazy loaded index-header file", "path", r.filepath, "elapsed", time.Since(startTime)) + level.Debug(r.logger).Log("msg", "lazy loaded index-header file", "path", r.filepath, "elapsed", time.Since(startTime)) r.metrics.loadDuration.Observe(time.Since(startTime).Seconds()) return nil diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go index 35d7437f9b..e0b63aad08 100644 --- a/pkg/block/indexheader/reader_pool.go +++ b/pkg/block/indexheader/reader_pool.go @@ -18,6 +18,10 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" ) +// ReaderPool is used to instantiate new index-header readers and keep track of them. +// When the lazy reader is enabled, the pool keeps track of all instantiated readers +// and automatically closes them once the idle timeout is reached. A closed lazy reader +// will be automatically re-opened upon next usage. type ReaderPool struct { lazyReaderEnabled bool lazyReaderIdleTimeout time.Duration @@ -32,6 +36,7 @@ type ReaderPool struct { readers map[*readerTracker]struct{} } +// NewReaderPool makes a new ReaderPool. func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, reg prometheus.Registerer) *ReaderPool { p := &ReaderPool{ logger: logger, @@ -61,6 +66,9 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime return p } +// NewBinaryReader creates and returns a new binary reader. If the pool has been configured +// with lazy reader enabled, this function will return a lazy reader. The returned lazy reader +// is tracked by the pool and automatically closed once the idle timeout expires. func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (Reader, error) { var reader Reader var err error @@ -156,31 +164,37 @@ type readerTracker struct { usedAt *atomic.Int64 } +// Close implements Reader. func (r *readerTracker) Close() error { r.pool.onReaderClosed(r) return r.reader.Close() } +// IndexVersion implements Reader. func (r *readerTracker) IndexVersion() (int, error) { r.usedAt.Store(time.Now().UnixNano()) return r.reader.IndexVersion() } +// PostingsOffset implements Reader. func (r *readerTracker) PostingsOffset(name string, value string) (index.Range, error) { r.usedAt.Store(time.Now().UnixNano()) return r.reader.PostingsOffset(name, value) } +// LookupSymbol implements Reader. func (r *readerTracker) LookupSymbol(o uint32) (string, error) { r.usedAt.Store(time.Now().UnixNano()) return r.reader.LookupSymbol(o) } +// LabelValues implements Reader. func (r *readerTracker) LabelValues(name string) ([]string, error) { r.usedAt.Store(time.Now().UnixNano()) return r.reader.LabelValues(name) } +// LabelNames implements Reader.
func (r *readerTracker) LabelNames() ([]string, error) { r.usedAt.Store(time.Now().UnixNano()) return r.reader.LabelNames() From 5c97813055e2cfb3a7856531c284fb5d21cef70a Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 12 Nov 2020 17:46:47 +0100 Subject: [PATCH 11/12] Removed readerTracker Signed-off-by: Marco Pracucci --- pkg/block/indexheader/header_test.go | 2 +- pkg/block/indexheader/lazy_binary_reader.go | 78 ++++++++++----- .../indexheader/lazy_binary_reader_test.go | 8 +- pkg/block/indexheader/reader_pool.go | 98 +++++-------------- pkg/block/indexheader/reader_pool_test.go | 6 +- 5 files changed, 84 insertions(+), 108 deletions(-) diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 1d6197f4be..2614cc9d55 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -173,7 +173,7 @@ func TestReaders(t *testing.T) { fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) testutil.Ok(t, WriteBinary(ctx, bkt, id, fn)) - br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil)) + br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), nil) testutil.Ok(t, err) defer func() { testutil.Ok(t, br.Close()) }() diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index 12b5ea7e5f..5ffed3d16d 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/tsdb/index" + "go.uber.org/atomic" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/objstore" @@ -69,10 +70,14 @@ type LazyBinaryReader struct { id ulid.ULID postingOffsetsInMemSampling int metrics *LazyBinaryReaderMetrics + onClosed func(*LazyBinaryReader) readerMx sync.RWMutex reader *BinaryReader readerErr error + + // Keep track of the last time it was used. + usedAt *atomic.Int64 } // NewLazyBinaryReader makes a new LazyBinaryReader. If the index-header does not exist @@ -87,6 +92,7 @@ func NewLazyBinaryReader( id ulid.ULID, postingOffsetsInMemSampling int, metrics *LazyBinaryReaderMetrics, + onClosed func(*LazyBinaryReader), ) (*LazyBinaryReader, error) { filepath := filepath.Join(dir, id.String(), block.IndexHeaderFilename) @@ -115,28 +121,20 @@ func NewLazyBinaryReader( id: id, postingOffsetsInMemSampling: postingOffsetsInMemSampling, metrics: metrics, + usedAt: atomic.NewInt64(time.Now().UnixNano()), + onClosed: onClosed, }, nil } // Close implements Reader. It unloads the index-header from memory (releasing the mmap // area), but a subsequent call to any other Reader function will automatically reload it. func (r *LazyBinaryReader) Close() error { - r.readerMx.Lock() - defer r.readerMx.Unlock() - - if r.reader == nil { - return nil - } - - r.metrics.unloadCount.Inc() - if err := r.reader.Close(); err != nil { - r.metrics.unloadFailedCount.Inc() - return err + // Invoke the callback after having released the lock. + if r.onClosed != nil { + defer r.onClosed(r) } - r.reader = nil - - return nil + return r.unload() } // IndexVersion implements Reader. 
@@ -144,10 +142,11 @@ func (r *LazyBinaryReader) IndexVersion() (int, error) { r.readerMx.RLock() defer r.readerMx.RUnlock() - if err := r.open(); err != nil { + if err := r.load(); err != nil { return 0, err } + r.usedAt.Store(time.Now().UnixNano()) return r.reader.IndexVersion() } @@ -156,10 +155,11 @@ func (r *LazyBinaryReader) PostingsOffset(name string, value string) (index.Rang r.readerMx.RLock() defer r.readerMx.RUnlock() - if err := r.open(); err != nil { + if err := r.load(); err != nil { return index.Range{}, err } + r.usedAt.Store(time.Now().UnixNano()) return r.reader.PostingsOffset(name, value) } @@ -168,10 +168,11 @@ func (r *LazyBinaryReader) LookupSymbol(o uint32) (string, error) { r.readerMx.RLock() defer r.readerMx.RUnlock() - if err := r.open(); err != nil { + if err := r.load(); err != nil { return "", err } + r.usedAt.Store(time.Now().UnixNano()) return r.reader.LookupSymbol(o) } @@ -180,10 +181,11 @@ func (r *LazyBinaryReader) LabelValues(name string) ([]string, error) { r.readerMx.RLock() defer r.readerMx.RUnlock() - if err := r.open(); err != nil { + if err := r.load(); err != nil { return nil, err } + r.usedAt.Store(time.Now().UnixNano()) return r.reader.LabelValues(name) } @@ -192,17 +194,18 @@ func (r *LazyBinaryReader) LabelNames() ([]string, error) { r.readerMx.RLock() defer r.readerMx.RUnlock() - if err := r.open(); err != nil { + if err := r.load(); err != nil { return nil, err } + r.usedAt.Store(time.Now().UnixNano()) return r.reader.LabelNames() } -// open ensure the underlying binary index-header reader has been successfully opened. Returns +// load ensure the underlying binary index-header reader has been successfully loaded. Returns // an error on failure. This function MUST be called with the read lock already acquired. -func (r *LazyBinaryReader) open() error { - // Nothing to do if we already tried opening it. +func (r *LazyBinaryReader) load() error { + // Nothing to do if we already tried loading it. if r.reader != nil { return nil } @@ -210,14 +213,14 @@ func (r *LazyBinaryReader) open() error { return r.readerErr } - // Take the write lock to ensure we'll try to open it only once. Take again + // Take the write lock to ensure we'll try to load it only once. Take again // the read lock once done. r.readerMx.RUnlock() r.readerMx.Lock() defer r.readerMx.RLock() defer r.readerMx.Unlock() - // Ensure none else tried to open it in the meanwhile. + // Ensure no one else tried to load it in the meantime. if r.reader != nil { return nil } @@ -242,3 +245,30 @@ func (r *LazyBinaryReader) open() error { return nil } + +// unload closes the underlying BinaryReader. Calling this function on an already unloaded reader is a no-op. + // Always update the used timestamp so that the pool will not call unload() again until the next + // idle timeout is hit.
+ r.usedAt.Store(time.Now().UnixNano()) + + r.readerMx.Lock() + defer r.readerMx.Unlock() + + if r.reader == nil { + return nil + } + + r.metrics.unloadCount.Inc() + if err := r.reader.Close(); err != nil { + r.metrics.unloadFailedCount.Inc() + return err + } + + r.reader = nil + return nil +} + +func (r *LazyBinaryReader) lastUsedAt() int64 { + return r.usedAt.Load() +} diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go index 2ceb0c4a1e..07b8883cfd 100644 --- a/pkg/block/indexheader/lazy_binary_reader_test.go +++ b/pkg/block/indexheader/lazy_binary_reader_test.go @@ -32,7 +32,7 @@ func TestNewLazyBinaryReader_ShouldFailIfUnableToBuildIndexHeader(t *testing.T) testutil.Ok(t, err) defer func() { testutil.Ok(t, bkt.Close()) }() - _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil)) + _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), nil) testutil.NotOk(t, err) } @@ -56,7 +56,7 @@ func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) m := NewLazyBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) testutil.Ok(t, err) testutil.Assert(t, r.reader == nil) testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) @@ -101,7 +101,7 @@ func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { testutil.Ok(t, ioutil.WriteFile(headerFilename, []byte("xxx"), os.ModePerm)) m := NewLazyBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) testutil.Ok(t, err) testutil.Assert(t, r.reader == nil) testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) @@ -137,7 +137,7 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) m := NewLazyBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) testutil.Ok(t, err) testutil.Assert(t, r.reader == nil) diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go index e0b63aad08..660ae4853a 100644 --- a/pkg/block/indexheader/reader_pool.go +++ b/pkg/block/indexheader/reader_pool.go @@ -12,8 +12,6 @@ import ( "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/tsdb/index" - "go.uber.org/atomic" "github.com/thanos-io/thanos/pkg/objstore" ) @@ -32,8 +30,8 @@ type ReaderPool struct { close chan struct{} // Keep track of all readers managed by the pool. - readersMx sync.Mutex - readers map[*readerTracker]struct{} + lazyReadersMx sync.Mutex + lazyReaders map[*LazyBinaryReader]struct{} } // NewReaderPool makes a new ReaderPool. 
@@ -43,7 +41,7 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime lazyReaderEnabled: lazyReaderEnabled, lazyReaderIdleTimeout: lazyReaderIdleTimeout, lazyReaderMetrics: NewLazyBinaryReaderMetrics(reg), - readers: make(map[*readerTracker]struct{}), + lazyReaders: make(map[*LazyBinaryReader]struct{}), close: make(chan struct{}), } @@ -74,7 +72,7 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt var err error if p.lazyReaderEnabled { - reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.lazyReaderMetrics) + reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.lazyReaderMetrics, p.onLazyReaderClosed) } else { reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling) } @@ -85,15 +83,9 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt // Keep track of lazy readers only if required. if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 { - reader = &readerTracker{ - reader: reader, - pool: p, - usedAt: atomic.NewInt64(time.Now().UnixNano()), - } - - p.readersMx.Lock() - p.readers[reader.(*readerTracker)] = struct{}{} - p.readersMx.Unlock() + p.lazyReadersMx.Lock() + p.lazyReaders[reader.(*LazyBinaryReader)] = struct{}{} + p.lazyReadersMx.Unlock() } return reader, err @@ -114,25 +106,21 @@ func (p *ReaderPool) closeIdleReaders() { // Due to concurrency, the current implementation may close a reader which was // use between when the list of idle readers has been computed and now. This is // an edge case we're willing to accept, to not further complicate the logic. - if err := r.reader.Close(); err != nil { + if err := r.unload(); err != nil { level.Warn(p.logger).Log("msg", "failed to close idle index-header reader", "err", err) } - - // Update the used timestamp so that we'll not call Close() again until the next - // idle timeout is hit. - r.usedAt.Store(time.Now().UnixNano()) } } -func (p *ReaderPool) getIdleReaders() []*readerTracker { - p.readersMx.Lock() - defer p.readersMx.Unlock() +func (p *ReaderPool) getIdleReaders() []*LazyBinaryReader { + p.lazyReadersMx.Lock() + defer p.lazyReadersMx.Unlock() - var idle []*readerTracker + var idle []*LazyBinaryReader threshold := time.Now().Add(-p.lazyReaderIdleTimeout).UnixNano() - for r := range p.readers { - if r.usedAt.Load() < threshold { + for r := range p.lazyReaders { + if r.lastUsedAt() < threshold { idle = append(idle, r) } } @@ -140,62 +128,20 @@ func (p *ReaderPool) getIdleReaders() []*readerTracker { return idle } -func (p *ReaderPool) isTracking(r *readerTracker) bool { - p.readersMx.Lock() - defer p.readersMx.Unlock() +func (p *ReaderPool) isTracking(r *LazyBinaryReader) bool { + p.lazyReadersMx.Lock() + defer p.lazyReadersMx.Unlock() - _, ok := p.readers[r] + _, ok := p.lazyReaders[r] return ok } -func (p *ReaderPool) onReaderClosed(r *readerTracker) { - p.readersMx.Lock() - defer p.readersMx.Unlock() +func (p *ReaderPool) onLazyReaderClosed(r *LazyBinaryReader) { + p.lazyReadersMx.Lock() + defer p.lazyReadersMx.Unlock() // When this function is called, it means the reader has been closed NOT because was idle // but because the consumer closed it. By contract, a reader closed by the consumer can't // be used anymore, so we can automatically remove it from the pool. - delete(p.readers, r) -} - -type readerTracker struct { - reader Reader - pool *ReaderPool - usedAt *atomic.Int64 -} - -// Close implements Reader. 
-func (r *readerTracker) Close() error { - r.pool.onReaderClosed(r) - return r.reader.Close() -} - -// IndexVersion implements Reader. -func (r *readerTracker) IndexVersion() (int, error) { - r.usedAt.Store(time.Now().UnixNano()) - return r.reader.IndexVersion() -} - -// PostingsOffset implements Reader. -func (r *readerTracker) PostingsOffset(name string, value string) (index.Range, error) { - r.usedAt.Store(time.Now().UnixNano()) - return r.reader.PostingsOffset(name, value) -} - -// LookupSymbol implements Reader. -func (r *readerTracker) LookupSymbol(o uint32) (string, error) { - r.usedAt.Store(time.Now().UnixNano()) - return r.reader.LookupSymbol(o) -} - -// LabelValues implements Reader. -func (r *readerTracker) LabelValues(name string) ([]string, error) { - r.usedAt.Store(time.Now().UnixNano()) - return r.reader.LabelValues(name) -} - -// LabelNames implements Reader. -func (r *readerTracker) LabelNames() ([]string, error) { - r.usedAt.Store(time.Now().UnixNano()) - return r.reader.LabelNames() + delete(p.lazyReaders, r) } diff --git a/pkg/block/indexheader/reader_pool_test.go b/pkg/block/indexheader/reader_pool_test.go index cd47906d7a..74bd319bd9 100644 --- a/pkg/block/indexheader/reader_pool_test.go +++ b/pkg/block/indexheader/reader_pool_test.go @@ -113,7 +113,7 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { time.Sleep(idleTimeout * 2) // We expect the reader has been closed, but not released from the pool. - testutil.Assert(t, pool.isTracking(r.(*readerTracker))) + testutil.Assert(t, pool.isTracking(r.(*LazyBinaryReader))) testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) @@ -121,13 +121,13 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { labelNames, err = r.LabelNames() testutil.Ok(t, err) testutil.Equals(t, []string{"a"}, labelNames) - testutil.Assert(t, pool.isTracking(r.(*readerTracker))) + testutil.Assert(t, pool.isTracking(r.(*LazyBinaryReader))) testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) // We expect an explicit call to Close() to close the reader and release it from the pool too. testutil.Ok(t, r.Close()) - testutil.Assert(t, !pool.isTracking(r.(*readerTracker))) + testutil.Assert(t, !pool.isTracking(r.(*LazyBinaryReader))) testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) } From 5d266ad82712f2e2f78f80dd853f3b12b01efd4c Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 12 Nov 2020 17:55:13 +0100 Subject: [PATCH 12/12] Fixed test and comments Signed-off-by: Marco Pracucci --- pkg/block/indexheader/lazy_binary_reader.go | 3 +-- pkg/store/bucket_test.go | 2 ++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index 5ffed3d16d..e9b9dc20bd 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -129,7 +129,6 @@ func NewLazyBinaryReader( // Close implements Reader. It unloads the index-header from memory (releasing the mmap // area), but a subsequent call to any other Reader function will automatically reload it. 
func (r *LazyBinaryReader) Close() error { - // Invoke the callback after having released the lock. if r.onClosed != nil { defer r.onClosed(r) } @@ -202,7 +201,7 @@ func (r *LazyBinaryReader) LabelNames() ([]string, error) { return r.reader.LabelNames() } -// load ensure the underlying binary index-header reader has been successfully loaded. Returns +// load ensures the underlying binary index-header reader has been successfully loaded. Returns // an error on failure. This function MUST be called with the read lock already acquired. func (r *LazyBinaryReader) load() error { // Nothing to do if we already tried loading it. diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index cdbf88d916..30f7d5dd85 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1822,6 +1822,8 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { true, DefaultPostingOffsetInMemorySampling, true, + false, + 0, ) testutil.Ok(tb, err) testutil.Ok(tb, store.SyncBlocks(context.Background()))
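
Note on the locking pattern introduced in the patches above: LazyBinaryReader guards the mmap-ed BinaryReader with a sync.RWMutex, upgrades to the write lock only for the one-off load, remembers the load error, records the last access in an atomic timestamp, and lets ReaderPool unload readers that have been idle longer than the configured timeout. The following standalone sketch illustrates that pattern in isolation; it does not use the Thanos packages, and every identifier in it (lazyValue, expensiveLoad, unloadIfIdle and so on) is illustrative only, not part of this patch series.

// lazy_reader_sketch.go: standalone illustration, NOT part of this patch series.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// lazyValue builds an expensive resource on first use, remembers the result
// (or the error), and can be unloaded again once idle, mirroring the
// RWMutex-with-write-lock-upgrade pattern used by LazyBinaryReader.
type lazyValue struct {
	mx      sync.RWMutex
	val     *string // stands in for the mmap-ed index-header reader
	loadErr error
	usedAt  atomic.Int64 // Unix nanos of the last access (requires Go 1.19+)
}

func (l *lazyValue) get() (string, error) {
	l.mx.RLock()
	defer l.mx.RUnlock()

	v, err := l.load()
	if err != nil {
		return "", err
	}
	l.usedAt.Store(time.Now().UnixNano())
	return v, nil
}

// load must be called with the read lock held. It temporarily upgrades to the
// write lock so that only one goroutine performs the expensive initialization.
func (l *lazyValue) load() (string, error) {
	if l.val != nil {
		return *l.val, nil
	}
	if l.loadErr != nil {
		return "", l.loadErr
	}

	// Swap the read lock for the write lock. Deferred calls run LIFO, so on
	// return the write lock is released first and the read lock re-acquired
	// for the caller.
	l.mx.RUnlock()
	l.mx.Lock()
	defer l.mx.RLock()
	defer l.mx.Unlock()

	// Double-check: another goroutine may have loaded it while we waited.
	if l.val != nil {
		return *l.val, nil
	}
	if l.loadErr != nil {
		return "", l.loadErr
	}

	v, err := expensiveLoad()
	if err != nil {
		l.loadErr = err
		return "", err
	}
	l.val = &v
	return v, nil
}

// unload drops the resource; the next get() transparently rebuilds it.
func (l *lazyValue) unload() {
	l.mx.Lock()
	defer l.mx.Unlock()
	l.val = nil
}

// unloadIfIdle mimics what ReaderPool does on a ticker: unload anything that
// has not been used within idleTimeout.
func (l *lazyValue) unloadIfIdle(idleTimeout time.Duration) {
	if l.usedAt.Load() < time.Now().Add(-idleTimeout).UnixNano() {
		l.unload()
	}
}

func expensiveLoad() (string, error) {
	time.Sleep(50 * time.Millisecond) // stands in for building/mmap-ing the index-header
	return "index-header", nil
}

func main() {
	v := &lazyValue{}

	s, _ := v.get() // first access triggers the load
	fmt.Println("loaded:", s)

	time.Sleep(200 * time.Millisecond)
	v.unloadIfIdle(100 * time.Millisecond) // idle longer than the timeout: unloaded

	s, _ = v.get() // transparently reloaded on the next access
	fmt.Println("reloaded:", s)
}

The trade-off is the same as in the patch: an unloaded value is rebuilt transparently on the next access, at the cost of the reload latency.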
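
Note on the metric naming: the "Shorten metric names" change above renames the counters to the indexheader_* family, while pkg/store/bucket.go registers them through extprom.WrapRegistererWithPrefix("thanos_bucket_store_", reg), so the exported series end up as thanos_bucket_store_indexheader_lazy_load_total and so on. The short standalone program below shows how such prefix wrapping composes the final series name; it uses client_golang's prometheus.WrapRegistererWithPrefix, which prefixes registered metric names similarly to the extprom helper used in the patch, and is an illustration rather than Thanos code.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// The caller decides the prefix; the metrics constructor only knows the
	// un-prefixed name, as NewLazyBinaryReaderMetrics does above.
	wrapped := prometheus.WrapRegistererWithPrefix("thanos_bucket_store_", reg)

	loadCount := promauto.With(wrapped).NewCounter(prometheus.CounterOpts{
		Name: "indexheader_lazy_load_total",
		Help: "Total number of index-header lazy load operations.",
	})
	loadCount.Inc()

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		// Prints: thanos_bucket_store_indexheader_lazy_load_total 1
		fmt.Println(mf.GetName(), mf.GetMetric()[0].GetCounter().GetValue())
	}
}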