From 784e7d562fedec7134c8ed4e2cee8ccb7049e271 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 23 Jul 2024 11:30:51 +0200 Subject: [PATCH] fix: Keep blocks referenced by newer metas (#13614) --- pkg/bloombuild/planner/planner.go | 41 ++++++++-- pkg/bloombuild/planner/planner_test.go | 100 +++++++++++++++++++++++-- pkg/bloomgateway/processor.go | 2 +- 3 files changed, 126 insertions(+), 17 deletions(-) diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index 08f6bb1c40bb7..285795a8327d2 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -360,7 +360,7 @@ func (p *Planner) computeTasks( // In case the planner restarted before deleting outdated metas in the previous iteration, // we delete them during the planning phase to avoid reprocessing them. - metas, err = p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, metas, phasePlanning) + metas, err = p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, nil, metas, phasePlanning) if err != nil { return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err) } @@ -446,8 +446,7 @@ func (p *Planner) processTenantTaskResults( return tasksSucceed, nil } - combined := append(originalMetas, newMetas...) 
- if _, err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, combined, phaseBuilding); err != nil { + if _, err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, newMetas, originalMetas, phaseBuilding); err != nil { return 0, fmt.Errorf("failed to delete outdated metas: %w", err) } @@ -460,12 +459,14 @@ func (p *Planner) deleteOutdatedMetasAndBlocks( ctx context.Context, table config.DayTable, tenant string, - metas []bloomshipper.Meta, + newMetas []bloomshipper.Meta, + originalMetas []bloomshipper.Meta, phase string, ) ([]bloomshipper.Meta, error) { logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant, "phase", phase) - upToDate, outdated := outdatedMetas(metas) + combined := append(originalMetas, newMetas...) + upToDate, outdated := outdatedMetas(combined) if len(outdated) == 0 { level.Debug(logger).Log( "msg", "no outdated metas found", @@ -497,17 +498,25 @@ func (p *Planner) deleteOutdatedMetasAndBlocks( for _, meta := range outdated { for _, block := range meta.Blocks { + logger := log.With(logger, "block", block.String()) + + // Prevent deleting blocks that are reused in new metas + if isBlockInMetas(block, upToDate) { + level.Debug(logger).Log("msg", "block is still in use in new meta, skipping delete") + continue + } + if err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}); err != nil { if client.IsObjectNotFoundErr(err) { - level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String()) + level.Debug(logger).Log("msg", "block not found while attempting delete, continuing") } else { - level.Error(logger).Log("msg", "failed to delete block", "err", err, "block", block.String()) + level.Error(logger).Log("msg", "failed to delete block", "err", err) return nil, errors.Wrap(err, "failed to delete block") } } deletedBlocks++ - level.Debug(logger).Log("msg", "removed outdated block", "block", block.String()) + level.Debug(logger).Log("msg", "removed outdated block") } err = 
client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef}) @@ -532,6 +541,22 @@ func (p *Planner) deleteOutdatedMetasAndBlocks( return upToDate, nil } +func isBlockInMetas(block bloomshipper.BlockRef, metas []bloomshipper.Meta) bool { + // Blocks are sorted within a meta, so we can find it with binary search + for _, meta := range metas { + // Search for the first block whose bound is >= the target block max bound. + i := sort.Search(len(meta.Blocks), func(i int) bool { + return meta.Blocks[i].Cmp(uint64(block.Bounds.Max)) <= v1.Overlap + }) + + if i < len(meta.Blocks) && meta.Blocks[i] == block { + return true + } + } + + return false +} + func (p *Planner) tables(ts time.Time) *dayRangeIterator { // adjust the minimum by one to make it inclusive, which is more intuitive // for a configuration variable diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index ab34c82c6940d..ca5c1d0c15b09 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -1,6 +1,7 @@ package planner import ( + "bytes" "context" "fmt" "io" @@ -20,6 +21,8 @@ import ( "google.golang.org/grpc" "github.com/grafana/loki/v3/pkg/bloombuild/protos" + "github.com/grafana/loki/v3/pkg/chunkenc" + iter "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/storage" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" @@ -166,11 +169,36 @@ func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef { } } -func genBlock(ref bloomshipper.BlockRef) bloomshipper.Block { +func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) { + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := v1.NewByteReader(indexBuf, bloomsBuf) + + blockOpts := v1.NewBlockOptions(chunkenc.EncNone, 4, 1, 0, 0) + + builder, err := v1.NewBlockBuilder(blockOpts, writer) + if 
err != nil { + return bloomshipper.Block{}, err + } + + if _, err = builder.BuildFrom(iter.NewEmptyIter[v1.SeriesWithBlooms]()); err != nil { + return bloomshipper.Block{}, err + } + + block := v1.NewBlock(reader, v1.NewMetrics(nil)) + + buf := bytes.NewBuffer(nil) + if err := v1.TarGz(buf, block.Reader()); err != nil { + return bloomshipper.Block{}, err + } + + tarReader := bytes.NewReader(buf.Bytes()) + return bloomshipper.Block{ BlockRef: ref, - Data: &DummyReadSeekCloser{}, - } + Data: bloomshipper.ClosableReadSeekerAdapter{ReadSeeker: tarReader}, + }, nil } func Test_blockPlansForGaps(t *testing.T) { @@ -612,7 +640,12 @@ func putMetas(bloomClient bloomshipper.Client, metas []bloomshipper.Meta) error } for _, block := range meta.Blocks { - err := bloomClient.PutBlock(context.Background(), genBlock(block)) + writtenBlock, err := genBlock(block) + if err != nil { + return err + } + + err = bloomClient.PutBlock(context.Background(), writtenBlock) if err != nil { return err } @@ -826,6 +859,7 @@ func Test_deleteOutdatedMetas(t *testing.T) { for _, tc := range []struct { name string originalMetas []bloomshipper.Meta + newMetas []bloomshipper.Meta expectedUpToDateMetas []bloomshipper.Meta }{ { @@ -835,6 +869,8 @@ func Test_deleteOutdatedMetas(t *testing.T) { name: "only up to date metas", originalMetas: []bloomshipper.Meta{ genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), + }, + newMetas: []bloomshipper.Meta{ genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), }, expectedUpToDateMetas: []bloomshipper.Meta{ @@ -846,13 +882,52 @@ func Test_deleteOutdatedMetas(t *testing.T) { name: "outdated metas", originalMetas: []bloomshipper.Meta{ genMeta(0, 5, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 5)}), - genMeta(6, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 10)}), + }, + newMetas: []bloomshipper.Meta{ genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), }, expectedUpToDateMetas: []bloomshipper.Meta{ 
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), }, }, + { + name: "new metas reuse blocks from outdated meta", + originalMetas: []bloomshipper.Meta{ + genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{ // Outdated + genBlockRef(0, 5), // Reuse + genBlockRef(5, 10), // Delete + }), + genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{ // Outdated + genBlockRef(10, 20), // Reuse + }), + genMeta(20, 30, []int{0}, []bloomshipper.BlockRef{ // Up to date + genBlockRef(20, 30), + }), + }, + newMetas: []bloomshipper.Meta{ + genMeta(0, 5, []int{1}, []bloomshipper.BlockRef{ + genBlockRef(0, 5), // Reused block + }), + genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{ + genBlockRef(5, 7), // New block + genBlockRef(7, 10), // New block + genBlockRef(10, 20), // Reused block + }), + }, + expectedUpToDateMetas: []bloomshipper.Meta{ + genMeta(0, 5, []int{1}, []bloomshipper.BlockRef{ + genBlockRef(0, 5), + }), + genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{ + genBlockRef(5, 7), + genBlockRef(7, 10), + genBlockRef(10, 20), + }), + genMeta(20, 30, []int{0}, []bloomshipper.BlockRef{ + genBlockRef(20, 30), + }), + }, + }, } { t.Run(tc.name, func(t *testing.T) { logger := log.NewNopLogger() @@ -867,9 +942,11 @@ func Test_deleteOutdatedMetas(t *testing.T) { bloomClient, err := planner.bloomStore.Client(testDay.ModelTime()) require.NoError(t, err) - // Create original metas and blocks + // Create original/new metas and blocks err = putMetas(bloomClient, tc.originalMetas) require.NoError(t, err) + err = putMetas(bloomClient, tc.newMetas) + require.NoError(t, err) // Get all metas metas, err := planner.bloomStore.FetchMetas( @@ -882,9 +959,9 @@ func Test_deleteOutdatedMetas(t *testing.T) { ) require.NoError(t, err) removeLocFromMetasSources(metas) - require.ElementsMatch(t, tc.originalMetas, metas) + require.ElementsMatch(t, append(tc.originalMetas, tc.newMetas...), metas) - upToDate, err := planner.deleteOutdatedMetasAndBlocks(context.Background(), testTable, 
"fakeTenant", tc.originalMetas, phasePlanning) + upToDate, err := planner.deleteOutdatedMetasAndBlocks(context.Background(), testTable, "fakeTenant", tc.newMetas, tc.originalMetas, phasePlanning) require.NoError(t, err) require.ElementsMatch(t, tc.expectedUpToDateMetas, upToDate) @@ -900,6 +977,13 @@ func Test_deleteOutdatedMetas(t *testing.T) { require.NoError(t, err) removeLocFromMetasSources(metas) require.ElementsMatch(t, tc.expectedUpToDateMetas, metas) + + // Fetch all blocks from the metas + for _, meta := range metas { + blocks, err := planner.bloomStore.FetchBlocks(context.Background(), meta.Blocks) + require.NoError(t, err) + require.Len(t, blocks, len(meta.Blocks)) + } }) } } diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index 47742f0f86057..edd882e1e4210 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -52,7 +52,7 @@ func (p *processor) processTasks(ctx context.Context, tasks []Task) error { return nil } -func (p *processor) processTasksForDay(ctx context.Context, tenant string, day config.DayTime, tasks []Task) error { +func (p *processor) processTasksForDay(ctx context.Context, _ string, _ config.DayTime, tasks []Task) error { var duration time.Duration blocksRefs := make([]bloomshipper.BlockRef, 0, len(tasks[0].blocks)*len(tasks))