Skip to content

Commit

Permalink
fix(blooms): Exclude label filters where label name is part of the se…
Browse files Browse the repository at this point in the history
…ries labels. (#14661)
  • Loading branch information
salvacorts authored Oct 31, 2024
1 parent ebbbccf commit d1668f6
Show file tree
Hide file tree
Showing 16 changed files with 277 additions and 111 deletions.
19 changes: 18 additions & 1 deletion pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
Expand All @@ -41,7 +42,15 @@ func stringSlice[T fmt.Stringer](s []T) []string {

func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs {
t.Helper()
return groupChunkRefs(chunkRefs, nil)
grouped := groupChunkRefs(nil, chunkRefs, nil)
// Put fake labels to the series
for _, g := range grouped {
g.Labels = &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", fmt.Sprintf("%d", g.Fingerprint))),
}
}

return grouped
}

func newLimits() *validation.Overrides {
Expand Down Expand Up @@ -295,12 +304,18 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
{Fingerprint: 1000, Tenant: tenantID, Refs: []*logproto.ShortRef{
{From: 1696248000000, Through: 1696251600000, Checksum: 2},
{From: 1696244400000, Through: 1696248000000, Checksum: 4},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1000")),
}},
{Fingerprint: 2000, Tenant: tenantID, Refs: []*logproto.ShortRef{
{From: 1696255200000, Through: 1696258800000, Checksum: 3},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "2000")),
}},
{Fingerprint: 3000, Tenant: tenantID, Refs: []*logproto.ShortRef{
{From: 1696240800000, Through: 1696244400000, Checksum: 1},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3000")),
}},
},
}, res)
Expand Down Expand Up @@ -405,6 +420,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
// see MkBasicSeriesWithBlooms() in pkg/storage/bloom/v1/test_util.go
rnd := rand.Intn(len(inputChunkRefs))
fp := inputChunkRefs[rnd].Fingerprint
lbs := inputChunkRefs[rnd].Labels
chks := inputChunkRefs[rnd].Refs
key := fmt.Sprintf("%s:%04x", model.Fingerprint(fp), 0)

Expand All @@ -428,6 +444,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
ChunkRefs: []*logproto.GroupedChunkRefs{
{
Fingerprint: fp,
Labels: lbs,
Refs: chks,
Tenant: tenantID,
},
Expand Down
1 change: 1 addition & 0 deletions pkg/bloomgateway/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (e extractor) Extract(start, end int64, r resultscache.Response, _, _ int64
if len(refs) > 0 {
chunkRefs = append(chunkRefs, &logproto.GroupedChunkRefs{
Fingerprint: chunkRef.Fingerprint,
Labels: chunkRef.Labels,
Tenant: chunkRef.Tenant,
Refs: refs,
})
Expand Down
1 change: 1 addition & 0 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh
}
return &logproto.GroupedChunkRefs{
Fingerprint: a.Fingerprint,
Labels: a.Labels,
Tenant: a.Tenant,
Refs: mergeChunkSets(a.Refs, b.Refs),
}
Expand Down
1 change: 1 addition & 0 deletions pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (it *requestIterator) Next() bool {
it.curr = v1.Request{
Recorder: it.recorder,
Fp: model.Fingerprint(group.Fingerprint),
Labels: logproto.FromLabelAdaptersToLabels(group.Labels.Labels),
Chks: convertToChunkRefs(group.Refs),
Search: it.search,
Response: it.channel,
Expand Down
9 changes: 9 additions & 0 deletions pkg/bloomgateway/multiplexing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

v2 "github.com/grafana/loki/v3/pkg/iter/v2"
Expand Down Expand Up @@ -73,6 +74,8 @@ func TestTask_RequestIterator(t *testing.T) {
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-3 * time.Hour), Through: ts.Add(-2 * time.Hour), Checksum: 100},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "100")),
}},
},
}
Expand All @@ -83,9 +86,13 @@ func TestTask_RequestIterator(t *testing.T) {
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 200},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "100")),
}},
{Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 300},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "200")),
}},
},
}
Expand All @@ -96,6 +103,8 @@ func TestTask_RequestIterator(t *testing.T) {
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 400},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "200")),
}},
},
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/bloomgateway/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/querier/plan"
Expand Down Expand Up @@ -101,7 +102,7 @@ func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef {
return &logproto.ShortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum}
}

func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
// Shortcut that does not require any filtering
if !bq.limits.BloomGatewayEnabled(tenant) || len(chunkRefs) == 0 || len(v1.ExtractTestableLabelMatchers(queryPlan.AST)) == 0 {
return chunkRefs, nil
Expand All @@ -112,7 +113,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from

grouped := groupedChunksRefPool.Get(len(chunkRefs))
defer groupedChunksRefPool.Put(grouped)
grouped = groupChunkRefs(chunkRefs, grouped)
grouped = groupChunkRefs(series, chunkRefs, grouped)

preFilterChunks := len(chunkRefs)
preFilterSeries := len(grouped)
Expand Down Expand Up @@ -225,7 +226,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
// groups them by fingerprint.
// The second argument `grouped` can be used to pass a buffer to avoid allocations.
// If it's nil, the returned slice will be allocated.
func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
func groupChunkRefs(series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
seen := make(map[uint64]int, len(grouped))
for _, chunkRef := range chunkRefs {
if idx, found := seen[chunkRef.Fingerprint]; found {
Expand All @@ -234,10 +235,14 @@ func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedC
seen[chunkRef.Fingerprint] = len(grouped)
grouped = append(grouped, &logproto.GroupedChunkRefs{
Fingerprint: chunkRef.Fingerprint,
Tenant: chunkRef.UserID,
Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)},
Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[chunkRef.Fingerprint]),
},
Tenant: chunkRef.UserID,
Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)},
})
}
}

