From b6033138b333f5b698c97e63fc699a9667e102fe Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 30 Apr 2024 16:27:51 +0200 Subject: [PATCH] fix(blooms): Do not fail requests when fetching metas from cache fails (#12838) The bloom shipper uses metas to resolve available blocks. Metas are fetched from cache, and if not available from object storage. If fetching metas from cache fails, e.g. timeout, the request should not fail, but proceed as if no metas were available. Signed-off-by: Christian Haudum --- pkg/storage/chunk/cache/mock.go | 17 ++++++++++++++++- .../stores/shipper/bloomshipper/fetcher.go | 3 ++- .../stores/shipper/bloomshipper/fetcher_test.go | 12 ++++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/pkg/storage/chunk/cache/mock.go b/pkg/storage/chunk/cache/mock.go index 1b0f60da3dec3..dfa1eb759d274 100644 --- a/pkg/storage/chunk/cache/mock.go +++ b/pkg/storage/chunk/cache/mock.go @@ -13,16 +13,27 @@ type MockCache interface { GetInternal() map[string][]byte KeysRequested() int GetKeys() []string + SetErr(error, error) } type mockCache struct { numKeyUpdates int keysRequested int sync.Mutex - cache map[string][]byte + cache map[string][]byte + storeErr error // optional error that is returned when calling Store() + fetchErr error // optional error that is returned when calling Fetch() +} + +func (m *mockCache) SetErr(storeErr, fetchErr error) { + m.storeErr, m.fetchErr = storeErr, fetchErr } func (m *mockCache) Store(_ context.Context, keys []string, bufs [][]byte) error { + if m.storeErr != nil { + return m.storeErr + } + m.Lock() defer m.Unlock() for i := range keys { @@ -33,6 +44,10 @@ func (m *mockCache) Store(_ context.Context, keys []string, bufs [][]byte) error } func (m *mockCache) Fetch(_ context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) { + if m.fetchErr != nil { + return nil, nil, nil, m.fetchErr + } + m.Lock() defer m.Unlock() for _, key := range keys { diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher.go b/pkg/storage/stores/shipper/bloomshipper/fetcher.go index 936e120af8501..de02d1effba8c 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher.go @@ -129,7 +129,8 @@ func (f *Fetcher) FetchMetas(ctx context.Context, refs []MetaRef) ([]Meta, error } cacheHits, cacheBufs, _, err := f.metasCache.Fetch(ctx, keys) if err != nil { - return nil, err + level.Error(f.logger).Log("msg", "failed to fetch metas from cache", "err", err) + return nil, nil } fromCache, missing, err := f.processMetasCacheResponse(ctx, refs, cacheHits, cacheBufs) diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go index 43658f9ed2137..847c5f69b6a29 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -72,6 +73,7 @@ func TestMetasFetcher(t *testing.T) { start []Meta // initial cache state end []Meta // final cache state fetch []Meta // metas to fetch + err error // error that is returned when calling cache.Fetch() }{ { name: "all metas found in cache", @@ -94,12 +96,22 @@ func TestMetasFetcher(t *testing.T) { end: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}, {Min: 0x10000, Max: 0x1ffff}}), fetch: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}, {Min: 0x10000, Max: 0x1ffff}}), }, + { + name: "error fetching metas yields empty result", + err: errors.New("failed to fetch"), + store: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}, {Min: 0x10000, Max: 0x1ffff}}), + start: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}), + end: makeMetas(t, schemaCfg, now, []v1.FingerprintBounds{{Min: 0x0000, Max: 0xffff}}), + fetch: []Meta{}, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { ctx := context.Background() metasCache := cache.NewMockCache() + metasCache.SetErr(nil, test.err) + cfg := bloomStoreConfig{workingDirs: []string{t.TempDir()}, numWorkers: 1} oc, err := local.NewFSObjectClient(local.FSConfig{Directory: dir})