Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(blooms): Exclude label filters where label name is part of the series labels. #14661

Merged
merged 8 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
Expand All @@ -41,7 +42,15 @@ func stringSlice[T fmt.Stringer](s []T) []string {

func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs {
t.Helper()
return groupChunkRefs(chunkRefs, nil)
grouped := groupChunkRefs(nil, chunkRefs, nil)
// Put fake labels to the series
for _, g := range grouped {
g.Labels = &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", fmt.Sprintf("%d", g.Fingerprint))),
}
}

return grouped
}

func newLimits() *validation.Overrides {
Expand Down Expand Up @@ -295,12 +304,18 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
{Fingerprint: 1000, Tenant: tenantID, Refs: []*logproto.ShortRef{
{From: 1696248000000, Through: 1696251600000, Checksum: 2},
{From: 1696244400000, Through: 1696248000000, Checksum: 4},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1000")),
}},
{Fingerprint: 2000, Tenant: tenantID, Refs: []*logproto.ShortRef{
{From: 1696255200000, Through: 1696258800000, Checksum: 3},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "2000")),
}},
{Fingerprint: 3000, Tenant: tenantID, Refs: []*logproto.ShortRef{
{From: 1696240800000, Through: 1696244400000, Checksum: 1},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3000")),
}},
},
}, res)
Expand Down Expand Up @@ -405,6 +420,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
// see MkBasicSeriesWithBlooms() in pkg/storage/bloom/v1/test_util.go
rnd := rand.Intn(len(inputChunkRefs))
fp := inputChunkRefs[rnd].Fingerprint
lbs := inputChunkRefs[rnd].Labels
chks := inputChunkRefs[rnd].Refs
key := fmt.Sprintf("%s:%04x", model.Fingerprint(fp), 0)

Expand All @@ -428,6 +444,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
ChunkRefs: []*logproto.GroupedChunkRefs{
{
Fingerprint: fp,
Labels: lbs,
Refs: chks,
Tenant: tenantID,
},
Expand Down
1 change: 1 addition & 0 deletions pkg/bloomgateway/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (e extractor) Extract(start, end int64, r resultscache.Response, _, _ int64
if len(refs) > 0 {
chunkRefs = append(chunkRefs, &logproto.GroupedChunkRefs{
Fingerprint: chunkRef.Fingerprint,
Labels: chunkRef.Labels,
Tenant: chunkRef.Tenant,
Refs: refs,
})
Expand Down
1 change: 1 addition & 0 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh
}
return &logproto.GroupedChunkRefs{
Fingerprint: a.Fingerprint,
Labels: a.Labels,
Tenant: a.Tenant,
Refs: mergeChunkSets(a.Refs, b.Refs),
}
Expand Down
1 change: 1 addition & 0 deletions pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (it *requestIterator) Next() bool {
it.curr = v1.Request{
Recorder: it.recorder,
Fp: model.Fingerprint(group.Fingerprint),
Labels: logproto.FromLabelAdaptersToLabels(group.Labels.Labels),
Chks: convertToChunkRefs(group.Refs),
Search: it.search,
Response: it.channel,
Expand Down
9 changes: 9 additions & 0 deletions pkg/bloomgateway/multiplexing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

v2 "github.com/grafana/loki/v3/pkg/iter/v2"
Expand Down Expand Up @@ -73,6 +74,8 @@ func TestTask_RequestIterator(t *testing.T) {
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-3 * time.Hour), Through: ts.Add(-2 * time.Hour), Checksum: 100},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "100")),
}},
},
}
Expand All @@ -83,9 +86,13 @@ func TestTask_RequestIterator(t *testing.T) {
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 200},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "100")),
}},
{Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 300},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "200")),
}},
},
}
Expand All @@ -96,6 +103,8 @@ func TestTask_RequestIterator(t *testing.T) {
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 400},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "200")),
}},
},
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/bloomgateway/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/querier/plan"
Expand Down Expand Up @@ -101,7 +102,7 @@ func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef {
return &logproto.ShortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum}
}

func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
// Shortcut that does not require any filtering
if !bq.limits.BloomGatewayEnabled(tenant) || len(chunkRefs) == 0 || len(v1.ExtractTestableLabelMatchers(queryPlan.AST)) == 0 {
return chunkRefs, nil
Expand All @@ -112,7 +113,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from

grouped := groupedChunksRefPool.Get(len(chunkRefs))
defer groupedChunksRefPool.Put(grouped)
grouped = groupChunkRefs(chunkRefs, grouped)
grouped = groupChunkRefs(series, chunkRefs, grouped)

preFilterChunks := len(chunkRefs)
preFilterSeries := len(grouped)
Expand Down Expand Up @@ -225,7 +226,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
// groups them by fingerprint.
// The second argument `grouped` can be used to pass a buffer to avoid allocations.
// If it's nil, the returned slice will be allocated.
func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
func groupChunkRefs(series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
seen := make(map[uint64]int, len(grouped))
for _, chunkRef := range chunkRefs {
if idx, found := seen[chunkRef.Fingerprint]; found {
Expand All @@ -234,10 +235,14 @@ func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedC
seen[chunkRef.Fingerprint] = len(grouped)
grouped = append(grouped, &logproto.GroupedChunkRefs{
Fingerprint: chunkRef.Fingerprint,
Tenant: chunkRef.UserID,
Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)},
Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[chunkRef.Fingerprint]),
},
Tenant: chunkRef.UserID,
Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)},
})
}
}