return grouped
}
54 changes: 38 additions & 16 deletions pkg/bloomgateway/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomgateway

import (
"context"
"fmt"
"math/rand"
"sort"
"testing"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
Expand Down Expand Up @@ -79,7 +81,7 @@ func TestBloomQuerier(t *testing.T) {
}
expr, err := syntax.ParseExpr(`{foo="bar"}`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.NoError(t, err)
require.Equal(t, chunkRefs, res)
require.Equal(t, 0, c.callCount)
Expand All @@ -95,7 +97,7 @@ func TestBloomQuerier(t *testing.T) {
chunkRefs := []*logproto.ChunkRef{}
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.NoError(t, err)
require.Equal(t, chunkRefs, res)
require.Equal(t, 0, c.callCount)
Expand All @@ -115,7 +117,7 @@ func TestBloomQuerier(t *testing.T) {
}
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.Error(t, err)
require.Nil(t, res)
})
Expand All @@ -134,7 +136,7 @@ func TestBloomQuerier(t *testing.T) {
}
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.NoError(t, err)
require.Equal(t, chunkRefs, res)
require.Equal(t, 2, c.callCount)
Expand All @@ -143,28 +145,44 @@ func TestBloomQuerier(t *testing.T) {
}

func TestGroupChunkRefs(t *testing.T) {
series := []labels.Labels{
labels.FromStrings("app", "1"),
labels.FromStrings("app", "2"),
labels.FromStrings("app", "3"),
}
seriesMap := make(map[uint64]labels.Labels)
for _, s := range series {
seriesMap[s.Hash()] = s
}

chunkRefs := []*logproto.ChunkRef{
{Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}

result := groupChunkRefs(chunkRefs, nil)
result := groupChunkRefs(seriesMap, chunkRefs, nil)
require.Equal(t, []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00, Tenant: "tenant", Refs: []*logproto.ShortRef{
{Fingerprint: series[0].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[0]),
}},
{Fingerprint: 0x01, Tenant: "tenant", Refs: []*logproto.ShortRef{
{Fingerprint: series[1].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[1]),
}},
{Fingerprint: 0x02, Tenant: "tenant", Refs: []*logproto.ShortRef{
{Fingerprint: series[2].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[2]),
}},
}, result)
}
Expand All @@ -175,11 +193,15 @@ func BenchmarkGroupChunkRefs(b *testing.B) {
n := 1000 // num series
m := 10000 // num chunks per series
chunkRefs := make([]*logproto.ChunkRef, 0, n*m)
series := make(map[uint64]labels.Labels, n)

for i := 0; i < n; i++ {
s := labels.FromStrings("app", fmt.Sprintf("%d", i))
sFP := s.Hash()
series[sFP] = s
for j := 0; j < m; j++ {
chunkRefs = append(chunkRefs, &logproto.ChunkRef{
Fingerprint: uint64(n),
Fingerprint: sFP,
UserID: "tenant",
From: mktime("2024-04-20 00:00"),
Through: mktime("2024-04-20 00:59"),
Expand All @@ -196,5 +218,5 @@ func BenchmarkGroupChunkRefs(b *testing.B) {
b.StartTimer()

groups := make([]*logproto.GroupedChunkRefs, 0, n)
groupChunkRefs(chunkRefs, groups)
groupChunkRefs(series, chunkRefs, groups)
}
1 change: 1 addition & 0 deletions pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func partitionSeriesByDay(from, through model.Time, seriesWithChunks []*logproto

res = append(res, &logproto.GroupedChunkRefs{
Fingerprint: series.Fingerprint,
Labels: series.Labels,
Tenant: series.Tenant,
Refs: relevantChunks,
})
Expand Down
8 changes: 6 additions & 2 deletions pkg/indexgateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type IndexClientWithRange struct {
}

type BloomQuerier interface {
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error)
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error)
}

type Gateway struct {
Expand Down Expand Up @@ -225,12 +225,16 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
return nil, err
}

series := make(map[uint64]labels.Labels)
result = &logproto.GetChunkRefResponse{
Refs: make([]*logproto.ChunkRef, 0, len(chunks)),
}
for _, cs := range chunks {
for i := range cs {
result.Refs = append(result.Refs, &cs[i].ChunkRef)
if _, ok := series[cs[i].Fingerprint]; !ok {
series[cs[i].Fingerprint] = cs[i].Metric
}
}
}

Expand All @@ -257,7 +261,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
return result, nil
}

chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, result.Refs, req.Plan)
chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, series, result.Refs, req.Plan)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit d1668f6

Please sign in to comment.