Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(blooms): Exclude label filters where label name is part of the series labels. #14661

Merged
merged 8 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"

iter "github.com/grafana/loki/v3/pkg/iter/v2"
Expand Down Expand Up @@ -160,6 +161,18 @@ func (g *Gateway) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
}

// extractSeries collects the series labels attached to the given grouped
// chunk refs. Groups that carry no labels are skipped, so the returned
// slice may be shorter than the input.
func extractSeries(refs []*logproto.GroupedChunkRefs) []labels.Labels {
	series := make([]labels.Labels, 0, len(refs))
	for _, group := range refs {
		if group.Labels == nil {
			continue
		}
		series = append(series, logproto.FromLabelAdaptersToLabels(group.Labels.Labels))
	}
	return series
}

// FilterChunkRefs implements BloomGatewayServer
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) {
tenantID, err := tenant.TenantID(ctx)
Expand Down Expand Up @@ -193,7 +206,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, errors.New("from time must not be after through time")
}

matchers := v1.ExtractTestableLabelMatchers(req.Plan.AST)
// This time, we do pass the series to the ExtractTestableLabelMatchers
// To this point, we have called ExtractTestableLabelMatchers multiple times
// without the series (faster) just to return early if there are no filters expressions.
// We now need to be more precise and only extract matchers that do not
// match the series labels.
matchers := v1.ExtractTestableLabelMatchers(req.Plan.AST, extractSeries(req.Refs)...)
stats.NumMatchers = len(matchers)
g.metrics.receivedMatchers.Observe(float64(len(matchers)))

Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func stringSlice[T fmt.Stringer](s []T) []string {

func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.GroupedChunkRefs {
t.Helper()
return groupChunkRefs(chunkRefs, nil)
return groupChunkRefs(nil, chunkRefs, nil)
}

func newLimits() *validation.Overrides {
Expand Down
16 changes: 13 additions & 3 deletions pkg/bloomgateway/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/querier/plan"
Expand Down Expand Up @@ -101,7 +102,7 @@ func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef {
return &logproto.ShortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum}
}

func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series []labels.Labels, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
// Shortcut that does not require any filtering
if !bq.limits.BloomGatewayEnabled(tenant) || len(chunkRefs) == 0 || len(v1.ExtractTestableLabelMatchers(queryPlan.AST)) == 0 {
return chunkRefs, nil
Expand All @@ -112,7 +113,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from

grouped := groupedChunksRefPool.Get(len(chunkRefs))
defer groupedChunksRefPool.Put(grouped)
grouped = groupChunkRefs(chunkRefs, grouped)
grouped = groupChunkRefs(series, chunkRefs, grouped)

preFilterChunks := len(chunkRefs)
preFilterSeries := len(grouped)
Expand Down Expand Up @@ -225,7 +226,14 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
// groups them by fingerprint.
// The second argument `grouped` can be used to pass a buffer to avoid allocations.
// If it's nil, the returned slice will be allocated.
func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
func groupChunkRefs(series []labels.Labels, chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
seriesFPs := make(map[uint64]*logproto.IndexSeries, len(series))
for _, s := range series {
seriesFPs[s.Hash()] = &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(s),
}
}

seen := make(map[uint64]int, len(grouped))
for _, chunkRef := range chunkRefs {
if idx, found := seen[chunkRef.Fingerprint]; found {
Expand All @@ -234,10 +242,12 @@ func groupChunkRefs(chunkRefs []*logproto.ChunkRef, grouped []*logproto.GroupedC
seen[chunkRef.Fingerprint] = len(grouped)
grouped = append(grouped, &logproto.GroupedChunkRefs{
Fingerprint: chunkRef.Fingerprint,
Labels: seriesFPs[chunkRef.Fingerprint],
Tenant: chunkRef.UserID,
Refs: []*logproto.ShortRef{convertToShortRef(chunkRef)},
})
}
}

return grouped
}
50 changes: 34 additions & 16 deletions pkg/bloomgateway/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomgateway

import (
"context"
"fmt"
"math/rand"
"sort"
"testing"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
Expand Down Expand Up @@ -79,7 +81,7 @@ func TestBloomQuerier(t *testing.T) {
}
expr, err := syntax.ParseExpr(`{foo="bar"}`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.NoError(t, err)
require.Equal(t, chunkRefs, res)
require.Equal(t, 0, c.callCount)
Expand All @@ -95,7 +97,7 @@ func TestBloomQuerier(t *testing.T) {
chunkRefs := []*logproto.ChunkRef{}
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.NoError(t, err)
require.Equal(t, chunkRefs, res)
require.Equal(t, 0, c.callCount)
Expand All @@ -115,7 +117,7 @@ func TestBloomQuerier(t *testing.T) {
}
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.Error(t, err)
require.Nil(t, res)
})
Expand All @@ -134,7 +136,7 @@ func TestBloomQuerier(t *testing.T) {
}
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
require.NoError(t, err)
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, nil, chunkRefs, plan.QueryPlan{AST: expr})
require.NoError(t, err)
require.Equal(t, chunkRefs, res)
require.Equal(t, 2, c.callCount)
Expand All @@ -143,28 +145,40 @@ func TestBloomQuerier(t *testing.T) {
}

func TestGroupChunkRefs(t *testing.T) {
series := []labels.Labels{
labels.FromStrings("app", "1"),
labels.FromStrings("app", "2"),
labels.FromStrings("app", "3"),
}

chunkRefs := []*logproto.ChunkRef{
{Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: 0x00, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: 0x01, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: 0x02, UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: series[0].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: series[1].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
{Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{Fingerprint: series[2].Hash(), UserID: "tenant", From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}

result := groupChunkRefs(chunkRefs, nil)
result := groupChunkRefs(series, chunkRefs, nil)
require.Equal(t, []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00, Tenant: "tenant", Refs: []*logproto.ShortRef{
{Fingerprint: series[0].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[0]),
}},
{Fingerprint: 0x01, Tenant: "tenant", Refs: []*logproto.ShortRef{
{Fingerprint: series[1].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[1]),
}},
{Fingerprint: 0x02, Tenant: "tenant", Refs: []*logproto.ShortRef{
{Fingerprint: series[2].Hash(), Tenant: "tenant", Refs: []*logproto.ShortRef{
{From: mktime("2024-04-20 00:00"), Through: mktime("2024-04-20 00:59")},
{From: mktime("2024-04-20 01:00"), Through: mktime("2024-04-20 01:59")},
}, Labels: &logproto.IndexSeries{
Labels: logproto.FromLabelsToLabelAdapters(series[2]),
}},
}, result)
}
Expand All @@ -175,11 +189,15 @@ func BenchmarkGroupChunkRefs(b *testing.B) {
n := 1000 // num series
m := 10000 // num chunks per series
chunkRefs := make([]*logproto.ChunkRef, 0, n*m)
series := make([]labels.Labels, 0, n)

for i := 0; i < n; i++ {
s := labels.FromStrings("app", fmt.Sprintf("%d", i))
sFP := s.Hash()
series = append(series, s)
for j := 0; j < m; j++ {
chunkRefs = append(chunkRefs, &logproto.ChunkRef{
Fingerprint: uint64(n),
Fingerprint: sFP,
UserID: "tenant",
From: mktime("2024-04-20 00:00"),
Through: mktime("2024-04-20 00:59"),
Expand All @@ -196,5 +214,5 @@ func BenchmarkGroupChunkRefs(b *testing.B) {
b.StartTimer()

groups := make([]*logproto.GroupedChunkRefs, 0, n)
groupChunkRefs(chunkRefs, groups)
groupChunkRefs(series, chunkRefs, groups)
}
9 changes: 7 additions & 2 deletions pkg/indexgateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type IndexClientWithRange struct {
}

type BloomQuerier interface {
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error)
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series []labels.Labels, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it's a little unfortunate we have to pass series and chunks separately instead of passing through GroupedChunkRefs. I'm not sure it's worth fixing this though, so I don't mind it staying this way.

}

type Gateway struct {
Expand Down Expand Up @@ -257,7 +257,12 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
return result, nil
}

chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, result.Refs, req.Plan)
series, err := g.indexQuerier.GetSeries(ctx, instanceID, req.From, req.Through, matchers...)
if err != nil {
return nil, err
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the call to GetChunks on 223, the returned chunks matrix has a field called Metric labels.Labels alongside ChunkRef.

I think that might be the labels of the series (though it's named in a way that makes me less sure). But if it is the labels, we can remove this call in favour of the existing GetChunks call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT from looking at the code, looks like it does contain the stream labels


chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, series, result.Refs, req.Plan)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading