Skip to content

Commit

Permalink
fix: Keep blocks referenced by newer metas (#13614)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Jul 23, 2024
1 parent 64215e1 commit 784e7d5
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 17 deletions.
41 changes: 33 additions & 8 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (p *Planner) computeTasks(

// In case the planner restarted before deleting outdated metas in the previous iteration,
// we delete them during the planning phase to avoid reprocessing them.
metas, err = p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, metas, phasePlanning)
metas, err = p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, nil, metas, phasePlanning)
if err != nil {
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
}
Expand Down Expand Up @@ -446,8 +446,7 @@ func (p *Planner) processTenantTaskResults(
return tasksSucceed, nil
}

combined := append(originalMetas, newMetas...)
if _, err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, combined, phaseBuilding); err != nil {
if _, err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, newMetas, originalMetas, phaseBuilding); err != nil {
return 0, fmt.Errorf("failed to delete outdated metas: %w", err)
}

Expand All @@ -460,12 +459,14 @@ func (p *Planner) deleteOutdatedMetasAndBlocks(
ctx context.Context,
table config.DayTable,
tenant string,
metas []bloomshipper.Meta,
newMetas []bloomshipper.Meta,
originalMetas []bloomshipper.Meta,
phase string,
) ([]bloomshipper.Meta, error) {
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant, "phase", phase)

upToDate, outdated := outdatedMetas(metas)
combined := append(originalMetas, newMetas...)
upToDate, outdated := outdatedMetas(combined)
if len(outdated) == 0 {
level.Debug(logger).Log(
"msg", "no outdated metas found",
Expand Down Expand Up @@ -497,17 +498,25 @@ func (p *Planner) deleteOutdatedMetasAndBlocks(

for _, meta := range outdated {
for _, block := range meta.Blocks {
logger := log.With(logger, "block", block.String())

// Prevent deleting blocks that are reused in new metas
if isBlockInMetas(block, upToDate) {
level.Debug(logger).Log("msg", "block is still in use in new meta, skipping delete")
continue
}

if err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}); err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String())
level.Debug(logger).Log("msg", "block not found while attempting delete, continuing")
} else {
level.Error(logger).Log("msg", "failed to delete block", "err", err, "block", block.String())
level.Error(logger).Log("msg", "failed to delete block", "err", err)
return nil, errors.Wrap(err, "failed to delete block")
}
}

deletedBlocks++
level.Debug(logger).Log("msg", "removed outdated block", "block", block.String())
level.Debug(logger).Log("msg", "removed outdated block")
}

err = client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef})
Expand All @@ -532,6 +541,22 @@ func (p *Planner) deleteOutdatedMetasAndBlocks(
return upToDate, nil
}

// isBlockInMetas reports whether block is referenced by at least one of the
// given metas.
//
// The lookup is a binary search per meta, which relies on each meta's Blocks
// slice being sorted by bounds.
func isBlockInMetas(block bloomshipper.BlockRef, metas []bloomshipper.Meta) bool {
	// The search key is the upper fingerprint bound of the block we look for.
	target := uint64(block.Bounds.Max)

	for idx := range metas {
		candidates := metas[idx].Blocks

		// Find the first candidate whose comparison against the target's max
		// bound is at most v1.Overlap.
		// NOTE(review): presumably this is the first block whose range does
		// not end before the target's max bound; confirm against BlockRef.Cmp.
		pos := sort.Search(len(candidates), func(j int) bool {
			return candidates[j].Cmp(target) <= v1.Overlap
		})

		// sort.Search returns len(candidates) when no element satisfies the
		// predicate, so there is nothing to compare in this meta.
		if pos == len(candidates) {
			continue
		}

		// Only an exact BlockRef match counts as "still referenced".
		if candidates[pos] == block {
			return true
		}
	}

	return false
}

func (p *Planner) tables(ts time.Time) *dayRangeIterator {
// adjust the minimum by one to make it inclusive, which is more intuitive
// for a configuration variable
Expand Down
100 changes: 92 additions & 8 deletions pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package planner

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -20,6 +21,8 @@ import (
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/chunkenc"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
Expand Down Expand Up @@ -166,11 +169,36 @@ func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
}
}

func genBlock(ref bloomshipper.BlockRef) bloomshipper.Block {
func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := v1.NewByteReader(indexBuf, bloomsBuf)

blockOpts := v1.NewBlockOptions(chunkenc.EncNone, 4, 1, 0, 0)

builder, err := v1.NewBlockBuilder(blockOpts, writer)
if err != nil {
return bloomshipper.Block{}, err
}

if _, err = builder.BuildFrom(iter.NewEmptyIter[v1.SeriesWithBlooms]()); err != nil {
return bloomshipper.Block{}, err
}

block := v1.NewBlock(reader, v1.NewMetrics(nil))

buf := bytes.NewBuffer(nil)
if err := v1.TarGz(buf, block.Reader()); err != nil {
return bloomshipper.Block{}, err
}

tarReader := bytes.NewReader(buf.Bytes())

return bloomshipper.Block{
BlockRef: ref,
Data: &DummyReadSeekCloser{},
}
Data: bloomshipper.ClosableReadSeekerAdapter{ReadSeeker: tarReader},
}, nil
}

func Test_blockPlansForGaps(t *testing.T) {
Expand Down Expand Up @@ -612,7 +640,12 @@ func putMetas(bloomClient bloomshipper.Client, metas []bloomshipper.Meta) error
}

for _, block := range meta.Blocks {
err := bloomClient.PutBlock(context.Background(), genBlock(block))
writtenBlock, err := genBlock(block)
if err != nil {
return err
}

err = bloomClient.PutBlock(context.Background(), writtenBlock)
if err != nil {
return err
}
Expand Down Expand Up @@ -826,6 +859,7 @@ func Test_deleteOutdatedMetas(t *testing.T) {
for _, tc := range []struct {
name string
originalMetas []bloomshipper.Meta
newMetas []bloomshipper.Meta
expectedUpToDateMetas []bloomshipper.Meta
}{
{
Expand All @@ -835,6 +869,8 @@ func Test_deleteOutdatedMetas(t *testing.T) {
name: "only up to date metas",
originalMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
},
newMetas: []bloomshipper.Meta{
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}),
},
expectedUpToDateMetas: []bloomshipper.Meta{
Expand All @@ -846,13 +882,52 @@ func Test_deleteOutdatedMetas(t *testing.T) {
name: "outdated metas",
originalMetas: []bloomshipper.Meta{
genMeta(0, 5, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 5)}),
genMeta(6, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 10)}),
},
newMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
},
expectedUpToDateMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
},
},
{
name: "new metas reuse blocks from outdated meta",
originalMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{ // Outdated
genBlockRef(0, 5), // Reuse
genBlockRef(5, 10), // Delete
}),
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{ // Outdated
genBlockRef(10, 20), // Reuse
}),
genMeta(20, 30, []int{0}, []bloomshipper.BlockRef{ // Up to date
genBlockRef(20, 30),
}),
},
newMetas: []bloomshipper.Meta{
genMeta(0, 5, []int{1}, []bloomshipper.BlockRef{
genBlockRef(0, 5), // Reused block
}),
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{
genBlockRef(5, 7), // New block
genBlockRef(7, 10), // New block
genBlockRef(10, 20), // Reused block
}),
},
expectedUpToDateMetas: []bloomshipper.Meta{
genMeta(0, 5, []int{1}, []bloomshipper.BlockRef{
genBlockRef(0, 5),
}),
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{
genBlockRef(5, 7),
genBlockRef(7, 10),
genBlockRef(10, 20),
}),
genMeta(20, 30, []int{0}, []bloomshipper.BlockRef{
genBlockRef(20, 30),
}),
},
},
} {
t.Run(tc.name, func(t *testing.T) {
logger := log.NewNopLogger()
Expand All @@ -867,9 +942,11 @@ func Test_deleteOutdatedMetas(t *testing.T) {
bloomClient, err := planner.bloomStore.Client(testDay.ModelTime())
require.NoError(t, err)

// Create original metas and blocks
// Create original/new metas and blocks
err = putMetas(bloomClient, tc.originalMetas)
require.NoError(t, err)
err = putMetas(bloomClient, tc.newMetas)
require.NoError(t, err)

// Get all metas
metas, err := planner.bloomStore.FetchMetas(
Expand All @@ -882,9 +959,9 @@ func Test_deleteOutdatedMetas(t *testing.T) {
)
require.NoError(t, err)
removeLocFromMetasSources(metas)
require.ElementsMatch(t, tc.originalMetas, metas)
require.ElementsMatch(t, append(tc.originalMetas, tc.newMetas...), metas)

upToDate, err := planner.deleteOutdatedMetasAndBlocks(context.Background(), testTable, "fakeTenant", tc.originalMetas, phasePlanning)
upToDate, err := planner.deleteOutdatedMetasAndBlocks(context.Background(), testTable, "fakeTenant", tc.newMetas, tc.originalMetas, phasePlanning)
require.NoError(t, err)
require.ElementsMatch(t, tc.expectedUpToDateMetas, upToDate)

Expand All @@ -900,6 +977,13 @@ func Test_deleteOutdatedMetas(t *testing.T) {
require.NoError(t, err)
removeLocFromMetasSources(metas)
require.ElementsMatch(t, tc.expectedUpToDateMetas, metas)

// Fetch all blocks from the metas
for _, meta := range metas {
blocks, err := planner.bloomStore.FetchBlocks(context.Background(), meta.Blocks)
require.NoError(t, err)
require.Len(t, blocks, len(meta.Blocks))
}
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (p *processor) processTasks(ctx context.Context, tasks []Task) error {
return nil
}

func (p *processor) processTasksForDay(ctx context.Context, tenant string, day config.DayTime, tasks []Task) error {
func (p *processor) processTasksForDay(ctx context.Context, _ string, _ config.DayTime, tasks []Task) error {
var duration time.Duration

blocksRefs := make([]bloomshipper.BlockRef, 0, len(tasks[0].blocks)*len(tasks))
Expand Down

0 comments on commit 784e7d5

Please sign in to comment.