From 65a5301280fd174e9902d0ee795a16087d0644ce Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 26 Aug 2024 17:11:33 +0200 Subject: [PATCH] chore: Avoid looking up tsdb symbols during Volume API --- pkg/ingester/instance_test.go | 28 ++++++----- pkg/storage/chunk/interface.go | 1 + pkg/storage/store_test.go | 34 ++++++++----- .../shipper/indexshipper/tsdb/head_read.go | 13 ++++- .../shipper/indexshipper/tsdb/index/index.go | 50 +++++++++++++++++-- .../shipper/indexshipper/tsdb/querier.go | 2 +- .../indexshipper/tsdb/single_file_index.go | 25 ++++++++-- .../tsdb/single_file_index_test.go | 48 ++++++++++++++++-- 8 files changed, 162 insertions(+), 39 deletions(-) diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 6867b0e1519c2..3e2a822d3eace 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -658,6 +658,10 @@ func (t *testFilter) ShouldFilter(lbs labels.Labels) bool { return lbs.Get("log_stream") == "dispatcher" } +func (t *testFilter) RequiredLabelNames() []string { + return []string{"log_stream"} +} + func Test_ChunkFilter(t *testing.T) { instance := defaultInstance(t) instance.chunkFilter = &testFilter{} @@ -1173,7 +1177,7 @@ func TestInstance_Volume(t *testing.T) { volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Series, @@ -1192,7 +1196,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, AggregateBy: seriesvolume.Series, @@ -1208,7 +1212,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 5, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Series, @@ -1242,7 +1246,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{}`, Limit: 5, TargetLabels: []string{"log_stream"}, @@ -1260,7 +1264,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1277,7 +1281,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{log_stream=~".+"}`, Limit: 5, TargetLabels: []string{"host", "job"}, @@ -1297,7 +1301,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1317,7 +1321,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{log_stream="worker"}`, Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1338,7 +1342,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 5, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: "{}", Limit: 5, AggregateBy: seriesvolume.Labels, @@ -1373,7 +1377,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1390,7 +1394,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{log_stream="dispatcher"}`, Limit: 5, TargetLabels: []string{"host"}, @@ -1407,7 +1411,7 @@ func TestInstance_Volume(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, - Through: 1.1 * 1e3, //milliseconds + Through: 1.1 * 1e3, // milliseconds Matchers: `{log_stream=~".+"}`, Limit: 5, TargetLabels: []string{"host", "job"}, diff --git a/pkg/storage/chunk/interface.go b/pkg/storage/chunk/interface.go index 8da4312c60398..1fbb2beb52073 100644 --- a/pkg/storage/chunk/interface.go +++ b/pkg/storage/chunk/interface.go @@ -67,4 +67,5 @@ type RequestChunkFilterer interface { // Filterer filters chunks based on the metric. type Filterer interface { ShouldFilter(metric labels.Labels) bool + RequiredLabelNames() []string } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 3c9acdfa5a638..d31501893dc45 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -219,7 +219,8 @@ func getLocalStore(path string, cm ClientMetrics) Store { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 168, - }}, + }, + }, RowShards: 16, }, }, @@ -866,6 +867,10 @@ func (f fakeChunkFilterer) ShouldFilter(metric labels.Labels) bool { return metric.Get("foo") == "bazz" } +func (f fakeChunkFilterer) RequiredLabelNames() []string { + return []string{"foo"} +} + func Test_ChunkFilterer(t *testing.T) { s := &LokiStore{ Store: storeFixture, @@ -921,7 +926,6 @@ func Test_PipelineWrapper(t *testing.T) { s.SetPipelineWrapper(wrapper) ctx = user.InjectOrgID(context.Background(), "test-user") logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)}) - if err != nil { t.Errorf("store.SelectLogs() error = %v", err) return @@ -952,7 +956,6 @@ func Test_PipelineWrapper_disabled(t *testing.T) { ctx = user.InjectOrgID(context.Background(), "test-user") ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true") logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)}) - if err != nil { t.Errorf("store.SelectLogs() error = %v", err) return @@ -1292,7 +1295,8 @@ func TestStore_indexPrefixChange(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, } schemaConfig := config.SchemaConfig{ @@ -1366,7 +1370,8 @@ func TestStore_indexPrefixChange(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_tsdb_", Period: time.Hour * 24, - }}, + }, + }, RowShards: 2, } schemaConfig.Configs = append(schemaConfig.Configs, periodConfig2) @@ -1471,7 +1476,8 @@ func TestStore_MultiPeriod(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, } periodConfigV11 := config.PeriodConfig{ @@ -1483,7 +1489,8 @@ func TestStore_MultiPeriod(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, RowShards: 2, } @@ -1562,7 +1569,6 @@ func TestStore_MultiPeriod(t *testing.T) { } }) } - } func mustParseLabels(s string) []logproto.SeriesIdentifier_LabelsEntry { @@ -1829,7 +1835,8 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, RowShards: 2, }, { @@ -1842,7 +1849,8 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, }, }, } @@ -1980,7 +1988,8 @@ func TestStore_SyncStopInteraction(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, RowShards: 2, }, { @@ -1993,7 +2002,8 @@ func TestStore_SyncStopInteraction(t *testing.T) { PeriodicTableConfig: config.PeriodicTableConfig{ Prefix: "index_", Period: time.Hour * 24, - }}, + }, + }, }, }, } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go b/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go index 3a0cf3cdbfc7d..cf709e7bd97c0 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go @@ -146,14 +146,23 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, from int64, through int6 return s.fp, nil } -func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, index.ChunkStats, error) { +func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error) { s := h.head.series.getByID(uint64(ref)) if s == nil { h.head.metrics.seriesNotFound.Inc() return 0, index.ChunkStats{}, storage.ErrNotFound } - *lbls = append((*lbls)[:0], s.ls...) + if len(by) == 0 { + *lbls = append((*lbls)[:0], s.ls...) + } else { + *lbls = (*lbls)[:0] + for _, l := range s.ls { + if _, ok := by[l.Name]; ok { + *lbls = append(*lbls, l) + } + } + } queryBounds := newBounds(model.Time(from), model.Time(through)) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go index 123750aea3de4..60a8d5f1d62cf 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go @@ -1832,7 +1832,7 @@ func (r *Reader) Series(id storage.SeriesRef, from int64, through int64, lbls *l return fprint, nil } -func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) { +func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) { offset := id // In version 2+ series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. @@ -1844,7 +1844,7 @@ func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *lab return 0, ChunkStats{}, d.Err() } - return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls) + return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls, by) } func (r *Reader) Postings(name string, fpFilter FingerprintFilter, values ...string) (Postings, error) { @@ -2216,11 +2216,53 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) if d.Err() != nil { return nil, 0, errors.Wrap(d.Err(), "read series label offsets") } + // todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls. + ln, err := dec.LookupSymbol(lno) + if err != nil { + return nil, 0, errors.Wrap(err, "lookup label name") + } + lv, err := dec.LookupSymbol(lvo) + if err != nil { + return nil, 0, errors.Wrap(err, "lookup label value") + } + + *lbls = append(*lbls, labels.Label{Name: ln, Value: lv}) + } + return &d, fprint, nil +} + +// prepSeriesBy returns series labels and chunks for a series and only returning selected `by` label names. +// If `by` is empty, it returns all labels for the series. +func (dec *Decoder) prepSeriesBy(b []byte, lbls *labels.Labels, chks *[]ChunkMeta, by map[string]struct{}) (*encoding.Decbuf, uint64, error) { + if len(by) == 0 { + return dec.prepSeries(b, lbls, chks) + } + *lbls = (*lbls)[:0] + if chks != nil { + *chks = (*chks)[:0] + } + d := encoding.DecWrap(tsdb_enc.Decbuf{B: b}) + + fprint := d.Be64() + k := d.Uvarint() + + for i := 0; i < k; i++ { + lno := uint32(d.Uvarint()) + lvo := uint32(d.Uvarint()) + + if d.Err() != nil { + return nil, 0, errors.Wrap(d.Err(), "read series label offsets") + } + // todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls. ln, err := dec.LookupSymbol(lno) if err != nil { return nil, 0, errors.Wrap(err, "lookup label name") } + if _, ok := by[ln]; !ok { + continue + } + lv, err := dec.LookupSymbol(lvo) if err != nil { return nil, 0, errors.Wrap(err, "lookup label value") @@ -2231,8 +2273,8 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) return &d, fprint, nil } -func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) { - d, fp, err := dec.prepSeries(b, lbls, nil) +func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) { + d, fp, err := dec.prepSeriesBy(b, lbls, nil, by) if err != nil { return 0, ChunkStats{}, err } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go b/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go index b29556c348cf6..60ec32ee954b0 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/querier.go @@ -69,7 +69,7 @@ type IndexReader interface { Series(ref storage.SeriesRef, from int64, through int64, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error) // ChunkStats returns the stats for the chunks in the given series. - ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels) (uint64, index.ChunkStats, error) + ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error) // LabelNames returns all the unique label names present in the index in sorted order. LabelNames(matchers ...*labels.Matcher) ([]string, error) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go index 7934b952ba88f..c523381534fb2 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index.go @@ -74,7 +74,6 @@ func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (shipp } return NewPrefixedIdentifier(id, parentDir, "") }) - if err != nil { return nil, err } @@ -195,7 +194,6 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, _ string, fpFilter index.Fing } return p.Err() }) - } func (i *TSDBIndex) forPostings( @@ -220,7 +218,6 @@ func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, throu if err := i.ForSeries(ctx, "", fpFilter, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) { for _, chk := range chks { - res = append(res, ChunkRef{ User: userID, // assumed to be the same, will be enforced by caller. Fingerprint: fp, @@ -298,7 +295,7 @@ func (i *TSDBIndex) Stats(ctx context.Context, _ string, from, through model.Tim } for p.Next() { - fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls) + fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, nil) if err != nil { return err } @@ -362,6 +359,24 @@ func (i *TSDBIndex) Volume( seriesLabels := labels.Labels(make([]labels.Label, 0, len(labelsToMatch))) aggregateBySeries := seriesvolume.AggregateBySeries(aggregateBy) || aggregateBy == "" + var by map[string]struct{} + if !includeAll && (aggregateBySeries || len(targetLabels) > 0) { + by = make(map[string]struct{}, len(labelsToMatch)) + for k := range labelsToMatch { + by[k] = struct{}{} + } + if len(labelsToMatch) > 0 { + for k := range labelsToMatch { + by[k] = struct{}{} + } + } + // If we are aggregating by series, we need to include all labels in the series required for filtering chunks. + if i.chunkFilter != nil { + for _, k := range i.chunkFilter.ForRequest(ctx).RequiredLabelNames() { + by[k] = struct{}{} + } + } + } return i.forPostings(ctx, fpFilter, from, through, matchers, func(p index.Postings) error { var ls labels.Labels @@ -371,7 +386,7 @@ func (i *TSDBIndex) Volume( } for p.Next() { - fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls) + fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls, by) if err != nil { return fmt.Errorf("series volume: %w", err) } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go index 068630c553a04..9784475091bf8 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/single_file_index_test.go @@ -2,6 +2,7 @@ package tsdb import ( "context" + "fmt" "math/rand" "sort" "testing" @@ -140,7 +141,6 @@ func TestSingleIdx(t *testing.T) { End: 10, Checksum: 3, }}, shardedRefs) - }) t.Run("Series", func(t *testing.T) { @@ -202,10 +202,8 @@ func TestSingleIdx(t *testing.T) { require.Nil(t, err) require.Equal(t, []string{"bar"}, vs) }) - }) } - } func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) { @@ -743,10 +741,50 @@ func TestTSDBIndex_Volume(t *testing.T) { Limit: 10, }, acc.Volumes()) }) + // todo(cyriltovena): tests with chunk filterer }) }) } +func BenchmarkTSDBIndex_Volume(b *testing.B) { + var series []LoadableSeries + for i := 0; i < 1000; i++ { + series = append(series, LoadableSeries{ + Labels: mustParseLabels(fmt.Sprintf(`{foo="bar", fizz="fizz%d", buzz="buzz%d",bar="bar%d", bozz="bozz%d"}`, i, i, i, i)), + Chunks: []index.ChunkMeta{ + { + MinTime: 0, + MaxTime: 10, + Checksum: uint32(i), + KB: 10, + Entries: 10, + }, + { + MinTime: 10, + MaxTime: 20, + Checksum: uint32(i), + KB: 10, + Entries: 10, + }, + }, + }) + } + ctx := context.Background() + from := model.Earliest + through := model.Latest + // Create the TSDB index + tempDir := b.TempDir() + tsdbIndex := BuildIndex(b, tempDir, series) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + acc := seriesvolume.NewAccumulator(10, 10) + err := tsdbIndex.Volume(ctx, "fake", from, through, acc, nil, nil, nil, seriesvolume.Series, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+")) + require.NoError(b, err) + } +} + type filterAll struct{} func (f *filterAll) ForRequest(_ context.Context) chunk.Filterer { @@ -758,3 +796,7 @@ type filterAllFilterer struct{} func (f *filterAllFilterer) ShouldFilter(_ labels.Labels) bool { return true } + +func (f *filterAllFilterer) RequiredLabelNames() []string { + return nil +}