Skip to content

Commit

Permalink
chore: Avoid looking up tsdb symbols during Volume API
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena committed Aug 26, 2024
1 parent 246a1df commit 65a5301
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 39 deletions.
28 changes: 16 additions & 12 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,10 @@ func (t *testFilter) ShouldFilter(lbs labels.Labels) bool {
return lbs.Get("log_stream") == "dispatcher"
}

func (t *testFilter) RequiredLabelNames() []string {
return []string{"log_stream"}
}

func Test_ChunkFilter(t *testing.T) {
instance := defaultInstance(t)
instance.chunkFilter = &testFilter{}
Expand Down Expand Up @@ -1173,7 +1177,7 @@ func TestInstance_Volume(t *testing.T) {

volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Series,
Expand All @@ -1192,7 +1196,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{log_stream="dispatcher"}`,
Limit: 5,
AggregateBy: seriesvolume.Series,
Expand All @@ -1208,7 +1212,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 5,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Series,
Expand Down Expand Up @@ -1242,7 +1246,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{}`,
Limit: 5,
TargetLabels: []string{"log_stream"},
Expand All @@ -1260,7 +1264,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{log_stream="dispatcher"}`,
Limit: 5,
TargetLabels: []string{"host"},
Expand All @@ -1277,7 +1281,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{log_stream=~".+"}`,
Limit: 5,
TargetLabels: []string{"host", "job"},
Expand All @@ -1297,7 +1301,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Labels,
Expand All @@ -1317,7 +1321,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{log_stream="worker"}`,
Limit: 5,
AggregateBy: seriesvolume.Labels,
Expand All @@ -1338,7 +1342,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 5,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: "{}",
Limit: 5,
AggregateBy: seriesvolume.Labels,
Expand Down Expand Up @@ -1373,7 +1377,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{}`,
Limit: 5,
TargetLabels: []string{"host"},
Expand All @@ -1390,7 +1394,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{log_stream="dispatcher"}`,
Limit: 5,
TargetLabels: []string{"host"},
Expand All @@ -1407,7 +1411,7 @@ func TestInstance_Volume(t *testing.T) {
instance := prepareInstance(t)
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 1.1 * 1e3, //milliseconds
Through: 1.1 * 1e3, // milliseconds
Matchers: `{log_stream=~".+"}`,
Limit: 5,
TargetLabels: []string{"host", "job"},
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/chunk/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ type RequestChunkFilterer interface {
// Filterer filters chunks based on the metric.
type Filterer interface {
ShouldFilter(metric labels.Labels) bool
RequiredLabelNames() []string
}
34 changes: 22 additions & 12 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ func getLocalStore(path string, cm ClientMetrics) Store {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 168,
}},
},
},
RowShards: 16,
},
},
Expand Down Expand Up @@ -866,6 +867,10 @@ func (f fakeChunkFilterer) ShouldFilter(metric labels.Labels) bool {
return metric.Get("foo") == "bazz"
}

func (f fakeChunkFilterer) RequiredLabelNames() []string {
return []string{"foo"}
}

func Test_ChunkFilterer(t *testing.T) {
s := &LokiStore{
Store: storeFixture,
Expand Down Expand Up @@ -921,7 +926,6 @@ func Test_PipelineWrapper(t *testing.T) {
s.SetPipelineWrapper(wrapper)
ctx = user.InjectOrgID(context.Background(), "test-user")
logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)})

if err != nil {
t.Errorf("store.SelectLogs() error = %v", err)
return
Expand Down Expand Up @@ -952,7 +956,6 @@ func Test_PipelineWrapper_disabled(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "test-user")
ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true")
logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)})

if err != nil {
t.Errorf("store.SelectLogs() error = %v", err)
return
Expand Down Expand Up @@ -1292,7 +1295,8 @@ func TestStore_indexPrefixChange(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
}

schemaConfig := config.SchemaConfig{
Expand Down Expand Up @@ -1366,7 +1370,8 @@ func TestStore_indexPrefixChange(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_tsdb_",
Period: time.Hour * 24,
}},
},
},
RowShards: 2,
}
schemaConfig.Configs = append(schemaConfig.Configs, periodConfig2)
Expand Down Expand Up @@ -1471,7 +1476,8 @@ func TestStore_MultiPeriod(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
}

periodConfigV11 := config.PeriodConfig{
Expand All @@ -1483,7 +1489,8 @@ func TestStore_MultiPeriod(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
RowShards: 2,
}

Expand Down Expand Up @@ -1562,7 +1569,6 @@ func TestStore_MultiPeriod(t *testing.T) {
}
})
}

}

func mustParseLabels(s string) []logproto.SeriesIdentifier_LabelsEntry {
Expand Down Expand Up @@ -1829,7 +1835,8 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
RowShards: 2,
},
{
Expand All @@ -1842,7 +1849,8 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
},
},
}
Expand Down Expand Up @@ -1980,7 +1988,8 @@ func TestStore_SyncStopInteraction(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
RowShards: 2,
},
{
Expand All @@ -1993,7 +2002,8 @@ func TestStore_SyncStopInteraction(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
},
},
},
},
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/storage/stores/shipper/indexshipper/tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,23 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, from int64, through int6
return s.fp, nil
}

func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, index.ChunkStats, error) {
func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error) {
s := h.head.series.getByID(uint64(ref))

if s == nil {
h.head.metrics.seriesNotFound.Inc()
return 0, index.ChunkStats{}, storage.ErrNotFound
}
*lbls = append((*lbls)[:0], s.ls...)
if len(by) == 0 {
*lbls = append((*lbls)[:0], s.ls...)
} else {
*lbls = (*lbls)[:0]
for _, l := range s.ls {
if _, ok := by[l.Name]; ok {
*lbls = append(*lbls, l)
}
}
}

queryBounds := newBounds(model.Time(from), model.Time(through))

Expand Down
50 changes: 46 additions & 4 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1832,7 +1832,7 @@ func (r *Reader) Series(id storage.SeriesRef, from int64, through int64, lbls *l
return fprint, nil
}

func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) {
offset := id
// In version 2+ series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
Expand All @@ -1844,7 +1844,7 @@ func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *lab
return 0, ChunkStats{}, d.Err()
}

return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls)
return r.dec.ChunkStats(r.version, d.Get(), id, from, through, lbls, by)
}

func (r *Reader) Postings(name string, fpFilter FingerprintFilter, values ...string) (Postings, error) {
Expand Down Expand Up @@ -2216,11 +2216,53 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta)
if d.Err() != nil {
return nil, 0, errors.Wrap(d.Err(), "read series label offsets")
}
// todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls.
ln, err := dec.LookupSymbol(lno)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label name")
}
lv, err := dec.LookupSymbol(lvo)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label value")
}

*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
}
return &d, fprint, nil
}

// prepSeriesBy returns series labels and chunks for a series and only returning selected `by` label names.
// If `by` is empty, it returns all labels for the series.
func (dec *Decoder) prepSeriesBy(b []byte, lbls *labels.Labels, chks *[]ChunkMeta, by map[string]struct{}) (*encoding.Decbuf, uint64, error) {
if len(by) == 0 {
return dec.prepSeries(b, lbls, chks)
}
*lbls = (*lbls)[:0]
if chks != nil {
*chks = (*chks)[:0]
}

d := encoding.DecWrap(tsdb_enc.Decbuf{B: b})

fprint := d.Be64()
k := d.Uvarint()

for i := 0; i < k; i++ {
lno := uint32(d.Uvarint())
lvo := uint32(d.Uvarint())

if d.Err() != nil {
return nil, 0, errors.Wrap(d.Err(), "read series label offsets")
}
// todo(cyriltovena): we could cache this by user requests spanning multiple prepSeries calls.
ln, err := dec.LookupSymbol(lno)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label name")
}
if _, ok := by[ln]; !ok {
continue
}

lv, err := dec.LookupSymbol(lvo)
if err != nil {
return nil, 0, errors.Wrap(err, "lookup label value")
Expand All @@ -2231,8 +2273,8 @@ func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta)
return &d, fprint, nil
}

func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
d, fp, err := dec.prepSeries(b, lbls, nil)
func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels, by map[string]struct{}) (uint64, ChunkStats, error) {
d, fp, err := dec.prepSeriesBy(b, lbls, nil, by)
if err != nil {
return 0, ChunkStats{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/indexshipper/tsdb/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type IndexReader interface {
Series(ref storage.SeriesRef, from int64, through int64, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error)

// ChunkStats returns the stats for the chunks in the given series.
ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels) (uint64, index.ChunkStats, error)
ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels, by map[string]struct{}) (uint64, index.ChunkStats, error)

// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames(matchers ...*labels.Matcher) ([]string, error)
Expand Down
Loading

0 comments on commit 65a5301

Please sign in to comment.