return grouped
}
54 changes: 38 additions & 16 deletions pkg/bloomgateway/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomgateway

import (
"context"
"fmt"
"math/rand"
"sort"
"testing"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
Expand Down Expand Up @@ -79,7 +81,7 @@ func TestBloomQuerier(t *testing.T) {
}
expr, err := syntax.ParseExpr(`{foo="bar"}`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.NoError(t, err)
require.Equal(t, chunkRefs, res)
require.Equal(t, 0, c.callCount)
Expand All @@ -95,7 +97,7 @@ func TestBloomQuerier(t *testing.T) {
chunkRefs := []*logproto.ChunkRef{}
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.NoError(t, err)
require.Equal(t, chunkRefs, res)
require.Equal(t, 0, c.callCount)
Expand All @@ -115,7 +117,7 @@ func TestBloomQuerier(t *testing.T) {
}
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.Error(t, err)
require.Nil(t, res)
})
Expand All @@ -134,7 +136,7 @@ func TestBloomQuerier(t *testing.T) {
}
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.NoError(t, err)
require.Equal(t, chunkRefs, res)
require.Equal(t, 2, c.callCount)
Expand All @@ -143,28 +145,44 @@ func TestBloomQuerier(t *testing.T) {
}

func TestGroupChunkRefs(t *testing.T) {
series := []labels.Labels{
labels.FromStrings("app", "1"),
labels.FromStrings("app", "2"),
labels.FromStrings("app", "3"),
}
seriesMap := make(map[uint64]labels.Labels)
for _, s := range series {
seriesMap[s.Hash()] = s
}

chunkRefs := []*logproto.ChunkRef{
{Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}

result := groupChunkRefs(chunkRefs, nil)
result := groupChunkRefs(seriesMap, chunkRefs, nil)
require.Equal(t, []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00, Tenant: "tenant", Refs: []*logproto.ShortRef{
{Fingerprint: series[0].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[0]),
}},
{Fingerprint: 0x01, Tenant: "tenant", Refs: []*logproto.ShortRef{
{Fingerprint: series[1].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[1]),
}},
{Fingerprint: 0x02, Tenant: "tenant", Refs: []*logproto.ShortRef{
{Fingerprint: series[2].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[2]),
}},
}, result)
}
Expand All @@ -175,11 +193,15 @@ func BenchmarkGroupChunkRefs(b *testing.B) {
n := 1000 // num series
m := 10000 // num chunks per series
chunkRefs := make([]*logproto.ChunkRef, 0, n*m)
series := make(map[uint64]labels.Labels, n)

for i := 0; i < n; i++ {
s := labels.FromStrings("app", fmt.Sprintf("%d", i))
sFP := s.Hash()
series[sFP] = s
for j := 0; j < m; j++ {
chunkRefs = append(chunkRefs, &logproto.ChunkRef{
Fingerprint: uint64(n),
Fingerprint: sFP,
UserID: "tenant",
From: mktime("2024-04-20 00:00"),
Through: mktime("2024-04-20 00:59"),
Expand All @@ -196,5 +218,5 @@ func BenchmarkGroupChunkRefs(b *testing.B) {
b.StartTimer()

groups := make([]*logproto.GroupedChunkRefs, 0, n)
groupChunkRefs(chunkRefs, groups)
groupChunkRefs(series, chunkRefs, groups)
}
1 change: 1 addition & 0 deletions pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func partitionSeriesByDay(from, through model.Time, seriesWithChunks []*logproto

res = append(res, &logproto.GroupedChunkRefs{
Fingerprint: series.Fingerprint,
Labels: series.Labels,
Tenant: series.Tenant,
Refs: relevantChunks,
})
Expand Down
8 changes: 6 additions & 2 deletions pkg/indexgateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type IndexClientWithRange struct {
}

type BloomQuerier interface {
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error)
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error)
}

type Gateway struct {
Expand Down Expand Up @@ -225,12 +225,16 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
return nil, err
}

series := make(map[uint64]labels.Labels)
result = &logproto.GetChunkRefResponse{
Refs: make([]*logproto.ChunkRef, 0, len(chunks)),
}
for _, cs := range chunks {
for i := range cs {
result.Refs = append(result.Refs, &cs[i].ChunkRef)
if _, ok := series[cs[i].Fingerprint]; !ok {
series[cs[i].Fingerprint] = cs[i].Metric
}
}
}

Expand All @@ -257,7 +261,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
return result, nil
}

chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, result.Refs, req.Plan)
chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, series, result.Refs, req.Plan)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading