Skip to content

Commit

Permalink
Add basic test for bloom gateway client
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Oct 3, 2023
1 parent bf6c47e commit f415580
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 8 deletions.
1 change: 1 addition & 0 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}
}

// TODO(chaudum): Re-use buffers for response.
resp := make([]*logproto.ChunkIDsForStream, 0)
for idx, chunkRef := range chunkRefs {
fp := chunkRef.Fingerprint
Expand Down
52 changes: 44 additions & 8 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"io"
"math/rand"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -122,6 +123,13 @@ func NewGatewayClient(cfg ClientConfig, limits Limits, registerer prometheus.Reg
return c, nil
}

func shuffleAddrs(addrs []string) []string {
rand.Shuffle(len(addrs), func(i, j int) {
addrs[i], addrs[j] = addrs[j], addrs[i]
})
return addrs
}

// FilterChunkRefs implements Client
func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, fingerprints []uint64, chunkRefs [][]*logproto.ChunkRef, filters ...*logproto.LineFilterExpression) ([]uint64, [][]*logproto.ChunkRef, error) {
// Get the addresses of corresponding bloom gateways for each series.
Expand All @@ -134,11 +142,14 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t
// All chunk refs of series that belong to one and the same bloom gateway are set in one batch.
streamsByAddr := c.groupStreamsByAddr(fingerprints, chunkRefs, addrs)

// TODO(chaudum): We might over-allocate for the filtered responses here?
filteredChunkRefs := make([][]*logproto.ChunkRef, 0, len(fingerprints))
filteredFingerprints := make([]uint64, 0, len(fingerprints))

for _, item := range streamsByAddr {
err := c.doForAddrs(item.addrs, func(client logproto.BloomGatewayClient) error {
// randomize order of addresses so we don't hotspot the first server in the list
addrs := shuffleAddrs(item.addrs)
err := c.doForAddrs(addrs, func(client logproto.BloomGatewayClient) error {
req := &logproto.FilterChunkRefRequest{
From: from,
Through: through,
Expand Down Expand Up @@ -171,7 +182,9 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t
return fingerprints, filteredChunkRefs, nil
}

func IsEqualAddresses(a, b []string) bool {
// isEqualStringElements checks if two string slices contain the same elements.
// The order of the elements is ignored.
func isEqualStringElements(a, b []string) bool {
if len(a) != len(b) {
return false
}
Expand All @@ -183,34 +196,52 @@ func IsEqualAddresses(a, b []string) bool {
return true
}

func ListContainsAddrs(list []chunkRefsByAddrs, addrs []string) (int, bool) {
// listContainsAddrs checks if a slice of chunkRefAddrs contains an element
// whos field addrs contains the same addresses as the given slice of
// addresses.
// It returns the index of the element, if found, and a boolean whether the
// given list contains the given addrs.
func listContainsAddrs(list []chunkRefsByAddrs, addrs []string) (int, bool) {
for i, r := range list {
if IsEqualAddresses(r.addrs, addrs) {
if isEqualStringElements(r.addrs, addrs) {
return i, true
}
}
return -1, false
}

type chunkRefsByAddrs struct {
addrs []string
refs []*logproto.ChunkRef
addrs []string
refs []*logproto.ChunkRef
streams []uint64
}

// groupStreamsByAddr takes a slice of stream fingerprints, a slices of chunkRef slices, and a slice of address slices
// and groups them into a slice of chunkRefsByAddrs.
// streams is a slice of uint64 stream fingerprints
// chunks is a slice of chunk ref slices
// addresses is a slice of string slices containing server addresses
// It is necessary that len(streams) == len(chunks) == len(addresses), but the
// function implementation does not validate the precondition and would fail silently.
func (c *GatewayClient) groupStreamsByAddr(streams []uint64, chunks [][]*logproto.ChunkRef, addresses [][]string) []chunkRefsByAddrs {
res := make([]chunkRefsByAddrs, 0, len(addresses))
for i := 0; i < len(addresses); i++ {
addrs := addresses[i]
refs := chunks[i]
if idx, ok := ListContainsAddrs(res, addrs); ok {
fp := streams[i]
if idx, ok := listContainsAddrs(res, addrs); ok {
res[idx].refs = append(res[idx].refs, refs...)
res[idx].streams = append(res[idx].streams, fp)
} else {
res = append(res, chunkRefsByAddrs{addrs: addrs, refs: refs})
res = append(res, chunkRefsByAddrs{addrs: addrs, refs: refs, streams: []uint64{fp}})
}
}
return res
}

// doForAddrs sequetially calls the provided callback function fn for each
// address in given slice addrs until the callback function does not return an
// error.
func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGatewayClient) error) error {
var err error
var poolClient ringclient.PoolClient
Expand All @@ -231,6 +262,11 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway
return err
}

// serverAddrsForFingerprints returns a slices of server address slices for
// each fingerprint of given fingerprints.
// The indexes of the returned slices correspond to each other.
// Returns an error in case the bloom gateway ring could not get the
// corresponding replica set for a given fingerprint.
func (c *GatewayClient) serverAddrsForFingerprints(tenantID string, fingerprints []uint64) ([]uint64, [][]string, error) {
subRing := GetShuffleShardingSubring(c.ring, tenantID, c.limits)

Expand Down
120 changes: 120 additions & 0 deletions pkg/bloomgateway/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package bloomgateway

import (
"testing"

"github.com/go-kit/kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/validation"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func TestBloomGatewayClient(t *testing.T) {

logger := log.NewNopLogger()
reg := prometheus.NewRegistry()

l, err := validation.NewOverrides(validation.Limits{BloomGatewayShardSize: 1}, nil)
require.NoError(t, err)

cfg := ClientConfig{}
flagext.DefaultValues(&cfg)

t.Run("", func(t *testing.T) {
_, err := NewGatewayClient(cfg, l, reg, logger)
require.NoError(t, err)
})
}

func TestBloomGatewayClient_GroupStreamsByAddresses(t *testing.T) {

logger := log.NewNopLogger()
reg := prometheus.NewRegistry()

l, err := validation.NewOverrides(validation.Limits{BloomGatewayShardSize: 1}, nil)
require.NoError(t, err)

cfg := ClientConfig{}
flagext.DefaultValues(&cfg)

c, err := NewGatewayClient(cfg, l, reg, logger)
require.NoError(t, err)

testCases := []struct {
name string
streams []uint64
chunks [][]*logproto.ChunkRef
addresses [][]string
expected []chunkRefsByAddrs
}{
{
name: "empty input yields empty result",
streams: []uint64{},
chunks: [][]*logproto.ChunkRef{},
addresses: [][]string{},
expected: []chunkRefsByAddrs{},
},
{
name: "addresses with same elements are grouped into single item",
streams: []uint64{1, 2, 3},
chunks: [][]*logproto.ChunkRef{
{{Fingerprint: 1, Checksum: 1}},
{{Fingerprint: 2, Checksum: 2}},
{{Fingerprint: 3, Checksum: 3}},
},
addresses: [][]string{
{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
{"10.0.0.2", "10.0.0.3", "10.0.0.1"},
{"10.0.0.3", "10.0.0.1", "10.0.0.2"},
},
expected: []chunkRefsByAddrs{
{
addrs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
refs: []*logproto.ChunkRef{
{Fingerprint: 1, Checksum: 1},
{Fingerprint: 2, Checksum: 2},
{Fingerprint: 3, Checksum: 3},
},
streams: []uint64{1, 2, 3},
},
},
},
{
name: "partially overlapping addresses are not grouped together",
streams: []uint64{1, 2},
chunks: [][]*logproto.ChunkRef{
{{Fingerprint: 1, Checksum: 1}},
{{Fingerprint: 2, Checksum: 2}},
},
addresses: [][]string{
{"10.0.0.1", "10.0.0.2"},
{"10.0.0.2", "10.0.0.3"},
},
expected: []chunkRefsByAddrs{
{
addrs: []string{"10.0.0.1", "10.0.0.2"},
refs: []*logproto.ChunkRef{
{Fingerprint: 1, Checksum: 1},
},
streams: []uint64{1},
},
{
addrs: []string{"10.0.0.2", "10.0.0.3"},
refs: []*logproto.ChunkRef{
{Fingerprint: 2, Checksum: 2},
},
streams: []uint64{2},
},
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
res := c.groupStreamsByAddr(tc.streams, tc.chunks, tc.addresses)
require.Equal(t, tc.expected, res)
})
}
}

0 comments on commit f415580

Please sign in to comment.