diff --git a/CHANGELOG.md b/CHANGELOG.md
index a7b302e794d00..607f7e7014208 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@

 ##### Enhancements

+* [9621](https://github.com/grafana/loki/pull/9621) **DylanGuedes**: Introduce TSDB postings cache.
 * [10010](https://github.com/grafana/loki/pull/10010) **rasta-rocket**: feat(promtail): retrieve BotTags field from cloudflare
 * [9995](https://github.com/grafana/loki/pull/9995) **chaudum**: Add jitter to the flush interval to prevent multiple ingesters to flush at the same time.
 * [9797](https://github.com/grafana/loki/pull/9797) **chaudum**: Add new `loki_index_gateway_requests_total` counter metric to observe per-tenant RPS
diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md
index fc78f4de82dba..7e6e3753e93a9 100644
--- a/docs/sources/configure/_index.md
+++ b/docs/sources/configure/_index.md
@@ -1978,6 +1978,9 @@ boltdb_shipper:
   # CLI flag: -boltdb.shipper.build-per-tenant-index
   [build_per_tenant_index: <boolean> | default = false]

+# Configures storing index in an Object Store
+# (GCS/S3/Azure/Swift/COS/Filesystem) in a prometheus TSDB-like format. Required
+# fields only required when TSDB is defined in config.
 tsdb_shipper:
   # Directory where ingesters would write index files which would then be
   # uploaded by shipper to configured storage
@@ -2037,6 +2040,11 @@ tsdb_shipper:
   [mode: <string> | default = ""]

   [ingesterdbretainperiod: <duration>]
+
+  # Experimental. Whether TSDB should cache postings or not. The
+  # index-read-cache will be used as the backend.
+  # CLI flag: -tsdb.enable-postings-cache
+  [enable_postings_cache: <boolean> | default = false]
 ```

 ### chunk_store_config
diff --git a/go.mod b/go.mod
index 26b8133808769..4ff91592be05c 100644
--- a/go.mod
+++ b/go.mod
@@ -129,6 +129,7 @@ require (
 	golang.org/x/exp v0.0.0-20230321023759-10a507213a29
 	golang.org/x/oauth2 v0.10.0
 	golang.org/x/text v0.11.0
+	google.golang.org/protobuf v1.31.0
 )

 require (
@@ -309,7 +310,6 @@ require (
 	google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
-	google.golang.org/protobuf v1.31.0 // indirect
 	gopkg.in/fsnotify/fsnotify.v1 v1.4.7 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
diff --git a/integration/loki_micro_services_test.go b/integration/loki_micro_services_test.go
index 0f0a3d58af8e7..dbc991ea359cc 100644
--- a/integration/loki_micro_services_test.go
+++ b/integration/loki_micro_services_test.go
@@ -2,11 +2,15 @@ package integration

 import (
 	"context"
+	"strings"
 	"testing"
 	"time"

+	dto "github.com/prometheus/client_model/go"
+	"github.com/prometheus/common/expfmt"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/proto"

 	"github.com/grafana/loki/integration/client"
 	"github.com/grafana/loki/integration/cluster"
@@ -567,3 +571,185 @@ func TestSchedulerRing(t *testing.T) {
 		assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
 	})
 }
+
+func TestQueryTSDB_WithCachedPostings(t *testing.T) {
+	clu := cluster.New(nil, cluster.SchemaWithTSDB)
+
+	defer func() {
+		assert.NoError(t, clu.Cleanup())
+	}()
+
+	var (
+		tDistributor = clu.AddComponent(
+			"distributor",
+			"-target=distributor",
+		)
+		tIndexGateway = clu.AddComponent(
+			"index-gateway",
+			"-target=index-gateway",
+			"-tsdb.enable-postings-cache=true",
+
"-store.index-cache-read.cache.enable-fifocache=true", + ) + ) + require.NoError(t, clu.Run()) + + var ( + tIngester = clu.AddComponent( + "ingester", + "-target=ingester", + "-ingester.flush-on-shutdown=true", + "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + ) + tQueryScheduler = clu.AddComponent( + "query-scheduler", + "-target=query-scheduler", + "-query-scheduler.use-scheduler-ring=false", + "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + ) + tCompactor = clu.AddComponent( + "compactor", + "-target=compactor", + "-boltdb.shipper.compactor.compaction-interval=1s", + "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + ) + ) + require.NoError(t, clu.Run()) + + // finally, run the query-frontend and querier. + var ( + tQueryFrontend = clu.AddComponent( + "query-frontend", + "-target=query-frontend", + "-frontend.scheduler-address="+tQueryScheduler.GRPCURL(), + "-frontend.default-validity=0s", + "-common.compactor-address="+tCompactor.HTTPURL(), + "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + ) + _ = clu.AddComponent( + "querier", + "-target=querier", + "-querier.scheduler-address="+tQueryScheduler.GRPCURL(), + "-common.compactor-address="+tCompactor.HTTPURL(), + "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + ) + ) + require.NoError(t, clu.Run()) + + tenantID := randStringRunes() + + now := time.Now() + cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL()) + cliDistributor.Now = now + cliIngester := client.New(tenantID, "", tIngester.HTTPURL()) + cliIngester.Now = now + cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL()) + cliQueryFrontend.Now = now + cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL()) + cliIndexGateway.Now = now + + // initial cache state. + igwMetrics, err := cliIndexGateway.Metrics() + require.NoError(t, err) + assertCacheState(t, igwMetrics, &expectedCacheState{ + cacheName: "store.index-cache-read.embedded-cache", + gets: 0, + misses: 0, + added: 0, + }) + + t.Run("ingest-logs", func(t *testing.T) { + require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", time.Now().Add(-72*time.Hour), map[string]string{"job": "fake"})) + require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", time.Now().Add(-48*time.Hour), map[string]string{"job": "fake"})) + }) + + // restart ingester which should flush the chunks and index + require.NoError(t, tIngester.Restart()) + + // Query lines + t.Run("query to verify logs being served from storage", func(t *testing.T) { + resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`) + require.NoError(t, err) + assert.Equal(t, "streams", resp.Data.ResultType) + + var lines []string + for _, stream := range resp.Data.Stream { + for _, val := range stream.Values { + lines = append(lines, val[1]) + } + } + + assert.ElementsMatch(t, []string{"lineA", "lineB"}, lines) + }) + + igwMetrics, err = cliIndexGateway.Metrics() + require.NoError(t, err) + assertCacheState(t, igwMetrics, &expectedCacheState{ + cacheName: "store.index-cache-read.embedded-cache", + gets: 50, + misses: 1, + added: 1, + }) + + // ingest logs with ts=now. + require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"})) + require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"})) + + // default length is 7 days. 
+ resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`) + require.NoError(t, err) + assert.Equal(t, "streams", resp.Data.ResultType) + + var lines []string + for _, stream := range resp.Data.Stream { + for _, val := range stream.Values { + lines = append(lines, val[1]) + } + } + // expect lines from both, ingesters memory and from the store. + assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines) + +} + +func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 { + for _, m := range mf.Metric { + if !assert.ObjectsAreEqualValues(lbs, m.GetLabel()) { + continue + } + + return m.Counter.GetValue() + } + + return 0 +} + +func assertCacheState(t *testing.T, metrics string, e *expectedCacheState) { + var parser expfmt.TextParser + mfs, err := parser.TextToMetricFamilies(strings.NewReader(metrics)) + require.NoError(t, err) + + lbs := []*dto.LabelPair{ + { + Name: proto.String("cache"), + Value: proto.String(e.cacheName), + }, + } + + mf, found := mfs["querier_cache_added_new_total"] + require.True(t, found) + require.Equal(t, e.added, getValueFromMF(mf, lbs)) + + mf, found = mfs["querier_cache_gets_total"] + require.True(t, found) + require.Equal(t, e.gets, getValueFromMF(mf, lbs)) + + mf, found = mfs["querier_cache_misses_total"] + require.True(t, found) + require.Equal(t, e.misses, getValueFromMF(mf, lbs)) +} + +type expectedCacheState struct { + cacheName string + gets float64 + misses float64 + added float64 +} diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index a2c268f6ff63d..01403cbe375f4 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -36,6 +36,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/series/index" "github.com/grafana/loki/pkg/storage/stores/shipper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" + "github.com/grafana/loki/pkg/storage/stores/tsdb" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -283,9 +284,9 @@ type Config struct { DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"` MaxParallelGetChunk int `yaml:"max_parallel_get_chunk"` - MaxChunkBatchSize int `yaml:"max_chunk_batch_size"` - BoltDBShipperConfig shipper.Config `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."` - TSDBShipperConfig indexshipper.Config `yaml:"tsdb_shipper"` + MaxChunkBatchSize int `yaml:"max_chunk_batch_size"` + BoltDBShipperConfig shipper.Config `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."` + TSDBShipperConfig tsdb.IndexCfg `yaml:"tsdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in a prometheus TSDB-like format. Required fields only required when TSDB is defined in config."` // Config for using AsyncStore when using async index stores like `boltdb-shipper`. // It is required for getting chunk ids of recently flushed chunks from the ingesters. 
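For operators, the factory change above is what surfaces the new `enable_postings_cache` knob under `tsdb_shipper`. A minimal YAML sketch of how it pairs with the index read cache, assuming the embedded in-memory cache as the backend (the setup the integration test above exercises on the index-gateway; key names follow the docs section earlier in this patch):

```yaml
storage_config:
  tsdb_shipper:
    # Experimental: cache TSDB postings in the index-read-cache.
    enable_postings_cache: true
  # Backend reused by the postings cache; per store.go below, it is only
  # consulted on read-only components such as the index-gateway.
  index_queries_cache_config:
    embedded_cache:
      enabled: true
```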
diff --git a/pkg/storage/store.go b/pkg/storage/store.go index b573105446ed6..b9e5be9bf6a03 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -231,7 +231,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, tableRange config.TableRan indexClientLogger := log.With(s.logger, "index-store", fmt.Sprintf("%s-%s", p.IndexType, p.From.String())) if p.IndexType == config.TSDBType { - if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig) { + if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig.Config) { // inject the index-gateway client into the index store gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.limits, indexClientLogger) if err != nil { @@ -272,7 +272,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, tableRange config.TableRan } indexReaderWriter, stopTSDBStoreFunc, err := tsdb.NewStore(fmt.Sprintf("%s_%s", p.ObjectType, p.From.String()), s.cfg.TSDBShipperConfig, s.schemaCfg, f, objectClient, s.limits, - tableRange, backupIndexWriter, indexClientReg, indexClientLogger) + tableRange, backupIndexWriter, indexClientReg, indexClientLogger, s.indexReadCache) if err != nil { return nil, nil, nil, err } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index fc2f8be35e603..3412727f58416 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -31,6 +31,7 @@ import ( "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper" "github.com/grafana/loki/pkg/storage/stores/shipper" + "github.com/grafana/loki/pkg/storage/stores/tsdb" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/marshal" "github.com/grafana/loki/pkg/validation" @@ -1005,7 +1006,7 @@ func TestStore_indexPrefixChange(t *testing.T) { cfg := Config{ FSConfig: local.FSConfig{Directory: path.Join(tempDir, "chunks")}, - TSDBShipperConfig: shipperConfig, + TSDBShipperConfig: tsdb.IndexCfg{Config: shipperConfig}, NamedStores: NamedStores{ Filesystem: map[string]NamedFSConfig{ "named-store": {Directory: path.Join(tempDir, "named-store")}, @@ -1166,7 +1167,7 @@ func TestStore_MultiPeriod(t *testing.T) { BoltDBShipperConfig: shipper.Config{ Config: shipperConfig, }, - TSDBShipperConfig: shipperConfig, + TSDBShipperConfig: tsdb.IndexCfg{Config: shipperConfig, CachePostings: false}, NamedStores: NamedStores{ Filesystem: map[string]NamedFSConfig{ "named-store": {Directory: path.Join(tempDir, "named-store")}, @@ -1479,7 +1480,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { cfg := Config{ FSConfig: local.FSConfig{Directory: path.Join(tempDir, "chunks")}, BoltDBShipperConfig: boltdbShipperConfig, - TSDBShipperConfig: tsdbShipperConfig, + TSDBShipperConfig: tsdb.IndexCfg{Config: tsdbShipperConfig}, } schemaConfig := config.SchemaConfig{ diff --git a/pkg/storage/stores/tsdb/cached_postings_index.go b/pkg/storage/stores/tsdb/cached_postings_index.go new file mode 100644 index 0000000000000..ec5c99ebda20c --- /dev/null +++ b/pkg/storage/stores/tsdb/cached_postings_index.go @@ -0,0 +1,157 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/store/postings_codec.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Thanos Authors. 
+
+package tsdb
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strings"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+
+	"github.com/grafana/loki/pkg/storage/chunk/cache"
+	"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
+)
+
+type PostingsReader interface {
+	ForPostings(ctx context.Context, matchers []*labels.Matcher, fn func(index.Postings) error) error
+}
+
+// NewCachedPostingsReader uses the cache defined by `index_read_cache` to store and read Postings.
+//
+// The cache key is stored/read as `matchers:reader_checksum`.
+//
+// The cache value is stored as a sequence of series references, each encoded as the diff between the
+// current series and the previous one, snappy-compressed and prefixed with a codec header.
+//
+// Example: if the postings for stream "app=kubernetes,env=production" are `[1,7,30,50]` and its reader has `checksum=12345`:
+// - The cache key for the entry will be: `app=kubernetes,env=production:12345`
+// - The cache value for the entry will be the deltas `[1, 6, 23, 20]`, snappy-compressed.
+func NewCachedPostingsReader(reader IndexReader, logger log.Logger, cacheClient cache.Cache) PostingsReader {
+	return &cachedPostingsReader{
+		reader:      reader,
+		cacheClient: cacheClient,
+		log:         logger,
+	}
+}
+
+type cachedPostingsReader struct {
+	reader IndexReader
+
+	cacheClient cache.Cache
+
+	log log.Logger
+}
+
+func (c *cachedPostingsReader) ForPostings(ctx context.Context, matchers []*labels.Matcher, fn func(index.Postings) error) error {
+	checksum := c.reader.Checksum()
+	key := fmt.Sprintf("%s:%d", CanonicalLabelMatchersKey(matchers), checksum)
+	if postings, got := c.fetchPostings(ctx, key); got {
+		return fn(postings)
+	}
+
+	p, err := PostingsForMatchers(c.reader, nil, matchers...)
+	if err != nil {
+		return fmt.Errorf("failed to evaluate postings for matchers: %w", err)
+	}
+
+	expandedPosts, err := index.ExpandPostings(p)
+	if err != nil {
+		return fmt.Errorf("failed to expand postings: %w", err)
+	}
+
+	if err := c.storePostings(ctx, expandedPosts, key); err != nil {
+		level.Error(c.log).Log("msg", "failed to cache postings", "err", err, "matchers", key)
+	}
+
+	// because `index.ExpandPostings` walks the iterator, we have to reset its current index by instantiating a new ListPostings.
+	return fn(index.NewListPostings(expandedPosts))
+}
+
+func (c *cachedPostingsReader) storePostings(ctx context.Context, expandedPostings []storage.SeriesRef, canonicalMatchers string) error {
+	buf, err := diffVarintSnappyEncode(index.NewListPostings(expandedPostings), len(expandedPostings))
+	if err != nil {
+		return fmt.Errorf("couldn't encode postings: %w", err)
+	}
+
+	return c.cacheClient.Store(ctx, []string{canonicalMatchers}, [][]byte{buf})
+}
+
+func (c *cachedPostingsReader) fetchPostings(ctx context.Context, key string) (index.Postings, bool) {
+	found, bufs, _, err := c.cacheClient.Fetch(ctx, []string{key})
+
+	if err != nil {
+		level.Error(c.log).Log("msg", "error fetching postings", "err", err, "matchers", key)
+		return nil, false
+	}
+
+	if len(found) > 0 {
+		// we only use a single key so we only care about index=0.
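+		// Fetch returns parallel slices of found keys and values, so a hit for
+		// our single requested key is always at bufs[0].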
+		p, err := decodeToPostings(bufs[0])
+		if err != nil {
+			level.Error(c.log).Log("msg", "failed to decode postings", "err", err)
+			return nil, false
+		}
+		return p, true
+	}
+
+	return nil, false
+}
+
+func decodeToPostings(b []byte) (index.Postings, error) {
+	p, err := diffVarintSnappyDecode(b)
+	if err != nil {
+		return nil, fmt.Errorf("couldn't decode postings: %w", err)
+	}
+
+	return p, nil
+}
+
+// CanonicalLabelMatchersKey creates a canonical cache key from the given label matchers.
+func CanonicalLabelMatchersKey(ms []*labels.Matcher) string {
+	sorted := make([]labels.Matcher, len(ms))
+	for i := range ms {
+		sorted[i] = labels.Matcher{Type: ms[i].Type, Name: ms[i].Name, Value: ms[i].Value}
+	}
+	sort.Sort(sortableLabelMatchers(sorted))
+
+	const (
+		typeLen = 2
+		sepLen  = 1
+	)
+	var size int
+	for _, m := range sorted {
+		size += len(m.Name) + len(m.Value) + typeLen + sepLen
+	}
+	sb := strings.Builder{}
+	sb.Grow(size)
+	for _, m := range sorted {
+		sb.WriteString(m.Name)
+		sb.WriteString(m.Type.String())
+		sb.WriteString(m.Value)
+		sb.WriteByte(',')
+	}
+	return sb.String()
+}
+
+type sortableLabelMatchers []labels.Matcher
+
+func (c sortableLabelMatchers) Less(i, j int) bool {
+	if c[i].Name != c[j].Name {
+		return c[i].Name < c[j].Name
+	}
+	if c[i].Type != c[j].Type {
+		return c[i].Type < c[j].Type
+	}
+	return c[i].Value < c[j].Value
+}
+
+func (c sortableLabelMatchers) Len() int      { return len(c) }
+func (c sortableLabelMatchers) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
diff --git a/pkg/storage/stores/tsdb/cached_postings_index_test.go b/pkg/storage/stores/tsdb/cached_postings_index_test.go
new file mode 100644
index 0000000000000..18ab43deccdef
--- /dev/null
+++ b/pkg/storage/stores/tsdb/cached_postings_index_test.go
@@ -0,0 +1,505 @@
+package tsdb
+
+import (
+	"context"
+	"math/rand"
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/pkg/storage/chunk/cache"
+	"github.com/grafana/loki/pkg/storage/stores/index/stats"
+	"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
+)
+
+func TestSingleIdxCached(t *testing.T) {
+	// setup cache.
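+	// An in-process FIFO cache stands in for the index-read-cache that backs
+	// the postings cache in a real deployment.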
+ cfg := cache.FifoCacheConfig{MaxSizeBytes: "1MB"} + c := cache.NewFifoCache("test-cache", cfg, nil, log.NewNopLogger(), "test") + defer c.Stop() + + cases := []LoadableSeries{ + { + Labels: mustParseLabels(`{foo="bar"}`), + Chunks: []index.ChunkMeta{ + { + MinTime: 0, + MaxTime: 3, + Checksum: 0, + }, + { + MinTime: 1, + MaxTime: 4, + Checksum: 1, + }, + { + MinTime: 2, + MaxTime: 5, + Checksum: 2, + }, + }, + }, + { + Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`), + Chunks: []index.ChunkMeta{ + { + MinTime: 1, + MaxTime: 10, + Checksum: 3, + }, + }, + }, + { + Labels: mustParseLabels(`{foo="bard", bazz="bozz", bonk="borb"}`), + Chunks: []index.ChunkMeta{ + { + MinTime: 1, + MaxTime: 7, + Checksum: 4, + }, + }, + }, + } + + for _, variant := range []struct { + desc string + fn func() Index + }{ + { + desc: "file", + fn: func() Index { + return BuildIndex(t, t.TempDir(), cases, IndexOpts{PostingsCache: c}) + }, + }, + { + desc: "head", + fn: func() Index { + head := NewHead("fake", NewMetrics(nil), log.NewNopLogger()) + for _, x := range cases { + _, _ = head.Append(x.Labels, x.Labels.Hash(), x.Chunks) + } + reader := head.Index() + return NewTSDBIndex(reader, NewPostingsReader(reader)) + }, + }, + } { + t.Run(variant.desc, func(t *testing.T) { + idx := variant.fn() + t.Run("GetChunkRefs", func(t *testing.T) { + var err error + refs := make([]ChunkRef, 0, 8) + refs, err = idx.GetChunkRefs(context.Background(), "fake", 1, 5, refs, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Nil(t, err) + + expected := []ChunkRef{ + { + User: "fake", + Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar"}`).Hash()), + Start: 0, + End: 3, + Checksum: 0, + }, + { + User: "fake", + Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar"}`).Hash()), + Start: 1, + End: 4, + Checksum: 1, + }, + { + User: "fake", + Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar"}`).Hash()), + Start: 2, + End: 5, + Checksum: 2, + }, + { + User: "fake", + Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar", bazz="buzz"}`).Hash()), + Start: 1, + End: 10, + Checksum: 3, + }, + } + require.Equal(t, expected, refs) + }) + + t.Run("GetChunkRefsSharded", func(t *testing.T) { + shard := index.ShardAnnotation{ + Shard: 1, + Of: 2, + } + var err error + refs := make([]ChunkRef, 0, 8) + refs, err = idx.GetChunkRefs(context.Background(), "fake", 1, 5, refs, &shard, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + + require.Nil(t, err) + + require.Equal(t, []ChunkRef{{ + User: "fake", + Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar", bazz="buzz"}`).Hash()), + Start: 1, + End: 10, + Checksum: 3, + }}, refs) + + }) + + t.Run("Series", func(t *testing.T) { + xs, err := idx.Series(context.Background(), "fake", 8, 9, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Nil(t, err) + + expected := []Series{ + { + Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`), + Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar", bazz="buzz"}`).Hash()), + }, + } + require.Equal(t, expected, xs) + }) + + t.Run("SeriesSharded", func(t *testing.T) { + shard := index.ShardAnnotation{ + Shard: 0, + Of: 2, + } + + xs, err := idx.Series(context.Background(), "fake", 0, 10, nil, &shard, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Nil(t, err) + + expected := []Series{ + { + Labels: mustParseLabels(`{foo="bar"}`), + Fingerprint: model.Fingerprint(mustParseLabels(`{foo="bar"}`).Hash()), + }, + } + require.Equal(t, expected, xs) + }) + + 
t.Run("LabelNames", func(t *testing.T) { + // request data at the end of the tsdb range, but it should return all labels present + ls, err := idx.LabelNames(context.Background(), "fake", 9, 10) + require.Nil(t, err) + sort.Strings(ls) + require.Equal(t, []string{"bazz", "bonk", "foo"}, ls) + }) + + t.Run("LabelNamesWithMatchers", func(t *testing.T) { + // request data at the end of the tsdb range, but it should return all labels present + ls, err := idx.LabelNames(context.Background(), "fake", 9, 10, labels.MustNewMatcher(labels.MatchEqual, "bazz", "buzz")) + require.Nil(t, err) + sort.Strings(ls) + require.Equal(t, []string{"bazz", "foo"}, ls) + }) + + t.Run("LabelValues", func(t *testing.T) { + vs, err := idx.LabelValues(context.Background(), "fake", 9, 10, "foo") + require.Nil(t, err) + sort.Strings(vs) + require.Equal(t, []string{"bar", "bard"}, vs) + }) + + t.Run("LabelValuesWithMatchers", func(t *testing.T) { + vs, err := idx.LabelValues(context.Background(), "fake", 9, 10, "foo", labels.MustNewMatcher(labels.MatchEqual, "bazz", "buzz")) + require.Nil(t, err) + require.Equal(t, []string{"bar"}, vs) + }) + + }) + } + +} + +func BenchmarkCacheableTSDBIndex_GetChunkRefs(b *testing.B) { + // setup cache. + cfg := cache.FifoCacheConfig{MaxSizeBytes: "1MB"} + c := cache.NewFifoCache("test-cache", cfg, nil, log.NewNopLogger(), "test") + defer c.Stop() + + now := model.Now() + queryFrom, queryThrough := now.Add(3*time.Hour).Add(time.Millisecond), now.Add(5*time.Hour).Add(-time.Millisecond) + queryBounds := newBounds(queryFrom, queryThrough) + numChunksToMatch := 0 + + var chunkMetas []index.ChunkMeta + // build a chunk for every second with randomized chunk length + for from, through := now, now.Add(24*time.Hour); from <= through; from = from.Add(time.Second) { + // randomize chunk length between 1-120 mins + chunkLenMin := rand.Intn(120) + if chunkLenMin == 0 { + chunkLenMin = 1 + } + chunkMeta := index.ChunkMeta{ + MinTime: int64(from), + MaxTime: int64(from.Add(time.Duration(chunkLenMin) * time.Minute)), + Checksum: uint32(from), + Entries: 1, + } + chunkMetas = append(chunkMetas, chunkMeta) + if Overlap(chunkMeta, queryBounds) { + numChunksToMatch++ + } + } + + tempDir := b.TempDir() + tsdbIndex := BuildIndex(b, tempDir, []LoadableSeries{ + { + Labels: mustParseLabels(`{foo="bar", fizz="buzz"}`), + Chunks: chunkMetas, + }, + { + Labels: mustParseLabels(`{foo="bar", ping="pong"}`), + Chunks: chunkMetas, + }, + { + Labels: mustParseLabels(`{foo1="bar1", ping="pong"}`), + Chunks: chunkMetas, + }, + }, IndexOpts{PostingsCache: c}) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + chkRefs := ChunkRefsPool.Get() + chkRefs, _ = tsdbIndex.GetChunkRefs(context.Background(), "fake", queryFrom, queryThrough, chkRefs, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ChunkRefsPool.Put(chkRefs) + } +} + +func TestCacheableTSDBIndex_Stats(t *testing.T) { + // setup cache. 
+ cfg := cache.FifoCacheConfig{MaxSizeBytes: "1MB"} + c := cache.NewFifoCache("test-cache", cfg, nil, log.NewNopLogger(), "test") + defer c.Stop() + + series := []LoadableSeries{ + { + Labels: mustParseLabels(`{foo="bar", fizz="buzz"}`), + Chunks: []index.ChunkMeta{ + { + MinTime: 0, + MaxTime: 10, + Checksum: 1, + Entries: 10, + KB: 10, + }, + { + MinTime: 10, + MaxTime: 20, + Checksum: 2, + Entries: 20, + KB: 20, + }, + }, + }, + { + Labels: mustParseLabels(`{foo="bar", ping="pong"}`), + Chunks: []index.ChunkMeta{ + { + MinTime: 0, + MaxTime: 10, + Checksum: 3, + Entries: 30, + KB: 30, + }, + { + MinTime: 10, + MaxTime: 20, + Checksum: 4, + Entries: 40, + KB: 40, + }, + }, + }, + } + + // Create the TSDB index + tempDir := t.TempDir() + + // Create the test cases + testCases := []struct { + name string + from model.Time + through model.Time + expected stats.Stats + expectedErr error + }{ + { + name: "from at the beginning of one chunk and through at the end of another chunk", + from: 0, + through: 20, + expected: stats.Stats{ + Streams: 2, + Chunks: 4, + Bytes: (10 + 20 + 30 + 40) * 1024, + Entries: 10 + 20 + 30 + 40, + }, + }, + { + name: "from inside one chunk and through inside another chunk", + from: 5, + through: 15, + expected: stats.Stats{ + Streams: 2, + Chunks: 4, + Bytes: (10*0.5 + 20*0.5 + 30*0.5 + 40*0.5) * 1024, + Entries: 10*0.5 + 20*0.5 + 30*0.5 + 40*0.5, + }, + }, + { + name: "from inside one chunk and through at the end of another chunk", + from: 5, + through: 20, + expected: stats.Stats{ + Streams: 2, + Chunks: 4, + Bytes: (10*0.5 + 20 + 30*0.5 + 40) * 1024, + Entries: 10*0.5 + 20 + 30*0.5 + 40, + }, + }, + { + name: "from at the beginning of one chunk and through inside another chunk", + from: 0, + through: 15, + expected: stats.Stats{ + Streams: 2, + Chunks: 4, + Bytes: (10 + 20*0.5 + 30 + 40*0.5) * 1024, + Entries: 10 + 20*0.5 + 30 + 40*0.5, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tsdbIndex := BuildIndex(t, tempDir, series, IndexOpts{PostingsCache: c}) + acc := &stats.Stats{} + err := tsdbIndex.Stats(context.Background(), "fake", tc.from, tc.through, acc, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Equal(t, tc.expectedErr, err) + require.Equal(t, tc.expected, *acc) + }) + } +} + +func BenchmarkSeriesRepetitive(b *testing.B) { + // setup cache. + cfg := cache.FifoCacheConfig{MaxSizeBytes: "1MB"} + c := cache.NewFifoCache("test-cache", cfg, nil, log.NewNopLogger(), "test") + defer c.Stop() + + series := []LoadableSeries{ + { + Labels: mustParseLabels(`{foo="bar", fizz="buzz"}`), + Chunks: []index.ChunkMeta{ + { + MinTime: 0, + MaxTime: 10, + Checksum: 1, + Entries: 10, + KB: 10, + }, + { + MinTime: 10, + MaxTime: 20, + Checksum: 2, + Entries: 20, + KB: 20, + }, + }, + }, + { + Labels: mustParseLabels(`{foo="bar", ping="pong"}`), + Chunks: []index.ChunkMeta{ + { + MinTime: 0, + MaxTime: 10, + Checksum: 3, + Entries: 30, + KB: 30, + }, + { + MinTime: 10, + MaxTime: 20, + Checksum: 4, + Entries: 40, + KB: 40, + }, + }, + }, + } + tempDir := b.TempDir() + tsdbIndex := BuildIndex(b, tempDir, series, IndexOpts{PostingsCache: c}) + acc := &stats.Stats{} + + for i := 0; i < b.N; i++ { + tsdbIndex.Stats(context.Background(), "fake", 5, 15, acc, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) //nolint:errcheck + } +} + +func TestMultipleIndexesFiles(t *testing.T) { + // setup cache. 
+ cfg := cache.FifoCacheConfig{MaxSizeBytes: "1MB"} + c := cache.NewFifoCache("test-cache", cfg, nil, log.NewNopLogger(), "test") + defer c.Stop() + + series := []LoadableSeries{ + { + Labels: mustParseLabels(`{foo="bar", fizz="buzz"}`), + Chunks: []index.ChunkMeta{ + { + MinTime: 0, + MaxTime: 10, + Checksum: 1, + Entries: 10, + KB: 10, + }, + }, + }, + { + Labels: mustParseLabels(`{foo="bar", ping="pong"}`), + Chunks: []index.ChunkMeta{ + { + MinTime: 0, + MaxTime: 10, + Checksum: 3, + Entries: 30, + KB: 30, + }, + }, + }, + } + tempDir := t.TempDir() + tsdbIndex := BuildIndex(t, tempDir, series, IndexOpts{PostingsCache: c}) + + refs, err := tsdbIndex.GetChunkRefs(context.Background(), "fake", 5, 10, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) //nolint:errcheck + require.NoError(t, err) + require.Len(t, refs, 2) + + // repeat the same index, it hits the cache. + refs, err = tsdbIndex.GetChunkRefs(context.Background(), "fake", 5, 10, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) //nolint:errcheck + require.NoError(t, err) + require.Len(t, refs, 2) + + // completely change the index now + series = []LoadableSeries{ + { + Labels: mustParseLabels(`{foo="bar", fizz="buzz"}`), + Chunks: []index.ChunkMeta{}, + }, + { + Labels: mustParseLabels(`{foo="bar", ping="pong"}`), + Chunks: []index.ChunkMeta{}, + }, + } + + tempDir = t.TempDir() + tsdbIndex = BuildIndex(t, tempDir, series, IndexOpts{PostingsCache: c}) + refs, err = tsdbIndex.GetChunkRefs(context.Background(), "fake", 5, 10, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) //nolint:errcheck + require.NoError(t, err) + require.Len(t, refs, 0) +} diff --git a/pkg/storage/stores/tsdb/compactor.go b/pkg/storage/stores/tsdb/compactor.go index 4530c295e05b3..4b19983e870ad 100644 --- a/pkg/storage/stores/tsdb/compactor.go +++ b/pkg/storage/stores/tsdb/compactor.go @@ -36,7 +36,7 @@ func (i indexProcessor) NewTableCompactor(ctx context.Context, commonIndexSet co } func (i indexProcessor) OpenCompactedIndexFile(ctx context.Context, path, tableName, userID, workingDir string, periodConfig config.PeriodConfig, logger log.Logger) (compactor.CompactedIndex, error) { - indexFile, err := OpenShippableTSDB(path) + indexFile, err := OpenShippableTSDB(path, IndexOpts{}) if err != nil { return nil, err } @@ -100,7 +100,7 @@ func (t *tableCompactor) CompactTable() error { } downloadPaths[job] = downloadedAt - idx, err := OpenShippableTSDB(downloadedAt) + idx, err := OpenShippableTSDB(downloadedAt, IndexOpts{}) if err != nil { return err } @@ -218,7 +218,7 @@ func setupBuilder(ctx context.Context, userID string, sourceIndexSet compactor.I } }() - indexFile, err := OpenShippableTSDB(path) + indexFile, err := OpenShippableTSDB(path, IndexOpts{}) if err != nil { return nil, err } @@ -387,7 +387,7 @@ func (c *compactedIndex) ToIndexFile() (index_shipper.Index, error) { return nil, err } - return NewShippableTSDBFile(id) + return NewShippableTSDBFile(id, IndexOpts{}) } func getUnsafeBytes(s string) []byte { diff --git a/pkg/storage/stores/tsdb/head_manager.go b/pkg/storage/stores/tsdb/head_manager.go index 9128a0a301d7a..c1932c27fdc82 100644 --- a/pkg/storage/stores/tsdb/head_manager.go +++ b/pkg/storage/stores/tsdb/head_manager.go @@ -730,7 +730,8 @@ func (t *tenantHeads) tenantIndex(userID string, from, through model.Time) (idx return } - idx = NewTSDBIndex(tenant.indexRange(int64(from), int64(through))) + reader := tenant.indexRange(int64(from), int64(through)) + idx = NewTSDBIndex(reader, 
NewPostingsReader(reader)) if t.chunkFilter != nil { idx.SetChunkFilterer(t.chunkFilter) } diff --git a/pkg/storage/stores/tsdb/head_manager_test.go b/pkg/storage/stores/tsdb/head_manager_test.go index c90299d5f14dd..d9a715452f321 100644 --- a/pkg/storage/stores/tsdb/head_manager_test.go +++ b/pkg/storage/stores/tsdb/head_manager_test.go @@ -479,7 +479,7 @@ func TestBuildLegacyWALs(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - store, stop, err := NewStore(tc.store, shipperCfg, schemaCfg, nil, fsObjectClient, &zeroValueLimits{}, tc.tableRange, nil, nil, log.NewNopLogger()) + store, stop, err := NewStore(tc.store, IndexCfg{Config: shipperCfg}, schemaCfg, nil, fsObjectClient, &zeroValueLimits{}, tc.tableRange, nil, nil, log.NewNopLogger(), nil) require.Nil(t, err) refs, err := store.GetChunkRefs( diff --git a/pkg/storage/stores/tsdb/index.go b/pkg/storage/stores/tsdb/index.go index 814d7478603aa..74e6259143521 100644 --- a/pkg/storage/stores/tsdb/index.go +++ b/pkg/storage/stores/tsdb/index.go @@ -2,11 +2,13 @@ package tsdb import ( "context" + "flag" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/stores/indexshipper" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" ) @@ -22,6 +24,18 @@ type ChunkRef struct { Checksum uint32 } +type IndexCfg struct { + indexshipper.Config `yaml:",inline"` + + CachePostings bool `yaml:"enable_postings_cache" category:"experimental"` +} + +func (cfg *IndexCfg) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.BoolVar(&cfg.CachePostings, prefix+"enable-postings-cache", false, "Experimental. Whether TSDB should cache postings or not. The index-read-cache will be used as the backend.") + + cfg.Config.RegisterFlagsWithPrefix(prefix, f) +} + // Compares by (Start, End) // Assumes User is equivalent func (r ChunkRef) Less(x ChunkRef) bool { diff --git a/pkg/storage/stores/tsdb/index_client_test.go b/pkg/storage/stores/tsdb/index_client_test.go index 582a6b130943a..d7ef5ea6f6205 100644 --- a/pkg/storage/stores/tsdb/index_client_test.go +++ b/pkg/storage/stores/tsdb/index_client_test.go @@ -60,7 +60,7 @@ func BenchmarkIndexClient_Stats(b *testing.B) { Labels: mustParseLabels(`{foo="bar"}`), Chunks: buildChunkMetas(int64(indexStartToday), int64(indexStartToday+99)), }, - }), + }, IndexOpts{}), }, tableRange.PeriodConfig.IndexTables.TableFor(indexStartYesterday): { @@ -69,7 +69,7 @@ func BenchmarkIndexClient_Stats(b *testing.B) { Labels: mustParseLabels(`{foo="bar"}`), Chunks: buildChunkMetas(int64(indexStartYesterday), int64(indexStartYesterday+99)), }, - }), + }, IndexOpts{}), }, } @@ -118,7 +118,7 @@ func TestIndexClient_Stats(t *testing.T) { Labels: mustParseLabels(`{fizz="buzz"}`), Chunks: buildChunkMetas(int64(indexStartToday), int64(indexStartToday+99), 10), }, - }), + }, IndexOpts{}), }, tableRange.PeriodConfig.IndexTables.TableFor(indexStartYesterday): { @@ -135,7 +135,7 @@ func TestIndexClient_Stats(t *testing.T) { Labels: mustParseLabels(`{ping="pong"}`), Chunks: buildChunkMetas(int64(indexStartYesterday), int64(indexStartYesterday+99), 10), }, - }), + }, IndexOpts{}), }, } @@ -246,7 +246,7 @@ func TestIndexClient_Volume(t *testing.T) { Labels: mustParseLabels(`{fizz="buzz"}`), Chunks: buildChunkMetas(int64(indexStartToday), int64(indexStartToday+99), 10), }, - }), + }, IndexOpts{}), }, tableRange.PeriodConfig.IndexTables.TableFor(indexStartYesterday): { @@ -263,7 +263,7 @@ func TestIndexClient_Volume(t 
*testing.T) { Labels: mustParseLabels(`{ping="pong"}`), Chunks: buildChunkMetas(int64(indexStartYesterday), int64(indexStartYesterday+99), 10), }, - }), + }, IndexOpts{}), }, } diff --git a/pkg/storage/stores/tsdb/manager.go b/pkg/storage/stores/tsdb/manager.go index 9332b44e02e4e..bbe25d7f73569 100644 --- a/pkg/storage/stores/tsdb/manager.go +++ b/pkg/storage/stores/tsdb/manager.go @@ -129,7 +129,7 @@ func (m *tsdbManager) Start() (err error) { indices++ prefixed := NewPrefixedIdentifier(id, filepath.Join(mulitenantDir, bucket), "") - loaded, err := NewShippableTSDBFile(prefixed) + loaded, err := NewShippableTSDBFile(prefixed, IndexOpts{}) if err != nil { level.Warn(m.log).Log( @@ -221,7 +221,7 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads, shipper indexshipper.Ind level.Debug(m.log).Log("msg", "finished building tsdb for period", "pd", p, "dst", dst.Path(), "duration", time.Since(start)) - loaded, err := NewShippableTSDBFile(dst) + loaded, err := NewShippableTSDBFile(dst, IndexOpts{}) if err != nil { return err } diff --git a/pkg/storage/stores/tsdb/multi_file_index_test.go b/pkg/storage/stores/tsdb/multi_file_index_test.go index 0f6d5f306ec19..e82d158bd6dba 100644 --- a/pkg/storage/stores/tsdb/multi_file_index_test.go +++ b/pkg/storage/stores/tsdb/multi_file_index_test.go @@ -61,7 +61,7 @@ func TestMultiIndex(t *testing.T) { var indices []Index dir := t.TempDir() for i := 0; i < n; i++ { - indices = append(indices, BuildIndex(t, dir, cases)) + indices = append(indices, BuildIndex(t, dir, cases, IndexOpts{})) } idx := NewMultiIndex(IndexSlice(indices)) diff --git a/pkg/storage/stores/tsdb/postings_codec.go b/pkg/storage/stores/tsdb/postings_codec.go new file mode 100644 index 0000000000000..a6517352213ee --- /dev/null +++ b/pkg/storage/stores/tsdb/postings_codec.go @@ -0,0 +1,137 @@ +package tsdb + +import ( + "bytes" + + "github.com/golang/snappy" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/storage" + promEncoding "github.com/prometheus/prometheus/tsdb/encoding" + + "github.com/grafana/loki/pkg/storage/stores/tsdb/index" + "github.com/grafana/loki/pkg/util/encoding" +) + +type codec string + +const ( + codecHeaderSnappy codec = "dvs" // As in "diff+varint+snappy". +) + +// isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy codec. +func isDiffVarintSnappyEncodedPostings(input []byte) bool { + return bytes.HasPrefix(input, []byte(codecHeaderSnappy)) +} + +func diffVarintSnappyDecode(input []byte) (index.Postings, error) { + if !isDiffVarintSnappyEncodedPostings(input) { + return nil, errors.New(string(codecHeaderSnappy) + " header not found") + } + + offset := len(codecHeaderSnappy) + + raw, err := snappy.Decode(nil, input[offset:]) + if err != nil { + return nil, errors.Wrap(err, "snappy decode") + } + + return newDiffVarintPostings(raw), nil +} + +func newDiffVarintPostings(input []byte) *diffVarintPostings { + return &diffVarintPostings{buf: &promEncoding.Decbuf{B: input}} +} + +// diffVarintPostings is an implementation of index.Postings based on diff+varint encoded data. 
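+// Values are decoded lazily: each Next call reads one uvarint delta from the
+// buffer and adds it to the previous value, so the full postings list is never
+// materialized in memory.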
+type diffVarintPostings struct {
+	buf *promEncoding.Decbuf
+	cur storage.SeriesRef
+}
+
+func (it *diffVarintPostings) At() storage.SeriesRef {
+	return it.cur
+}
+
+func (it *diffVarintPostings) Next() bool {
+	if it.buf.Err() != nil || it.buf.Len() == 0 {
+		return false
+	}
+
+	val := it.buf.Uvarint64()
+	if it.buf.Err() != nil {
+		return false
+	}
+
+	it.cur = it.cur + storage.SeriesRef(val)
+	return true
+}
+
+func (it *diffVarintPostings) Seek(x storage.SeriesRef) bool {
+	if it.cur >= x {
+		return true
+	}
+
+	// We cannot do any search due to how values are stored,
+	// so we simply advance until we find the right value.
+	for it.Next() {
+		if it.At() >= x {
+			return true
+		}
+	}
+
+	return false
+}
+
+func (it *diffVarintPostings) Err() error {
+	return it.buf.Err()
+}
+
+// diffVarintSnappyEncode encodes postings into a diff+varint representation
+// and applies snappy compression to the result.
+// The returned byte slice starts with the codecHeaderSnappy header.
+// The length argument is the expected number of postings, used to preallocate the buffer.
+func diffVarintSnappyEncode(p index.Postings, length int) ([]byte, error) {
+	buf, err := diffVarintEncodeNoHeader(p, length)
+	if err != nil {
+		return nil, err
+	}
+
+	// Make the result buffer large enough to hold our header and the compressed block.
+	result := make([]byte, len(codecHeaderSnappy)+snappy.MaxEncodedLen(len(buf)))
+	copy(result, codecHeaderSnappy)
+
+	compressed := snappy.Encode(result[len(codecHeaderSnappy):], buf)
+
+	// Slice the result buffer based on the compressed size.
+	result = result[:len(codecHeaderSnappy)+len(compressed)]
+	return result, nil
+}
+
+// diffVarintEncodeNoHeader encodes postings into a diff+varint representation.
+// It doesn't add any header to the output bytes.
+func diffVarintEncodeNoHeader(p index.Postings, length int) ([]byte, error) {
+	buf := encoding.Encbuf{}
+
+	// This encoding typically uses ~1 byte per posting, but let's use a
+	// conservative 1.25 bytes per posting to avoid extra allocations.
+	if length > 0 {
+		buf.B = make([]byte, 0, 5*length/4)
+	}
+
+	prev := storage.SeriesRef(0)
+	for p.Next() {
+		v := p.At()
+		if v < prev {
+			return nil, errors.Errorf("postings entries must be in increasing order, current: %d, previous: %d", v, prev)
+		}
+
+		// This is the 'diff' part -- compute the difference from the previous value.
+		buf.PutUvarint64(uint64(v - prev))
+		prev = v
+	}
+	if p.Err() != nil {
+		return nil, p.Err()
+	}
+
+	return buf.B, nil
+}
diff --git a/pkg/storage/stores/tsdb/postings_codec_test.go b/pkg/storage/stores/tsdb/postings_codec_test.go
new file mode 100644
index 0000000000000..41f8c8efd107a
--- /dev/null
+++ b/pkg/storage/stores/tsdb/postings_codec_test.go
@@ -0,0 +1,206 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/store/postings_codec_test.go
+// Provenance-includes-license: Apache-2.0
+// Provenance-includes-copyright: The Thanos Authors.
+// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/store/storepb/testutil/series.go
+// Provenance-includes-license: Apache-2.0
+// Provenance-includes-copyright: The Thanos Authors.
+
+package tsdb
+
+import (
+	"os"
+	"testing"
+
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
+)
+
+const (
+	// labelLongSuffix is a ~50B label value suffix, used to emulate real-world high cardinality.
+	labelLongSuffix = "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd"
+)
+
+func TestDiffVarintCodec(t *testing.T) {
+	chunksDir := t.TempDir()
+
+	headOpts := tsdb.DefaultHeadOptions()
+	headOpts.ChunkDirRoot = chunksDir
+	headOpts.ChunkRange = 1000
+	h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
+	assert.NoError(t, err)
+	t.Cleanup(func() {
+		assert.NoError(t, h.Close())
+		assert.NoError(t, os.RemoveAll(chunksDir))
+	})
+
+	idx, err := h.Index()
+	assert.NoError(t, err)
+	t.Cleanup(func() {
+		assert.NoError(t, idx.Close())
+	})
+
+	postingsMap := map[string]index.Postings{
+		`n="1"`:    matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "n", "1"+labelLongSuffix)),
+		`j="foo"`:  matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "j", "foo")),
+		`j!="foo"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo")),
+		`i=~".*"`:  matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".*")),
+		`i=~".+"`:  matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".+")),
+		`i=~"1.+"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "1.+")),
+		`i=~"^$"`:  matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")),
+		`i!=""`:    matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "i", "")),
+		`n!="2"`:   matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+labelLongSuffix)),
+		`i!~"2.*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")),
+	}
+
+	codecs := map[string]struct {
+		codingFunction   func(index.Postings, int) ([]byte, error)
+		decodingFunction func([]byte) (index.Postings, error)
+	}{
+		"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (index.Postings, error) {
+			return newDiffVarintPostings(bytes), nil
+		}},
+		"snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
+	}
+
+	for postingName, postings := range postingsMap {
+		p, err := toUint64Postings(postings)
+		require.NoError(t, err)
+
+		for cname, codec := range codecs {
+			name := cname + "/" + postingName
+
+			t.Run(name, func(t *testing.T) {
+				p.reset() // We reuse postings between runs, so we need to reset the iterator.
+
+				data, err := codec.codingFunction(p, p.len())
+				require.NoError(t, err)
+
+				t.Log("encoded size", len(data), "bytes")
+				t.Logf("ratio: %0.3f", float64(len(data))/float64(4*p.len()))
+
+				decodedPostings, err := codec.decodingFunction(data)
+				require.NoError(t, err)
+
+				p.reset()
+				comparePostings(t, p, decodedPostings)
+			})
+		}
+	}
+}
+
+func TestLabelMatchersTypeValues(t *testing.T) {
+	expectedValues := map[labels.MatchType]int{
+		labels.MatchEqual:     0,
+		labels.MatchNotEqual:  1,
+		labels.MatchRegexp:    2,
+		labels.MatchNotRegexp: 3,
+	}
+
+	for matcherType, val := range expectedValues {
+		require.Equal(t, int(labels.MustNewMatcher(matcherType, "", "").Type), val,
+			"diffVarintSnappyWithMatchersEncode relies on the numeric values of the matchers not changing. "+
+				"It caches each matcher type as these integer values. 
"+ + "If the integer values change, then the already cached values in the index cache will be improperly decoded.") + } +} + +func comparePostings(t *testing.T, p1, p2 index.Postings) { + for p1.Next() { + require.True(t, p2.Next()) + require.Equal(t, p1.At(), p2.At()) + } + + if p2.Next() { + t.Fatal("p2 has more values") + return + } + + require.NoError(t, p1.Err()) + require.NoError(t, p2.Err()) +} + +func matchPostings(t testing.TB, ix tsdb.IndexReader, m *labels.Matcher) index.Postings { + vals, err := ix.LabelValues(m.Name) + assert.NoError(t, err) + + matching := []string(nil) + for _, v := range vals { + if m.Matches(v) { + matching = append(matching, v) + } + } + + p, err := ix.Postings(m.Name, matching...) + assert.NoError(t, err) + return p +} + +func toUint64Postings(p index.Postings) (*uint64Postings, error) { + var vals []storage.SeriesRef + for p.Next() { + vals = append(vals, p.At()) + } + return &uint64Postings{vals: vals, ix: -1}, p.Err() +} + +// Postings with no decoding step. +type uint64Postings struct { + vals []storage.SeriesRef + ix int +} + +func (p *uint64Postings) At() storage.SeriesRef { + if p.ix < 0 || p.ix >= len(p.vals) { + return 0 + } + return p.vals[p.ix] +} + +func (p *uint64Postings) Next() bool { + if p.ix < len(p.vals)-1 { + p.ix++ + return true + } + return false +} + +func (p *uint64Postings) Seek(x storage.SeriesRef) bool { + if p.At() >= x { + return true + } + + // We cannot do any search due to how values are stored, + // so we simply advance until we find the right value. + for p.Next() { + if p.At() >= x { + return true + } + } + + return false +} + +func (p *uint64Postings) Err() error { + return nil +} + +func (p *uint64Postings) reset() { + p.ix = -1 +} + +func (p *uint64Postings) len() int { + return len(p.vals) +} + +func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings { + k, v := index.AllPostingsKey() + p, err := ix.Postings(k, v) + assert.NoError(t, err) + return p +} diff --git a/pkg/storage/stores/tsdb/single_file_index.go b/pkg/storage/stores/tsdb/single_file_index.go index 2eeb923e3033c..9cbd03d9ec8f0 100644 --- a/pkg/storage/stores/tsdb/single_file_index.go +++ b/pkg/storage/stores/tsdb/single_file_index.go @@ -3,6 +3,7 @@ package tsdb import ( "context" "errors" + "fmt" "io" "math" "path/filepath" @@ -15,6 +16,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" index_shipper "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" @@ -27,17 +29,22 @@ var ErrAlreadyOnDesiredVersion = errors.New("tsdb file already on desired versio // GetRawFileReaderFunc returns an io.ReadSeeker for reading raw tsdb file from disk type GetRawFileReaderFunc func() (io.ReadSeeker, error) -func OpenShippableTSDB(p string) (index_shipper.Index, error) { +type IndexOpts struct { + PostingsCache cache.Cache +} + +func OpenShippableTSDB(p string, opts IndexOpts) (index_shipper.Index, error) { id, err := identifierFromPath(p) if err != nil { return nil, err } - return NewShippableTSDBFile(id) + return NewShippableTSDBFile(id, opts) } func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (index_shipper.Index, error) { - indexFile, err := OpenShippableTSDB(path) + opts := IndexOpts{} + indexFile, err := OpenShippableTSDB(path, opts) if err != nil { return nil, err } @@ -76,7 +83,7 @@ func 
RebuildWithVersion(ctx context.Context, path string, desiredVer int) (index if err != nil { return nil, err } - return NewShippableTSDBFile(id) + return NewShippableTSDBFile(id, IndexOpts{}) } // nolint @@ -92,8 +99,8 @@ type TSDBFile struct { getRawFileReader GetRawFileReaderFunc } -func NewShippableTSDBFile(id Identifier) (*TSDBFile, error) { - idx, getRawFileReader, err := NewTSDBIndexFromFile(id.Path()) +func NewShippableTSDBFile(id Identifier, opts IndexOpts) (*TSDBFile, error) { + idx, getRawFileReader, err := NewTSDBIndexFromFile(id.Path(), opts) if err != nil { return nil, err } @@ -118,26 +125,54 @@ func (f *TSDBFile) Reader() (io.ReadSeeker, error) { // and translates the IndexReader to an Index implementation // It loads the file into memory and doesn't keep a file descriptor open type TSDBIndex struct { - reader IndexReader - chunkFilter chunk.RequestChunkFilterer + reader IndexReader + chunkFilter chunk.RequestChunkFilterer + postingsReader PostingsReader } // Return the index as well as the underlying raw file reader which isn't exposed as an index // method but is helpful for building an io.reader for the index shipper -func NewTSDBIndexFromFile(location string) (*TSDBIndex, GetRawFileReaderFunc, error) { +func NewTSDBIndexFromFile(location string, opts IndexOpts) (Index, GetRawFileReaderFunc, error) { reader, err := index.NewFileReader(location) if err != nil { return nil, nil, err } - return NewTSDBIndex(reader), func() (io.ReadSeeker, error) { + postingsReader := getPostingsReader(reader, opts.PostingsCache) + tsdbIdx := NewTSDBIndex(reader, postingsReader) + + return tsdbIdx, func() (io.ReadSeeker, error) { return reader.RawFileReader() }, nil } -func NewTSDBIndex(reader IndexReader) *TSDBIndex { +func getPostingsReader(reader IndexReader, postingsCache cache.Cache) PostingsReader { + if postingsCache != nil { + return NewCachedPostingsReader(reader, util_log.Logger, postingsCache) + } + return NewPostingsReader(reader) +} + +func NewPostingsReader(reader IndexReader) PostingsReader { + return &defaultPostingsReader{reader: reader} +} + +type defaultPostingsReader struct { + reader IndexReader +} + +func (s *defaultPostingsReader) ForPostings(_ context.Context, matchers []*labels.Matcher, fn func(index.Postings) error) error { + p, err := PostingsForMatchers(s.reader, nil, matchers...) + if err != nil { + return err + } + return fn(p) +} + +func NewTSDBIndex(reader IndexReader, postingsReader PostingsReader) *TSDBIndex { return &TSDBIndex{ - reader: reader, + reader: reader, + postingsReader: postingsReader, } } @@ -168,7 +203,7 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, shard *index.ShardAnnotation, filterer = i.chunkFilter.ForRequest(ctx) } - return i.forPostings(ctx, shard, from, through, matchers, func(p index.Postings) error { + return i.postingsReader.ForPostings(ctx, matchers, func(p index.Postings) error { for p.Next() { hash, err := i.reader.Series(p.At(), int64(from), int64(through), &ls, &chks) if err != nil { @@ -191,20 +226,6 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, shard *index.ShardAnnotation, } -func (i *TSDBIndex) forPostings( - _ context.Context, - shard *index.ShardAnnotation, - _, _ model.Time, - matchers []*labels.Matcher, - fn func(index.Postings) error, -) error { - p, err := PostingsForMatchers(i.reader, shard, matchers...) 
-	if err != nil {
-		return err
-	}
-	return fn(p)
-}
-
 func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
 	if res == nil {
 		res = ChunkRefsPool.Get()
@@ -280,7 +301,7 @@ func (i *TSDBIndex) Identifier(string) SingleTenantTSDBIdentifier {
 }
 
 func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, _ shouldIncludeChunk, matchers ...*labels.Matcher) error {
-	return i.forPostings(ctx, shard, from, through, matchers, func(p index.Postings) error {
+	return i.postingsReader.ForPostings(ctx, matchers, func(p index.Postings) error {
 		// TODO(owen-d): use pool
 		var ls labels.Labels
 		var filterer chunk.Filterer
@@ -289,9 +310,10 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim
 		}
 
 		for p.Next() {
-			fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls)
+			seriesRef := p.At()
+			fp, stats, err := i.reader.ChunkStats(seriesRef, int64(from), int64(through), &ls)
 			if err != nil {
-				return err
+				return fmt.Errorf("stats: chunk stats: %w, seriesRef: %d", err, seriesRef)
 			}
 
 			// skip series that belong to different shards
@@ -354,7 +376,7 @@ func (i *TSDBIndex) Volume(
 
 	aggregateBySeries := seriesvolume.AggregateBySeries(aggregateBy) || aggregateBy == ""
 
-	return i.forPostings(ctx, shard, from, through, matchers, func(p index.Postings) error {
+	return i.postingsReader.ForPostings(ctx, matchers, func(p index.Postings) error {
 		var ls labels.Labels
 		var filterer chunk.Filterer
 		if i.chunkFilter != nil {
@@ -364,7 +386,7 @@ func (i *TSDBIndex) Volume(
 		for p.Next() {
 			fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls)
 			if err != nil {
-				return err
+				return fmt.Errorf("series volume: %w", err)
 			}
 
 			// skip series that belong to different shards
diff --git a/pkg/storage/stores/tsdb/single_file_index_test.go b/pkg/storage/stores/tsdb/single_file_index_test.go
index 992e5109b84d2..d1952b0b055f5 100644
--- a/pkg/storage/stores/tsdb/single_file_index_test.go
+++ b/pkg/storage/stores/tsdb/single_file_index_test.go
@@ -71,7 +71,7 @@ func TestSingleIdx(t *testing.T) {
 		{
 			desc: "file",
 			fn: func() Index {
-				return BuildIndex(t, t.TempDir(), cases)
+				return BuildIndex(t, t.TempDir(), cases, IndexOpts{})
 			},
 		},
 		{
@@ -82,7 +82,7 @@ func TestSingleIdx(t *testing.T) {
 					_, _ = head.Append(x.Labels, x.Labels.Hash(), x.Chunks)
 				}
 				reader := head.Index()
-				return NewTSDBIndex(reader)
+				return NewTSDBIndex(reader, NewPostingsReader(reader))
 			},
 		},
 	} {
@@ -214,7 +214,6 @@ func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) {
 	queryFrom, queryThrough := now.Add(3*time.Hour).Add(time.Millisecond), now.Add(5*time.Hour).Add(-time.Millisecond)
 	queryBounds := newBounds(queryFrom, queryThrough)
 	numChunksToMatch := 0
-	var chunkMetas []index.ChunkMeta
 
 	// build a chunk for every second with randomized chunk length
 	for from, through := now, now.Add(24*time.Hour); from <= through; from = from.Add(time.Second) {
@@ -249,14 +248,14 @@ func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) {
 			Labels: mustParseLabels(`{foo1="bar1", ping="pong"}`),
 			Chunks: chunkMetas,
 		},
-	})
+	}, IndexOpts{})
 
 	b.ResetTimer()
 	b.ReportAllocs()
 	for i := 0; i < b.N; i++ {
-		chkRefs, err := tsdbIndex.GetChunkRefs(context.Background(), "fake", queryFrom, queryThrough, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
-		require.NoError(b, err)
-		require.Len(b, chkRefs, numChunksToMatch*2)
+		chkRefs := ChunkRefsPool.Get()
+		chkRefs, _ = tsdbIndex.GetChunkRefs(context.Background(), "fake", queryFrom, queryThrough, chkRefs, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
+		ChunkRefsPool.Put(chkRefs)
 	}
 }
 
@@ -304,7 +303,7 @@ func TestTSDBIndex_Stats(t *testing.T) {
 
 	// Create the TSDB index
 	tempDir := t.TempDir()
-	tsdbIndex := BuildIndex(t, tempDir, series)
+	tsdbIndex := BuildIndex(t, tempDir, series, IndexOpts{})
 
 	// Create the test cases
 	testCases := []struct {
@@ -438,7 +437,7 @@ func TestTSDBIndex_Volume(t *testing.T) {
 
 	// Create the TSDB index
 	tempDir := t.TempDir()
-	tsdbIndex := BuildIndex(t, tempDir, series)
+	tsdbIndex := BuildIndex(t, tempDir, series, IndexOpts{})
 
 	from := model.TimeFromUnixNano(t1.UnixNano())
 	through := model.TimeFromUnixNano(t2.UnixNano())
diff --git a/pkg/storage/stores/tsdb/store.go b/pkg/storage/stores/tsdb/store.go
index ed662840c2926..a70c047ca77b4 100644
--- a/pkg/storage/stores/tsdb/store.go
+++ b/pkg/storage/stores/tsdb/store.go
@@ -14,12 +14,14 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 
 	"github.com/grafana/loki/pkg/storage/chunk"
+	"github.com/grafana/loki/pkg/storage/chunk/cache"
 	"github.com/grafana/loki/pkg/storage/chunk/client"
 	"github.com/grafana/loki/pkg/storage/chunk/fetcher"
 	"github.com/grafana/loki/pkg/storage/config"
 	"github.com/grafana/loki/pkg/storage/stores/index"
 	"github.com/grafana/loki/pkg/storage/stores/indexshipper"
 	"github.com/grafana/loki/pkg/storage/stores/indexshipper/downloads"
+	indexshipper_index "github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
 	tsdb_index "github.com/grafana/loki/pkg/storage/stores/tsdb/index"
 )
 
@@ -39,7 +41,7 @@ type store struct {
 // NewStore creates a new tsdb index ReaderWriter.
 func NewStore(
 	name string,
-	indexShipperCfg indexshipper.Config,
+	indexShipperCfg IndexCfg,
 	schemaCfg config.SchemaConfig,
 	_ *fetcher.Fetcher,
 	objectClient client.ObjectClient,
@@ -48,6 +50,7 @@ func NewStore(
 	backupIndexWriter index.Writer,
 	reg prometheus.Registerer,
 	logger log.Logger,
+	idxCache cache.Cache,
 ) (
 	index.ReaderWriter,
 	func(),
@@ -62,27 +65,37 @@ func NewStore(
 		logger: logger,
 	}
 
-	if err := storeInstance.init(name, indexShipperCfg, schemaCfg, objectClient, limits, tableRange, reg); err != nil {
+	if err := storeInstance.init(name, indexShipperCfg, schemaCfg, objectClient, limits, tableRange, reg, idxCache); err != nil {
 		return nil, nil, err
 	}
 
 	return storeInstance, storeInstance.Stop, nil
 }
 
-func (s *store) init(name string, indexShipperCfg indexshipper.Config, schemaCfg config.SchemaConfig, objectClient client.ObjectClient,
-	limits downloads.Limits, tableRange config.TableRange, reg prometheus.Registerer) error {
+func (s *store) init(name string, indexCfg IndexCfg, schemaCfg config.SchemaConfig, objectClient client.ObjectClient,
+	limits downloads.Limits, tableRange config.TableRange, reg prometheus.Registerer, idxCache cache.Cache) error {
+
+	var sharedCache cache.Cache
+	if indexCfg.CachePostings && indexCfg.Mode == indexshipper.ModeReadOnly && idxCache != nil {
+		sharedCache = idxCache
+	}
+
+	openFn := func(p string) (indexshipper_index.Index, error) {
+		return OpenShippableTSDB(p, IndexOpts{PostingsCache: sharedCache})
+	}
 
 	var err error
 	s.indexShipper, err = indexshipper.NewIndexShipper(
-		indexShipperCfg,
+		indexCfg.Config,
 		objectClient,
 		limits,
 		nil,
-		OpenShippableTSDB,
+		openFn,
 		tableRange,
 		prometheus.WrapRegistererWithPrefix("loki_tsdb_shipper_", reg),
 		s.logger,
 	)
+
 	if err != nil {
 		return err
 	}
 
@@ -90,7 +103,7 @@ func (s *store) init(name string, indexShipperCfg indexshipper.Config, schemaCfg
 	var indices []Index
 	opts := DefaultIndexClientOptions()
 
-	if indexShipperCfg.Mode == indexshipper.ModeWriteOnly {
+	if indexCfg.Mode == indexshipper.ModeWriteOnly {
 		// We disable bloom filters on write nodes
 		// for the Stats() methods as it's of relatively little
 		// benefit when compared to the memory cost. The bloom filters
@@ -100,8 +113,8 @@ func (s *store) init(name string, indexShipperCfg indexshipper.Config, schemaCfg
 		opts.UseBloomFilters = false
 	}
 
-	if indexShipperCfg.Mode != indexshipper.ModeReadOnly {
-		nodeName, err := indexShipperCfg.GetUniqueUploaderName()
+	if indexCfg.Mode != indexshipper.ModeReadOnly {
+		nodeName, err := indexCfg.GetUniqueUploaderName()
 		if err != nil {
 			return err
 		}
@@ -110,7 +123,7 @@ func (s *store) init(name string, indexShipperCfg indexshipper.Config, schemaCfg
 		tsdbManager := NewTSDBManager(
 			name,
 			nodeName,
-			indexShipperCfg.ActiveIndexDirectory,
+			indexCfg.ActiveIndexDirectory,
 			s.indexShipper,
 			tableRange,
 			schemaCfg,
@@ -121,7 +134,7 @@ func (s *store) init(name string, indexShipperCfg indexshipper.Config, schemaCfg
 		headManager := NewHeadManager(
 			name,
 			s.logger,
-			indexShipperCfg.ActiveIndexDirectory,
+			indexCfg.ActiveIndexDirectory,
 			tsdbMetrics,
 			tsdbManager,
 		)
diff --git a/pkg/storage/stores/tsdb/util_test.go b/pkg/storage/stores/tsdb/util_test.go
index 2931e59d8812a..8fa8ee3588942 100644
--- a/pkg/storage/stores/tsdb/util_test.go
+++ b/pkg/storage/stores/tsdb/util_test.go
@@ -17,7 +17,7 @@ type LoadableSeries struct {
 	Chunks index.ChunkMetas
 }
 
-func BuildIndex(t testing.TB, dir string, cases []LoadableSeries) *TSDBFile {
+func BuildIndex(t testing.TB, dir string, cases []LoadableSeries, opts IndexOpts) *TSDBFile {
 	b := NewBuilder(index.LiveFormat)
 
 	for _, s := range cases {
@@ -35,7 +35,7 @@ func BuildIndex(t testing.TB, dir string, cases []LoadableSeries) *TSDBFile {
 	})
 	require.Nil(t, err)
 
-	idx, err := NewShippableTSDBFile(dst)
+	idx, err := NewShippableTSDBFile(dst, opts)
 	require.Nil(t, err)
 	return idx
 }
diff --git a/tools/tsdb/index-analyzer/main.go b/tools/tsdb/index-analyzer/main.go
index 58f650703d046..b59d1ea22a4b5 100644
--- a/tools/tsdb/index-analyzer/main.go
+++ b/tools/tsdb/index-analyzer/main.go
@@ -13,6 +13,7 @@ import (
 	"github.com/grafana/loki/pkg/storage/chunk/client/util"
 	"github.com/grafana/loki/pkg/storage/config"
 	"github.com/grafana/loki/pkg/storage/stores/indexshipper"
+	indexshipper_index "github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
 	"github.com/grafana/loki/pkg/storage/stores/tsdb"
 	"github.com/grafana/loki/pkg/util/cfg"
 	util_log "github.com/grafana/loki/pkg/util/log"
@@ -33,12 +34,16 @@ func main() {
 
 	tableRanges := getIndexStoreTableRanges(config.TSDBType, conf.SchemaConfig.Configs)
 
+	openFn := func(p string) (indexshipper_index.Index, error) {
+		return tsdb.OpenShippableTSDB(p, tsdb.IndexOpts{})
+	}
+
 	shipper, err := indexshipper.NewIndexShipper(
-		conf.StorageConfig.TSDBShipperConfig,
+		conf.StorageConfig.TSDBShipperConfig.Config,
 		objectClient,
 		overrides,
 		nil,
-		tsdb.OpenShippableTSDB,
+		openFn,
 		tableRanges[len(tableRanges)-1],
 		prometheus.WrapRegistererWithPrefix("loki_tsdb_shipper_", prometheus.DefaultRegisterer),
 		util_log.Logger,
diff --git a/tools/tsdb/migrate-versions/main_test.go b/tools/tsdb/migrate-versions/main_test.go
index 92daabb96043d..5fa80cae526b3 100644
--- a/tools/tsdb/migrate-versions/main_test.go
+++ b/tools/tsdb/migrate-versions/main_test.go
@@ -88,7 +88,7 @@ func TestMigrateTables(t *testing.T) {
 			require.NoError(t, err)
 
 			tableName := fmt.Sprintf("%s%d", indexPrefix, i)
-			idx, err := tsdb.NewShippableTSDBFile(id)
+			idx, err := tsdb.NewShippableTSDBFile(id, tsdb.IndexOpts{})
 			require.NoError(t, err)
 
 			require.NoError(t, uploadFile(idx, indexStorageClient, tableName, userID))
diff --git a/tools/tsdb/tsdb-map/main_test.go b/tools/tsdb/tsdb-map/main_test.go
index 5d5b9bdaa118d..4d49085290d32 100644
--- a/tools/tsdb/tsdb-map/main_test.go
+++ b/tools/tsdb/tsdb-map/main_test.go
@@ -93,7 +93,7 @@ func BenchmarkQuery_GetChunkRefs(b *testing.B) {
 		if err != nil {
 			panic(err)
 		}
-		idx := tsdb.NewTSDBIndex(reader)
+		idx := tsdb.NewTSDBIndex(reader, tsdb.NewPostingsReader(reader))
 		b.Run(bm.name, func(b *testing.B) {
 			refs := tsdb.ChunkRefsPool.Get()
 			for i := 0; i < b.N; i++ {
@@ -118,7 +118,7 @@ func BenchmarkQuery_GetChunkRefsSharded(b *testing.B) {
 		if err != nil {
 			panic(err)
 		}
-		idx := tsdb.NewTSDBIndex(reader)
+		idx := tsdb.NewTSDBIndex(reader, tsdb.NewPostingsReader(reader))
 		shardFactor := 16
 
 		b.Run(bm.name, func(b *testing.B) {