Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(blooms): Suppress error from resolving server addresses for blocks #13385

Merged
merged 2 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,20 @@ type Client interface {
FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
}

// clientPool is a minimal interface that is satisfied by the JumpHashClientPool.
// It does only expose functions that are used by the GatewayClient
// and is required to mock the JumpHashClientPool in tests.
type clientPool interface {
GetClientFor(string) (ringclient.PoolClient, error)
Addr(string) (string, error)
Stop()
}

type GatewayClient struct {
cfg ClientConfig
logger log.Logger
metrics *clientMetrics
pool *JumpHashClientPool
pool clientPool
dnsProvider *discovery.DNS
}

Expand Down Expand Up @@ -211,8 +220,15 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
servers := make([]addrWithGroups, 0, len(blocks))
for _, blockWithSeries := range blocks {
addr, err := c.pool.Addr(blockWithSeries.block.String())

// the client should return the full, unfiltered list of chunks instead of an error
if err != nil {
return nil, errors.Wrapf(err, "server address for block: %s", blockWithSeries.block)
level.Error(c.logger).Log("msg", "failed to resolve server address for block", "block", blockWithSeries.block, "err", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think resolving a single address should necessarily fail all addresses, but gracefully degrading the whole request is a fair place to start.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is only one possible ways how this can fail, because there is only one case Addr() returns an error, and this is because there are no servers available.
Therefore subsequent calls of Addr() are very likely to fail as well.

var series [][]*logproto.GroupedChunkRefs
for i := range blocks {
series = append(series, blocks[i].series)
}
return mergeSeries(series, nil)
}

if idx, found := pos[addr]; found {
Expand Down
38 changes: 37 additions & 1 deletion pkg/bloomgateway/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
Expand All @@ -16,16 +17,24 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

type errorMockPool struct {
*JumpHashClientPool
}

func (p *errorMockPool) Addr(_ string) (string, error) {
return "", errors.New("no server found")
}

func TestBloomGatewayClient(t *testing.T) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()

limits := newLimits()

cfg := ClientConfig{}
flagext.DefaultValues(&cfg)

t.Run("FilterChunks returns response", func(t *testing.T) {
reg := prometheus.NewRegistry()
c, err := NewClient(cfg, limits, reg, logger, nil, false)
require.NoError(t, err)
expr, err := syntax.ParseExpr(`{foo="bar"}`)
Expand All @@ -34,6 +43,33 @@ func TestBloomGatewayClient(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(res))
})

t.Run("pool error is suppressed and returns full list of chunks", func(t *testing.T) {
reg := prometheus.NewRegistry()
c, err := NewClient(cfg, limits, reg, logger, nil, false)
require.NoError(t, err)
c.pool = &errorMockPool{}

expected := []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00, Refs: []*logproto.ShortRef{shortRef(0, 1, 1)}},
{Fingerprint: 0x9f, Refs: []*logproto.ShortRef{shortRef(0, 1, 2)}},
{Fingerprint: 0xa0, Refs: []*logproto.ShortRef{shortRef(0, 1, 3)}},
{Fingerprint: 0xff, Refs: []*logproto.ShortRef{shortRef(0, 1, 4)}},
}

blocks := []blockWithSeries{
{block: mkBlockRef(0x00, 0x9f), series: expected[0:2]},
{block: mkBlockRef(0xa0, 0xff), series: expected[2:4]},
}
expr, err := syntax.ParseExpr(`{foo="bar"}`)
require.NoError(t, err)

res, err := c.FilterChunks(context.Background(), "tenant", bloomshipper.NewInterval(0, 0), blocks, plan.QueryPlan{AST: expr})
require.NoError(t, err)
require.Equal(t, 4, len(res))

require.Equal(t, expected, res)
})
}

func shortRef(f, t model.Time, c uint32) *logproto.ShortRef {
Expand Down
Loading