fix(blooms): Deduplicate filtered series and chunks #12791

Merged 4 commits on Apr 25, 2024
55 changes: 8 additions & 47 deletions pkg/bloomgateway/cache.go
@@ -3,12 +3,10 @@ package bloomgateway
 import (
 	"context"
 	"flag"
-	"sort"
 	"time"
 
 	"github.com/go-kit/log"
 	"github.com/prometheus/common/model"
-	"golang.org/x/exp/slices"
 	"google.golang.org/grpc"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
@@ -95,58 +93,21 @@ func newMerger() merger
 // We merge all chunks grouped by their fingerprint.
 func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.Response, error) {
 	var size int
+	unmerged := make([][]*logproto.GroupedChunkRefs, 0, len(responses))
 	for _, r := range responses {
 		res := r.(*logproto.FilterChunkRefResponse)
+		unmerged = append(unmerged, res.ChunkRefs)
 		size += len(res.ChunkRefs)
 	}
 
-	chunkRefs := make([]*logproto.GroupedChunkRefs, 0, size)
-	for _, r := range responses {
-		res := r.(*logproto.FilterChunkRefResponse)
-		chunkRefs = append(chunkRefs, res.ChunkRefs...)
-	}
-
-	return &logproto.FilterChunkRefResponse{
-		ChunkRefs: mergeGroupedChunkRefs(chunkRefs),
-	}, nil
-}
-
-// Merge duplicated fingerprints by:
-// 1. Sort the chunkRefs by their stream fingerprint
-// 2. Remove duplicated FPs appending all chunks into the first fingerprint's chunk list.
-func mergeGroupedChunkRefs(chunkRefs []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
-	if len(chunkRefs) <= 1 {
-		return chunkRefs
-	}
-
-	sort.Slice(chunkRefs, func(i, j int) bool {
-		return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint
-	})
-
-	var lastDiffFP int
-	for i := 1; i < len(chunkRefs); i++ {
-		if chunkRefs[lastDiffFP].Fingerprint == chunkRefs[i].Fingerprint {
-			chunkRefs[lastDiffFP].Refs = mergeShortRefs(append(chunkRefs[lastDiffFP].Refs, chunkRefs[i].Refs...))
-		} else {
-			lastDiffFP++
-			chunkRefs[lastDiffFP] = chunkRefs[i]
-		}
-	}
-	return chunkRefs[:lastDiffFP+1]
-}
-
-// mergeShortRefs merges short-refs by removing duplicated checksums.
-func mergeShortRefs(refs []*logproto.ShortRef) []*logproto.ShortRef {
-	if len(refs) <= 1 {
-		return refs
-	}
-
-	sort.Slice(refs, func(i, j int) bool {
-		return refs[i].Checksum < refs[j].Checksum
-	})
-	return slices.CompactFunc(refs, func(a, b *logproto.ShortRef) bool {
-		return a.Checksum == b.Checksum
-	})
+	buf := make([]*logproto.GroupedChunkRefs, 0, size)
+	deduped, err := mergeSeries(unmerged, buf)
+	if err != nil {
+		return nil, err
+	}
+
+	return &logproto.FilterChunkRefResponse{ChunkRefs: deduped}, nil
 }
 
 type ClientCache struct {
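With this change the cache merger no longer sorts and compacts chunk refs by checksum itself; it collects the per-response slices and defers all deduplication to mergeSeries (added in client.go below). The following is a minimal, self-contained sketch of that merge-by-fingerprint idea using simplified stand-in types; it is not the PR's implementation, which builds on the v1 iterator package instead.

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for logproto.GroupedChunkRefs and logproto.ShortRef,
// used only for this illustration.
type chunkRef struct{ from, through, checksum int }

type series struct {
	fingerprint uint64
	refs        []chunkRef
}

// mergeByFingerprint flattens the per-response series, sorts them by
// fingerprint, and collapses duplicate fingerprints by concatenating and
// deduplicating their chunk refs. The PR reaches the same result with heap
// and dedupe iterators so that already-sorted inputs are never re-sorted.
func mergeByFingerprint(responses ...[]series) []series {
	var all []series
	for _, r := range responses {
		all = append(all, r...)
	}
	sort.Slice(all, func(i, j int) bool { return all[i].fingerprint < all[j].fingerprint })

	merged := make([]series, 0, len(all))
	for _, s := range all {
		if n := len(merged); n > 0 && merged[n-1].fingerprint == s.fingerprint {
			merged[n-1].refs = dedupeRefs(append(merged[n-1].refs, s.refs...))
			continue
		}
		merged = append(merged, s)
	}
	return merged
}

// dedupeRefs sorts refs by (from, through) and drops exact duplicates,
// comparing whole refs rather than checksums alone.
func dedupeRefs(refs []chunkRef) []chunkRef {
	sort.Slice(refs, func(i, j int) bool {
		if refs[i].from != refs[j].from {
			return refs[i].from < refs[j].from
		}
		return refs[i].through < refs[j].through
	})
	out := refs[:0]
	for _, r := range refs {
		if n := len(out); n > 0 && out[n-1] == r {
			continue
		}
		out = append(out, r)
	}
	return out
}

func main() {
	a := []series{{fingerprint: 1, refs: []chunkRef{{0, 1, 3}}}}
	b := []series{{fingerprint: 1, refs: []chunkRef{{0, 1, 3}, {1, 2, 4}}}}
	fmt.Println(mergeByFingerprint(a, b))
	// Output: [{1 [{0 1 3} {1 2 4}]}]
}
```

Comparing whole refs rather than checksums alone mirrors the PR's move from checksum-based compaction to full ShortRef equality.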
10 changes: 5 additions & 5 deletions pkg/bloomgateway/cache_test.go
@@ -288,6 +288,11 @@ func TestMerge(t *testing.T) {
 				Fingerprint: 2,
 				Tenant:      "fake",
 				Refs: []*logproto.ShortRef{
+					{
+						From:     700,
+						Through:  1000,
+						Checksum: 40,
+					},
 					{
 						From:     1000,
 						Through:  1500,
@@ -303,11 +308,6 @@ func TestMerge(t *testing.T) {
 						Through:  2500,
 						Checksum: 30,
 					},
-					{
-						From:     700,
-						Through:  1000,
-						Checksum: 40,
-					},
 					{
 						From:     2000,
 						Through:  2700,
106 changes: 88 additions & 18 deletions pkg/bloomgateway/client.go
@@ -3,7 +3,6 @@ package bloomgateway
 import (
 	"context"
 	"flag"
-	"fmt"
 	"io"
 	"math"
 	"sort"
@@ -15,13 +14,15 @@ import (
 	ringclient "github.com/grafana/dskit/ring/client"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
+	"golang.org/x/exp/slices"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/health/grpc_health_v1"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
 	"github.com/grafana/loki/v3/pkg/querier/plan"
 	"github.com/grafana/loki/v3/pkg/queue"
+	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
 	"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
@@ -258,17 +259,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, interva
 		return rs.groups[i].Fingerprint < rs.groups[j].Fingerprint
 	})
 
-	level.Info(c.logger).Log(
-		"msg", "do FilterChunkRefs for addresses",
-		"part", fmt.Sprintf("%d/%d", i+1, len(servers)),
-		"addr", rs.addr,
-		"from", interval.Start.Time(),
-		"through", interval.End.Time(),
-		"series", len(rs.groups),
-		"blocks", len(rs.blocks),
-		"tenant", tenant,
-	)
-
 	return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error {
 		req := &logproto.FilterChunkRefRequest{
 			From:    interval.Start,
@@ -290,15 +280,95 @@
 	if err != nil {
 		return nil, err
 	}
-	return flatten(results, count), nil
+
+	buf := make([]*logproto.GroupedChunkRefs, 0, count)
+	return mergeSeries(results, buf)
 }
 
-func flatten(input [][]*logproto.GroupedChunkRefs, n int) []*logproto.GroupedChunkRefs {
-	result := make([]*logproto.GroupedChunkRefs, 0, n)
-	for _, res := range input {
-		result = append(result, res...)
-	}
-	return result
+// mergeSeries combines responses from multiple FilterChunkRefs calls and deduplicates
+// chunks from series that appear in multiple responses.
+// To avoid allocations, an optional slice can be passed as second argument.
+func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedChunkRefs) ([]*logproto.GroupedChunkRefs, error) {
+	// clear provided buffer
+	buf = buf[:0]
+
+	iters := make([]v1.PeekingIterator[*logproto.GroupedChunkRefs], 0, len(input))
+	for _, inp := range input {
+		iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp)))
+	}
+
+	heapIter := v1.NewHeapIterator[*logproto.GroupedChunkRefs](
+		func(a, b *logproto.GroupedChunkRefs) bool {
+			return a.Fingerprint < b.Fingerprint
+		},
+		iters...,
+	)
+
+	dedupeIter := v1.NewDedupingIter[*logproto.GroupedChunkRefs, *logproto.GroupedChunkRefs](
+		// eq
+		func(a, b *logproto.GroupedChunkRefs) bool { return a.Fingerprint == b.Fingerprint },
+		// from
+		v1.Identity[*logproto.GroupedChunkRefs],
+		// merge
+		func(a, b *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs {
+			return &logproto.GroupedChunkRefs{
+				Fingerprint: a.Fingerprint,
+				Tenant:      a.Tenant,
+				Refs:        mergeChunks(a.Refs, b.Refs),
+			}
+		},
+		// iterator
+		v1.NewPeekingIter(heapIter),
+	)
+
+	return v1.CollectInto(dedupeIter, buf)
+}
+
+func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef {
+	if len(inputs) == 0 {
+		return nil
+	}
+
+	if len(inputs) == 1 {
+		slices.SortFunc(
+			inputs[0],
+			func(a, b *logproto.ShortRef) int {
+				if a.Equal(b) {
+					return 0
+				}
+				if a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through)) {
+					return -1
+				}
+				return 1
+			},
+		)
+		return inputs[0]
+	}
+
+	iters := make([]v1.PeekingIterator[*logproto.ShortRef], 0, len(inputs))
+	for _, inp := range inputs {
+		iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp)))
+	}
+
+	chunkDedupe := v1.NewDedupingIter[*logproto.ShortRef, *logproto.ShortRef](
+		// eq
+		func(a, b *logproto.ShortRef) bool { return a.Equal(b) },
+		// from
+		v1.Identity[*logproto.ShortRef],
+		// merge
+		func(a, b *logproto.ShortRef) *logproto.ShortRef { return a },
+		// iterator
+		v1.NewPeekingIter[*logproto.ShortRef](
+			v1.NewHeapIterator[*logproto.ShortRef](
+				func(a, b *logproto.ShortRef) bool {
+					return a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through))
+				},
+				iters...,
+			),
+		),
+	)
+	merged, _ := v1.Collect(chunkDedupe)
+	return merged
 }
 
 // doForAddrs sequentially calls the provided callback function fn for each
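mergeSeries and mergeChunks are both built from the same two pieces of the v1 package: a heap iterator that k-way-merges already-sorted inputs, and a deduping iterator that collapses adjacent equal items with a caller-supplied merge function. The sketch below shows only the deduping step, written over a plain slice with an illustrative name and signature rather than the v1 package's actual API.

```go
package main

import "fmt"

// dedupeSorted collapses adjacent items of an already-sorted slice that
// compare equal under eq, combining each run with merge. It plays the same
// role as the deduping iterator used above, but over a slice instead of an
// iterator; the name and signature are illustrative only.
func dedupeSorted[T any](sorted []T, eq func(a, b T) bool, merge func(a, b T) T) []T {
	if len(sorted) == 0 {
		return nil
	}
	out := []T{sorted[0]}
	for _, item := range sorted[1:] {
		if last := len(out) - 1; eq(out[last], item) {
			out[last] = merge(out[last], item)
			continue
		}
		out = append(out, item)
	}
	return out
}

func main() {
	// Fingerprints from two already-merged responses: 0x01 appears twice.
	fps := []uint64{0x00, 0x01, 0x01, 0x02}
	deduped := dedupeSorted(fps,
		func(a, b uint64) bool { return a == b }, // eq
		func(a, b uint64) uint64 { return a },    // merge: keep the first
	)
	fmt.Println(deduped) // [0 1 2]
}
```

In the PR, eq compares fingerprints (or full ShortRef equality for chunks) and merge concatenates and deduplicates the chunk refs, so a series reported by several responses appears exactly once in the final result.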
37 changes: 37 additions & 0 deletions pkg/bloomgateway/client_test.go
@@ -7,8 +7,10 @@ import (
 	"github.com/go-kit/log"
 	"github.com/grafana/dskit/flagext"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 
+	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
 	"github.com/grafana/loki/v3/pkg/querier/plan"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
@@ -33,3 +35,38 @@ func TestBloomGatewayClient(t *testing.T) {
 		require.Equal(t, 0, len(res))
 	})
 }
+
+func shortRef(f, t model.Time, c uint32) *logproto.ShortRef {
+	return &logproto.ShortRef{
+		From:     f,
+		Through:  t,
+		Checksum: c,
+	}
+}
+
+func TestGatewayClient_MergeSeries(t *testing.T) {
+	inputs := [][]*logproto.GroupedChunkRefs{
+		// response 1
+		{
+			{Fingerprint: 0x00, Refs: []*logproto.ShortRef{shortRef(0, 1, 1), shortRef(1, 2, 2)}}, // not overlapping
+			{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
+			{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6)}}, // partially overlapping chunks
+		},
+		// response 2
+		{
+			{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
+			{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(1, 2, 6), shortRef(2, 3, 7)}}, // partially overlapping chunks
+			{Fingerprint: 0x03, Refs: []*logproto.ShortRef{shortRef(0, 1, 8), shortRef(1, 2, 9)}}, // not overlapping
+		},
+	}
+
+	expected := []*logproto.GroupedChunkRefs{
+		{Fingerprint: 0x00, Refs: []*logproto.ShortRef{shortRef(0, 1, 1), shortRef(1, 2, 2)}},                    // not overlapping
+		{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}},                    // fully overlapping chunks
+		{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6), shortRef(2, 3, 7)}}, // partially overlapping chunks
+		{Fingerprint: 0x03, Refs: []*logproto.ShortRef{shortRef(0, 1, 8), shortRef(1, 2, 9)}},                    // not overlapping
+	}
+
+	result, _ := mergeSeries(inputs, nil)
+	require.Equal(t, expected, result)
+}
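
The test above covers series-level deduplication end to end. A hypothetical companion test for the chunk-level path, not part of this PR, could exercise mergeChunks directly; the sketch assumes it lives in the same pkg/bloomgateway/client_test.go file and reuses the shortRef helper.

```go
// TestGatewayClient_MergeChunks is a hypothetical test (not in the PR) that
// checks mergeChunks orders refs by (From, Through) and drops exact duplicates.
func TestGatewayClient_MergeChunks(t *testing.T) {
	inputs := [][]*logproto.ShortRef{
		{shortRef(0, 1, 1), shortRef(1, 2, 2)}, // first response
		{shortRef(1, 2, 2), shortRef(2, 3, 3)}, // second response, overlaps on (1, 2, 2)
	}

	// The duplicated ref appears exactly once in the merged, ordered result.
	expected := []*logproto.ShortRef{shortRef(0, 1, 1), shortRef(1, 2, 2), shortRef(2, 3, 3)}

	result := mergeChunks(inputs...)
	require.Equal(t, expected, result)
}
```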
2 changes: 1 addition & 1 deletion pkg/bloomgateway/resolver.go
@@ -15,7 +15,7 @@ import (
 )
 
 type BlockResolver interface {
-	Resolve(context.Context, string, bloomshipper.Interval, []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error)
+	Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) (blocks []blockWithSeries, skipped []*logproto.GroupedChunkRefs, err error)
 }
 
 type blockWithSeries struct {
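The resolver change only names the parameters and results of Resolve, which documents the contract at the interface itself: callers get back the resolved blocks plus the series that were skipped. A hypothetical no-op implementation, shown purely to illustrate the signature and assuming it sits in the bloomgateway package next to resolver.go (it is not part of this PR):

```go
// noopResolver is a hypothetical BlockResolver that resolves no blocks and
// reports every series as skipped. It only illustrates the interface shape.
type noopResolver struct{}

func (noopResolver) Resolve(
	_ context.Context,
	_ string,
	_ bloomshipper.Interval,
	series []*logproto.GroupedChunkRefs,
) (blocks []blockWithSeries, skipped []*logproto.GroupedChunkRefs, err error) {
	return nil, series, nil
}
```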