From 450bbce938fd548715104f6a1a4dde76e2e7ff34 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 29 Jul 2024 11:11:54 +0200 Subject: [PATCH] fix(blooms): Ship chunkrefs in task payload (#13677) --- pkg/bloombuild/builder/builder.go | 12 +- pkg/bloombuild/common/tsdb.go | 24 +- pkg/bloombuild/planner/planner.go | 99 ++++-- pkg/bloombuild/planner/planner_test.go | 126 +++++-- pkg/bloombuild/protos/compat.go | 43 ++- pkg/bloombuild/protos/types.pb.go | 448 ++++++++++++++++++++++--- pkg/bloombuild/protos/types.proto | 9 +- 7 files changed, 644 insertions(+), 117 deletions(-) diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 5f2c3b5ab5fd..e10e5af3012a 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -335,7 +335,7 @@ func (b *Builder) processTask( // Fetch blocks that aren't up to date but are in the desired fingerprint range // to try and accelerate bloom creation. level.Debug(logger).Log("msg", "loading series and blocks for gap", "blocks", len(gap.Blocks)) - seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, tenant, task.TSDB, gap) + seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, gap) if err != nil { level.Error(logger).Log("msg", "failed to get series and blocks", "err", err) return nil, fmt.Errorf("failed to get series and blocks: %w", err) @@ -454,15 +454,9 @@ func (b *Builder) processTask( func (b *Builder) loadWorkForGap( ctx context.Context, table config.DayTable, - tenant string, - id tsdb.Identifier, - gap protos.GapWithBlocks, + gap protos.Gap, ) (iter.Iterator[*v1.Series], iter.CloseResetIterator[*v1.SeriesWithBlooms], error) { - // load a series iterator for the gap - seriesItr, err := b.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.Bounds) - if err != nil { - return nil, nil, errors.Wrap(err, "failed to load tsdb") - } + seriesItr := iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](gap.Series)) // load a blocks iterator for the gap fetcher, err := b.bloomStore.Fetcher(table.ModelTime()) diff --git a/pkg/bloombuild/common/tsdb.go b/pkg/bloombuild/common/tsdb.go index 8082a8b319a4..a2e22529523b 100644 --- a/pkg/bloombuild/common/tsdb.go +++ b/pkg/bloombuild/common/tsdb.go @@ -9,7 +9,6 @@ import ( "strings" "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -30,6 +29,11 @@ const ( gzipExtension = ".gz" ) +type ClosableForSeries interface { + sharding.ForSeries + Close() error +} + type TSDBStore interface { UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) @@ -38,8 +42,7 @@ type TSDBStore interface { table config.DayTable, tenant string, id tsdb.Identifier, - bounds v1.FingerprintBounds, - ) (iter.Iterator[*v1.Series], error) + ) (ClosableForSeries, error) } // BloomTSDBStore is a wrapper around the storage.Client interface which @@ -90,8 +93,7 @@ func (b *BloomTSDBStore) LoadTSDB( table config.DayTable, tenant string, id tsdb.Identifier, - bounds v1.FingerprintBounds, -) (iter.Iterator[*v1.Series], error) { +) (ClosableForSeries, error) { withCompression := id.Name() + gzipExtension data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression) @@ -118,13 +120,8 @@ func (b *BloomTSDBStore) LoadTSDB( } idx := tsdb.NewTSDBIndex(reader) - defer func() { - if err := idx.Close(); err != nil { - level.Error(b.logger).Log("msg", "failed to close index", "err", err) - } - }() - return NewTSDBSeriesIter(ctx, tenant, idx, bounds) + return idx, nil } func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) { @@ -251,12 +248,11 @@ func (s *TSDBStores) LoadTSDB( table config.DayTable, tenant string, id tsdb.Identifier, - bounds v1.FingerprintBounds, -) (iter.Iterator[*v1.Series], error) { +) (ClosableForSeries, error) { store, err := s.storeForPeriod(table.DayTime) if err != nil { return nil, err } - return store.LoadTSDB(ctx, table, tenant, id, bounds) + return store.LoadTSDB(ctx, table, tenant, id) } diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index 285795a8327d..f65fdf59c9ac 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -365,6 +365,29 @@ func (p *Planner) computeTasks( return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err) } + // Resolve TSDBs + tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant) + if err != nil { + level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err) + return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err) + } + + if len(tsdbs) == 0 { + return nil, metas, nil + } + + openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs) + if err != nil { + return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err) + } + defer func() { + for idx, reader := range openTSDBs { + if err := reader.Close(); err != nil { + level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name()) + } + } + }() + for _, ownershipRange := range ownershipRanges { logger := log.With(logger, "ownership", ownershipRange.String()) @@ -372,7 +395,7 @@ func (p *Planner) computeTasks( metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange) // Find gaps in the TSDBs for this tenant/table - gaps, err := p.findOutdatedGaps(ctx, tenant, table, ownershipRange, metasInBounds, logger) + gaps, err := p.findOutdatedGaps(ctx, tenant, openTSDBs, ownershipRange, metasInBounds, logger) if err != nil { level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err) continue @@ -453,6 +476,26 @@ func (p *Planner) processTenantTaskResults( return tasksSucceed, nil } +func openAllTSDBs( + ctx context.Context, + table config.DayTable, + tenant string, + store common.TSDBStore, + tsdbs []tsdb.SingleTenantTSDBIdentifier, +) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) { + openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs)) + for _, idx := range tsdbs { + tsdb, err := store.LoadTSDB(ctx, table, tenant, idx) + if err != nil { + return nil, fmt.Errorf("failed to load tsdb: %w", err) + } + + openTSDBs[idx] = tsdb + } + + return openTSDBs, nil +} + // deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store. // It returns the up-to-date metas from the `metas` argument. func (p *Planner) deleteOutdatedMetasAndBlocks( @@ -655,28 +698,17 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli // This is a performance optimization to avoid expensive re-reindexing type blockPlan struct { tsdb tsdb.SingleTenantTSDBIdentifier - gaps []protos.GapWithBlocks + gaps []protos.Gap } func (p *Planner) findOutdatedGaps( ctx context.Context, tenant string, - table config.DayTable, + tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, ownershipRange v1.FingerprintBounds, metas []bloomshipper.Meta, logger log.Logger, ) ([]blockPlan, error) { - // Resolve TSDBs - tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant) - if err != nil { - level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err) - return nil, fmt.Errorf("failed to resolve tsdbs: %w", err) - } - - if len(tsdbs) == 0 { - return nil, nil - } - // Determine which TSDBs have gaps in the ownership range and need to // be processed. tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, tsdbs, metas) @@ -690,7 +722,7 @@ func (p *Planner) findOutdatedGaps( return nil, nil } - work, err := blockPlansForGaps(tsdbsWithGaps, metas) + work, err := blockPlansForGaps(ctx, tenant, tsdbsWithGaps, metas) if err != nil { level.Error(logger).Log("msg", "failed to create plan", "err", err) return nil, fmt.Errorf("failed to create plan: %w", err) @@ -701,18 +733,19 @@ func (p *Planner) findOutdatedGaps( // Used to signal the gaps that need to be populated for a tsdb type tsdbGaps struct { - tsdb tsdb.SingleTenantTSDBIdentifier - gaps []v1.FingerprintBounds + tsdbIdentifier tsdb.SingleTenantTSDBIdentifier + tsdb common.ClosableForSeries + gaps []v1.FingerprintBounds } // gapsBetweenTSDBsAndMetas returns if the metas are up-to-date with the TSDBs. This is determined by asserting // that for each TSDB, there are metas covering the entire ownership range which were generated from that specific TSDB. func gapsBetweenTSDBsAndMetas( ownershipRange v1.FingerprintBounds, - tsdbs []tsdb.SingleTenantTSDBIdentifier, + tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, metas []bloomshipper.Meta, ) (res []tsdbGaps, err error) { - for _, db := range tsdbs { + for db, tsdb := range tsdbs { id := db.Name() relevantMetas := make([]v1.FingerprintBounds, 0, len(metas)) @@ -731,8 +764,9 @@ func gapsBetweenTSDBsAndMetas( if len(gaps) > 0 { res = append(res, tsdbGaps{ - tsdb: db, - gaps: gaps, + tsdbIdentifier: db, + tsdb: tsdb, + gaps: gaps, }) } } @@ -743,22 +777,35 @@ func gapsBetweenTSDBsAndMetas( // blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks. // This allows us to expedite bloom generation by using existing blocks to fill in the gaps // since many will contain the same chunks. -func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan, error) { +func blockPlansForGaps( + ctx context.Context, + tenant string, + tsdbs []tsdbGaps, + metas []bloomshipper.Meta, +) ([]blockPlan, error) { plans := make([]blockPlan, 0, len(tsdbs)) for _, idx := range tsdbs { plan := blockPlan{ - tsdb: idx.tsdb, - gaps: make([]protos.GapWithBlocks, 0, len(idx.gaps)), + tsdb: idx.tsdbIdentifier, + gaps: make([]protos.Gap, 0, len(idx.gaps)), } for _, gap := range idx.gaps { - planGap := protos.GapWithBlocks{ + planGap := protos.Gap{ Bounds: gap, } - for _, meta := range metas { + seriesItr, err := common.NewTSDBSeriesIter(ctx, tenant, idx.tsdb, gap) + if err != nil { + return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.String(), err) + } + planGap.Series, err = iter.Collect(seriesItr) + if err != nil { + return nil, fmt.Errorf("failed to collect series: %w", err) + } + for _, meta := range metas { if meta.Bounds.Intersection(gap) == nil { // this meta doesn't overlap the gap, skip continue diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index ca5c1d0c15b0..88e45c725917 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -16,10 +16,12 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "go.uber.org/atomic" "google.golang.org/grpc" + "github.com/grafana/loki/v3/pkg/bloombuild/common" "github.com/grafana/loki/v3/pkg/bloombuild/protos" "github.com/grafana/loki/v3/pkg/chunkenc" iter "github.com/grafana/loki/v3/pkg/iter/v2" @@ -31,6 +33,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" "github.com/grafana/loki/v3/pkg/storage/types" "github.com/grafana/loki/v3/pkg/util/mempool" ) @@ -68,14 +71,16 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { err bool exp []tsdbGaps ownershipRange v1.FingerprintBounds - tsdbs []tsdb.SingleTenantTSDBIdentifier + tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries metas []bloomshipper.Meta }{ { desc: "non-overlapping tsdbs and metas", err: true, ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ + tsdbID(0): nil, + }, metas: []bloomshipper.Meta{ genMeta(11, 20, []int{0}, nil), }, @@ -83,13 +88,15 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { { desc: "single tsdb", ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)}, + tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ + tsdbID(0): nil, + }, metas: []bloomshipper.Meta{ genMeta(4, 8, []int{0}, nil), }, exp: []tsdbGaps{ { - tsdb: tsdbID(0), + tsdbIdentifier: tsdbID(0), gaps: []v1.FingerprintBounds{ v1.NewBounds(0, 3), v1.NewBounds(9, 10), @@ -100,20 +107,23 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { { desc: "multiple tsdbs with separate blocks", ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)}, + tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ + tsdbID(0): nil, + tsdbID(1): nil, + }, metas: []bloomshipper.Meta{ genMeta(0, 5, []int{0}, nil), genMeta(6, 10, []int{1}, nil), }, exp: []tsdbGaps{ { - tsdb: tsdbID(0), + tsdbIdentifier: tsdbID(0), gaps: []v1.FingerprintBounds{ v1.NewBounds(6, 10), }, }, { - tsdb: tsdbID(1), + tsdbIdentifier: tsdbID(1), gaps: []v1.FingerprintBounds{ v1.NewBounds(0, 5), }, @@ -123,20 +133,23 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { { desc: "multiple tsdbs with the same blocks", ownershipRange: v1.NewBounds(0, 10), - tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)}, + tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{ + tsdbID(0): nil, + tsdbID(1): nil, + }, metas: []bloomshipper.Meta{ genMeta(0, 5, []int{0, 1}, nil), genMeta(6, 8, []int{1}, nil), }, exp: []tsdbGaps{ { - tsdb: tsdbID(0), + tsdbIdentifier: tsdbID(0), gaps: []v1.FingerprintBounds{ v1.NewBounds(6, 10), }, }, { - tsdb: tsdbID(1), + tsdbIdentifier: tsdbID(1), gaps: []v1.FingerprintBounds{ v1.NewBounds(9, 10), }, @@ -220,9 +233,10 @@ func Test_blockPlansForGaps(t *testing.T) { exp: []blockPlan{ { tsdb: tsdbID(0), - gaps: []protos.GapWithBlocks{ + gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 10), + Series: genSeries(v1.NewBounds(0, 10)), }, }, }, @@ -238,9 +252,10 @@ func Test_blockPlansForGaps(t *testing.T) { exp: []blockPlan{ { tsdb: tsdbID(0), - gaps: []protos.GapWithBlocks{ + gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 10), + Series: genSeries(v1.NewBounds(0, 10)), Blocks: []bloomshipper.BlockRef{genBlockRef(9, 20)}, }, }, @@ -261,9 +276,10 @@ func Test_blockPlansForGaps(t *testing.T) { exp: []blockPlan{ { tsdb: tsdbID(0), - gaps: []protos.GapWithBlocks{ + gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 8), + Series: genSeries(v1.NewBounds(0, 8)), }, }, }, @@ -280,9 +296,10 @@ func Test_blockPlansForGaps(t *testing.T) { exp: []blockPlan{ { tsdb: tsdbID(0), - gaps: []protos.GapWithBlocks{ + gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 8), + Series: genSeries(v1.NewBounds(0, 8)), Blocks: []bloomshipper.BlockRef{genBlockRef(5, 20)}, }, }, @@ -306,14 +323,16 @@ func Test_blockPlansForGaps(t *testing.T) { exp: []blockPlan{ { tsdb: tsdbID(0), - gaps: []protos.GapWithBlocks{ + gaps: []protos.Gap{ // tsdb (id=0) can source chunks from the blocks built from tsdb (id=1) { Bounds: v1.NewBounds(3, 5), + Series: genSeries(v1.NewBounds(3, 5)), Blocks: []bloomshipper.BlockRef{genBlockRef(3, 5)}, }, { Bounds: v1.NewBounds(9, 10), + Series: genSeries(v1.NewBounds(9, 10)), Blocks: []bloomshipper.BlockRef{genBlockRef(8, 10)}, }, }, @@ -321,9 +340,10 @@ func Test_blockPlansForGaps(t *testing.T) { // tsdb (id=1) can source chunks from the blocks built from tsdb (id=0) { tsdb: tsdbID(1), - gaps: []protos.GapWithBlocks{ + gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 2), + Series: genSeries(v1.NewBounds(0, 2)), Blocks: []bloomshipper.BlockRef{ genBlockRef(0, 1), genBlockRef(1, 2), @@ -331,6 +351,7 @@ func Test_blockPlansForGaps(t *testing.T) { }, { Bounds: v1.NewBounds(6, 7), + Series: genSeries(v1.NewBounds(6, 7)), Blocks: []bloomshipper.BlockRef{genBlockRef(6, 8)}, }, }, @@ -354,9 +375,10 @@ func Test_blockPlansForGaps(t *testing.T) { exp: []blockPlan{ { tsdb: tsdbID(0), - gaps: []protos.GapWithBlocks{ + gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 10), + Series: genSeries(v1.NewBounds(0, 10)), Blocks: []bloomshipper.BlockRef{ genBlockRef(1, 4), genBlockRef(5, 10), @@ -369,20 +391,86 @@ func Test_blockPlansForGaps(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { + // We add series spanning the whole FP ownership range + tsdbs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries) + for _, id := range tc.tsdbs { + tsdbs[id] = newFakeForSeries(genSeries(tc.ownershipRange)) + } + // we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested // separately and it's used to generate input in our regular code path (easier to write tests this way). - gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tc.tsdbs, tc.metas) + gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tsdbs, tc.metas) require.NoError(t, err) - plans, err := blockPlansForGaps(gaps, tc.metas) + plans, err := blockPlansForGaps( + context.Background(), + "fakeTenant", + gaps, + tc.metas, + ) if tc.err { require.Error(t, err) return } require.Equal(t, tc.exp, plans) + }) + } +} +func genSeries(bounds v1.FingerprintBounds) []*v1.Series { + series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)) + for i := bounds.Min; i <= bounds.Max; i++ { + series = append(series, &v1.Series{ + Fingerprint: i, + Chunks: v1.ChunkRefs{ + { + From: 0, + Through: 1, + Checksum: 1, + }, + }, }) } + return series +} + +type fakeForSeries struct { + series []*v1.Series +} + +func newFakeForSeries(series []*v1.Series) *fakeForSeries { + return &fakeForSeries{ + series: series, + } +} + +func (f fakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error { + overlapping := make([]*v1.Series, 0, len(f.series)) + for _, s := range f.series { + if ff.Match(s.Fingerprint) { + overlapping = append(overlapping, s) + } + } + + for _, s := range overlapping { + chunks := make([]index.ChunkMeta, 0, len(s.Chunks)) + for _, c := range s.Chunks { + chunks = append(chunks, index.ChunkMeta{ + MinTime: int64(c.From), + MaxTime: int64(c.Through), + Checksum: c.Checksum, + }) + } + + if fn(labels.EmptyLabels(), s.Fingerprint, chunks) { + break + } + } + return nil +} + +func (f fakeForSeries) Close() error { + return nil } func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask { diff --git a/pkg/bloombuild/protos/compat.go b/pkg/bloombuild/protos/compat.go index ad7c492cc5fc..468278e77dbe 100644 --- a/pkg/bloombuild/protos/compat.go +++ b/pkg/bloombuild/protos/compat.go @@ -7,14 +7,16 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/grafana/loki/v3/pkg/logproto" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" ) -type GapWithBlocks struct { +type Gap struct { Bounds v1.FingerprintBounds + Series []*v1.Series Blocks []bloomshipper.BlockRef } @@ -25,7 +27,7 @@ type Task struct { Tenant string OwnershipBounds v1.FingerprintBounds TSDB tsdb.SingleTenantTSDBIdentifier - Gaps []GapWithBlocks + Gaps []Gap } func NewTask( @@ -33,10 +35,10 @@ func NewTask( tenant string, bounds v1.FingerprintBounds, tsdb tsdb.SingleTenantTSDBIdentifier, - gaps []GapWithBlocks, + gaps []Gap, ) *Task { return &Task{ - ID: fmt.Sprintf("%s-%s-%s-%d-%d", table.Addr(), tenant, bounds.String(), tsdb.Checksum, len(gaps)), + ID: fmt.Sprintf("%s-%s-%s-%d", table.Addr(), tenant, bounds.String(), len(gaps)), Table: table, Tenant: tenant, @@ -56,12 +58,25 @@ func FromProtoTask(task *ProtoTask) (*Task, error) { return nil, fmt.Errorf("failed to parse tsdb path %s", task.Tsdb) } - gaps := make([]GapWithBlocks, 0, len(task.Gaps)) + gaps := make([]Gap, 0, len(task.Gaps)) for _, gap := range task.Gaps { bounds := v1.FingerprintBounds{ Min: gap.Bounds.Min, Max: gap.Bounds.Max, } + + series := make([]*v1.Series, 0, len(gap.Series)) + for _, s := range gap.Series { + chunks := make(v1.ChunkRefs, 0, len(s.Chunks)) + for _, c := range s.Chunks { + chunks = append(chunks, v1.ChunkRef(*c)) + } + series = append(series, &v1.Series{ + Fingerprint: model.Fingerprint(s.Fingerprint), + Chunks: chunks, + }) + } + blocks := make([]bloomshipper.BlockRef, 0, len(gap.BlockRef)) for _, block := range gap.BlockRef { b, err := bloomshipper.BlockRefFromKey(block) @@ -71,8 +86,9 @@ func FromProtoTask(task *ProtoTask) (*Task, error) { blocks = append(blocks, b) } - gaps = append(gaps, GapWithBlocks{ + gaps = append(gaps, Gap{ Bounds: bounds, + Series: series, Blocks: blocks, }) } @@ -102,11 +118,26 @@ func (t *Task) ToProtoTask() *ProtoTask { blockRefs = append(blockRefs, block.String()) } + series := make([]*ProtoSeries, 0, len(gap.Series)) + for _, s := range gap.Series { + chunks := make([]*logproto.ShortRef, 0, len(s.Chunks)) + for _, c := range s.Chunks { + chunk := logproto.ShortRef(c) + chunks = append(chunks, &chunk) + } + + series = append(series, &ProtoSeries{ + Fingerprint: uint64(s.Fingerprint), + Chunks: chunks, + }) + } + protoGaps = append(protoGaps, &ProtoGapWithBlocks{ Bounds: ProtoFingerprintBounds{ Min: gap.Bounds.Min, Max: gap.Bounds.Max, }, + Series: series, BlockRef: blockRefs, }) } diff --git a/pkg/bloombuild/protos/types.pb.go b/pkg/bloombuild/protos/types.pb.go index e528aa61e917..f355b6471116 100644 --- a/pkg/bloombuild/protos/types.pb.go +++ b/pkg/bloombuild/protos/types.pb.go @@ -7,6 +7,7 @@ import ( fmt "fmt" _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" + logproto "github.com/grafana/loki/v3/pkg/logproto" github_com_prometheus_common_model "github.com/prometheus/common/model" io "io" math "math" @@ -131,15 +132,67 @@ func (m *DayTable) GetPrefix() string { return "" } +type ProtoSeries struct { + Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` + Chunks []*logproto.ShortRef `protobuf:"bytes,2,rep,name=chunks,proto3" json:"chunks,omitempty"` +} + +func (m *ProtoSeries) Reset() { *m = ProtoSeries{} } +func (*ProtoSeries) ProtoMessage() {} +func (*ProtoSeries) Descriptor() ([]byte, []int) { + return fileDescriptor_5325fb0610e1e9ae, []int{2} +} +func (m *ProtoSeries) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ProtoSeries) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ProtoSeries.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ProtoSeries) XXX_Merge(src proto.Message) { + xxx_messageInfo_ProtoSeries.Merge(m, src) +} +func (m *ProtoSeries) XXX_Size() int { + return m.Size() +} +func (m *ProtoSeries) XXX_DiscardUnknown() { + xxx_messageInfo_ProtoSeries.DiscardUnknown(m) +} + +var xxx_messageInfo_ProtoSeries proto.InternalMessageInfo + +func (m *ProtoSeries) GetFingerprint() uint64 { + if m != nil { + return m.Fingerprint + } + return 0 +} + +func (m *ProtoSeries) GetChunks() []*logproto.ShortRef { + if m != nil { + return m.Chunks + } + return nil +} + type ProtoGapWithBlocks struct { Bounds ProtoFingerprintBounds `protobuf:"bytes,1,opt,name=bounds,proto3" json:"bounds"` - BlockRef []string `protobuf:"bytes,2,rep,name=blockRef,proto3" json:"blockRef,omitempty"` + Series []*ProtoSeries `protobuf:"bytes,2,rep,name=series,proto3" json:"series,omitempty"` + BlockRef []string `protobuf:"bytes,3,rep,name=blockRef,proto3" json:"blockRef,omitempty"` } func (m *ProtoGapWithBlocks) Reset() { *m = ProtoGapWithBlocks{} } func (*ProtoGapWithBlocks) ProtoMessage() {} func (*ProtoGapWithBlocks) Descriptor() ([]byte, []int) { - return fileDescriptor_5325fb0610e1e9ae, []int{2} + return fileDescriptor_5325fb0610e1e9ae, []int{3} } func (m *ProtoGapWithBlocks) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -175,6 +228,13 @@ func (m *ProtoGapWithBlocks) GetBounds() ProtoFingerprintBounds { return ProtoFingerprintBounds{} } +func (m *ProtoGapWithBlocks) GetSeries() []*ProtoSeries { + if m != nil { + return m.Series + } + return nil +} + func (m *ProtoGapWithBlocks) GetBlockRef() []string { if m != nil { return m.BlockRef @@ -197,7 +257,7 @@ type ProtoTask struct { func (m *ProtoTask) Reset() { *m = ProtoTask{} } func (*ProtoTask) ProtoMessage() {} func (*ProtoTask) Descriptor() ([]byte, []int) { - return fileDescriptor_5325fb0610e1e9ae, []int{3} + return fileDescriptor_5325fb0610e1e9ae, []int{4} } func (m *ProtoTask) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -277,7 +337,7 @@ type ProtoMeta struct { func (m *ProtoMeta) Reset() { *m = ProtoMeta{} } func (*ProtoMeta) ProtoMessage() {} func (*ProtoMeta) Descriptor() ([]byte, []int) { - return fileDescriptor_5325fb0610e1e9ae, []int{4} + return fileDescriptor_5325fb0610e1e9ae, []int{5} } func (m *ProtoMeta) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -336,7 +396,7 @@ type ProtoTaskResult struct { func (m *ProtoTaskResult) Reset() { *m = ProtoTaskResult{} } func (*ProtoTaskResult) ProtoMessage() {} func (*ProtoTaskResult) Descriptor() ([]byte, []int) { - return fileDescriptor_5325fb0610e1e9ae, []int{5} + return fileDescriptor_5325fb0610e1e9ae, []int{6} } func (m *ProtoTaskResult) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -389,6 +449,7 @@ func (m *ProtoTaskResult) GetCreatedMetas() []*ProtoMeta { func init() { proto.RegisterType((*ProtoFingerprintBounds)(nil), "protos.ProtoFingerprintBounds") proto.RegisterType((*DayTable)(nil), "protos.DayTable") + proto.RegisterType((*ProtoSeries)(nil), "protos.ProtoSeries") proto.RegisterType((*ProtoGapWithBlocks)(nil), "protos.ProtoGapWithBlocks") proto.RegisterType((*ProtoTask)(nil), "protos.ProtoTask") proto.RegisterType((*ProtoMeta)(nil), "protos.ProtoMeta") @@ -398,42 +459,47 @@ func init() { func init() { proto.RegisterFile("pkg/bloombuild/protos/types.proto", fileDescriptor_5325fb0610e1e9ae) } var fileDescriptor_5325fb0610e1e9ae = []byte{ - // 551 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0xb1, 0x6f, 0xd3, 0x4e, - 0x18, 0xb5, 0xe3, 0x34, 0xbf, 0xe6, 0xd2, 0x5f, 0x81, 0x53, 0x55, 0x59, 0x11, 0xba, 0x04, 0x0f, - 0x28, 0x93, 0x2d, 0x05, 0x75, 0x40, 0x62, 0xb2, 0xa2, 0x22, 0x40, 0x95, 0xd0, 0x35, 0x12, 0x12, - 0xdb, 0x39, 0xbe, 0x3a, 0x56, 0x6c, 0x9f, 0xe5, 0x3b, 0xa3, 0x64, 0xe3, 0x4f, 0xe0, 0xcf, 0x60, - 0xe6, 0xaf, 0xe8, 0x98, 0xb1, 0x53, 0x44, 0x9c, 0x05, 0x75, 0xea, 0xc4, 0xc0, 0x84, 0xee, 0xce, - 0x29, 0x09, 0x62, 0x82, 0xe9, 0xbe, 0xf7, 0xdd, 0x77, 0xef, 0x7b, 0xef, 0xc9, 0x06, 0x4f, 0xf2, - 0x59, 0xe4, 0x05, 0x09, 0x63, 0x69, 0x50, 0xc6, 0x49, 0xe8, 0xe5, 0x05, 0x13, 0x8c, 0x7b, 0x62, - 0x91, 0x53, 0xee, 0x2a, 0x00, 0x5b, 0xba, 0xd7, 0x3d, 0x89, 0x58, 0xc4, 0x54, 0xed, 0xc9, 0x4a, - 0xdf, 0x3a, 0x5f, 0x4c, 0x70, 0xfa, 0x56, 0x56, 0xe7, 0x71, 0x16, 0xd1, 0x22, 0x2f, 0xe2, 0x4c, - 0xf8, 0xac, 0xcc, 0x42, 0x0e, 0xdf, 0x00, 0x2b, 0x8d, 0x33, 0xdb, 0xec, 0x9b, 0x83, 0xa6, 0xff, - 0xfc, 0x76, 0xd5, 0x93, 0xf0, 0xc7, 0xaa, 0xe7, 0x46, 0xb1, 0x98, 0x96, 0x81, 0x3b, 0x61, 0xa9, - 0xdc, 0x97, 0x52, 0x31, 0xa5, 0x25, 0xf7, 0x26, 0x2c, 0x4d, 0x59, 0xe6, 0xa5, 0x2c, 0xa4, 0x89, - 0xbb, 0xc3, 0x86, 0xe5, 0x33, 0x45, 0x46, 0xe6, 0x76, 0x63, 0x87, 0x8c, 0xcc, 0xff, 0x8a, 0x8c, - 0xcc, 0x9d, 0xd7, 0xe0, 0x70, 0x44, 0x16, 0x63, 0x12, 0x24, 0x14, 0x3e, 0x05, 0xc7, 0x21, 0x59, - 0x8c, 0xe3, 0x94, 0x72, 0x41, 0xd2, 0xfc, 0xe2, 0x52, 0x09, 0xb6, 0xf0, 0x6f, 0x5d, 0x78, 0x0a, - 0x5a, 0x79, 0x41, 0xaf, 0x62, 0xad, 0xa1, 0x8d, 0x6b, 0xe4, 0xcc, 0x01, 0x54, 0xfe, 0x5f, 0x92, - 0xfc, 0x5d, 0x2c, 0xa6, 0x7e, 0xc2, 0x26, 0x33, 0x0e, 0xcf, 0x41, 0x2b, 0x50, 0x29, 0x28, 0xb6, - 0xce, 0x10, 0xe9, 0xb8, 0xb8, 0xfb, 0xe7, 0xac, 0xfc, 0xe3, 0xeb, 0x55, 0xcf, 0xb8, 0x5d, 0xf5, - 0xea, 0x57, 0xb8, 0x3e, 0x61, 0x17, 0x1c, 0x06, 0x92, 0x11, 0xd3, 0x2b, 0xbb, 0xd1, 0xb7, 0x06, - 0x6d, 0x7c, 0x8f, 0x9d, 0xef, 0x26, 0x68, 0x2b, 0xba, 0x31, 0xe1, 0x33, 0x78, 0x0c, 0x1a, 0x71, - 0xa8, 0xb6, 0xb5, 0x71, 0x23, 0x0e, 0xe1, 0x19, 0x38, 0x10, 0xd2, 0xa0, 0x92, 0xdb, 0x19, 0x3e, - 0xdc, 0x0a, 0xd8, 0x1a, 0xf7, 0xff, 0xaf, 0x57, 0xea, 0x31, 0xac, 0x0f, 0x69, 0x53, 0xd0, 0x8c, - 0x64, 0xc2, 0xb6, 0xb4, 0x4d, 0x8d, 0x76, 0x0c, 0x35, 0xff, 0xc9, 0x10, 0x04, 0x4d, 0xc1, 0xc3, - 0xc0, 0x3e, 0x50, 0xec, 0xaa, 0x86, 0x2e, 0x68, 0x46, 0x24, 0xe7, 0x76, 0xab, 0x6f, 0x0d, 0x3a, - 0xc3, 0xee, 0x1e, 0xf3, 0x5e, 0xac, 0x58, 0xcd, 0x39, 0x51, 0xed, 0xfb, 0x82, 0x0a, 0x02, 0x6d, - 0xf0, 0x5f, 0x4a, 0x05, 0x91, 0x01, 0x69, 0xf3, 0x5b, 0x08, 0x1d, 0x70, 0xc4, 0x59, 0x59, 0x4c, - 0x28, 0x1f, 0x5f, 0x8e, 0x7c, 0x5e, 0xe7, 0xb7, 0xd7, 0x83, 0x8f, 0x41, 0x7b, 0x9b, 0x27, 0xb7, - 0x2d, 0x35, 0xf0, 0xab, 0xe1, 0x7c, 0x00, 0x0f, 0xee, 0x03, 0xc6, 0x94, 0x97, 0x89, 0x50, 0xf9, - 0x10, 0x3e, 0x7b, 0x35, 0xaa, 0xb7, 0xd5, 0x08, 0x9e, 0x80, 0x03, 0x5a, 0x14, 0xac, 0xa8, 0xbf, - 0x0e, 0x0d, 0xe0, 0x19, 0x38, 0x9a, 0x14, 0x94, 0x08, 0x1a, 0x4a, 0xad, 0x7a, 0x43, 0x67, 0xf8, - 0x68, 0xcf, 0xa1, 0xbc, 0xc1, 0x7b, 0x63, 0xfe, 0x8b, 0xe5, 0x1a, 0x19, 0x37, 0x6b, 0x64, 0xdc, - 0xad, 0x91, 0xf9, 0xb1, 0x42, 0xe6, 0xe7, 0x0a, 0x99, 0xd7, 0x15, 0x32, 0x97, 0x15, 0x32, 0xbf, - 0x56, 0xc8, 0xfc, 0x56, 0x21, 0xe3, 0xae, 0x42, 0xe6, 0xa7, 0x0d, 0x32, 0x96, 0x1b, 0x64, 0xdc, - 0x6c, 0x90, 0xf1, 0xbe, 0xfe, 0x51, 0x03, 0x7d, 0x3e, 0xfb, 0x19, 0x00, 0x00, 0xff, 0xff, 0xdc, - 0x0e, 0x2e, 0xd1, 0xdc, 0x03, 0x00, 0x00, + // 630 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0x3f, 0x6f, 0xd3, 0x40, + 0x1c, 0xb5, 0x93, 0x34, 0x34, 0x97, 0x52, 0xe0, 0xa8, 0x2a, 0x2b, 0x42, 0x97, 0xe0, 0x01, 0x55, + 0x20, 0x39, 0x52, 0x50, 0x07, 0x24, 0x26, 0xab, 0x2a, 0x02, 0x54, 0x09, 0x5d, 0x22, 0x21, 0xc1, + 0x74, 0x8e, 0x2f, 0x8e, 0x15, 0xdb, 0x67, 0xf9, 0xce, 0xd0, 0x6c, 0x7c, 0x04, 0xbe, 0x04, 0x12, + 0x33, 0x9f, 0xa2, 0x63, 0xc7, 0x4e, 0x11, 0x75, 0x17, 0xd4, 0xa9, 0x13, 0x03, 0x13, 0xba, 0x3f, + 0x69, 0x13, 0xc4, 0x04, 0xd3, 0xdd, 0xfb, 0xdd, 0xef, 0xde, 0xef, 0xbd, 0x77, 0x96, 0xc1, 0xc3, + 0x7c, 0x16, 0xf5, 0x83, 0x84, 0xb1, 0x34, 0x28, 0xe3, 0x24, 0xec, 0xe7, 0x05, 0x13, 0x8c, 0xf7, + 0xc5, 0x3c, 0xa7, 0xdc, 0x53, 0x00, 0x36, 0x75, 0xad, 0xb3, 0x13, 0xb1, 0x88, 0xa9, 0x7d, 0x5f, + 0xee, 0xf4, 0x69, 0xa7, 0x2b, 0x09, 0x12, 0x16, 0xe9, 0x03, 0xc5, 0x14, 0x11, 0x41, 0x3f, 0x92, + 0xb9, 0x6e, 0x70, 0xbf, 0xd9, 0x60, 0xf7, 0x8d, 0xdc, 0x1d, 0xc6, 0x59, 0x44, 0x8b, 0xbc, 0x88, + 0x33, 0xe1, 0xb3, 0x32, 0x0b, 0x39, 0x7c, 0x0d, 0xea, 0x69, 0x9c, 0x39, 0x76, 0xcf, 0xde, 0x6b, + 0xf8, 0xcf, 0x2e, 0x17, 0x5d, 0x09, 0x7f, 0x2d, 0xba, 0x5e, 0x14, 0x8b, 0x69, 0x19, 0x78, 0x63, + 0x96, 0x4a, 0x41, 0x29, 0x15, 0x53, 0x5a, 0xf2, 0xfe, 0x98, 0xa5, 0x29, 0xcb, 0xfa, 0x29, 0x0b, + 0x69, 0xe2, 0xad, 0xb0, 0x61, 0x79, 0x4d, 0x91, 0x91, 0x63, 0xa7, 0xb6, 0x42, 0x46, 0x8e, 0xff, + 0x89, 0x8c, 0x1c, 0xbb, 0xaf, 0xc0, 0xe6, 0x01, 0x99, 0x8f, 0x48, 0x90, 0x50, 0xf8, 0x08, 0x6c, + 0x87, 0x64, 0x3e, 0x8a, 0x53, 0xca, 0x05, 0x49, 0xf3, 0xa3, 0xa1, 0x12, 0x5c, 0xc7, 0x7f, 0x54, + 0xe1, 0x2e, 0x68, 0xe6, 0x05, 0x9d, 0xc4, 0x5a, 0x43, 0x0b, 0x1b, 0xe4, 0xbe, 0x07, 0x6d, 0xe5, + 0x7f, 0x48, 0x8b, 0x98, 0x72, 0xd8, 0x03, 0xed, 0xc9, 0xcd, 0x38, 0x6d, 0x1e, 0xaf, 0x96, 0xe0, + 0x63, 0xd0, 0x1c, 0x4f, 0xcb, 0x6c, 0xc6, 0x9d, 0x5a, 0xaf, 0xbe, 0xd7, 0x1e, 0x40, 0x6f, 0x99, + 0xaf, 0x37, 0x9c, 0xb2, 0x42, 0x60, 0x3a, 0xc1, 0xa6, 0xc3, 0xfd, 0x62, 0x03, 0xa8, 0xd8, 0x5f, + 0x90, 0xfc, 0x6d, 0x2c, 0xa6, 0x7e, 0xc2, 0xc6, 0x33, 0x0e, 0x0f, 0x41, 0x33, 0x50, 0x19, 0x2b, + 0xfe, 0xf6, 0x00, 0xe9, 0xc7, 0xe0, 0xde, 0xdf, 0x5f, 0xc2, 0xdf, 0x3e, 0x59, 0x74, 0xad, 0xcb, + 0x45, 0xd7, 0xdc, 0xc2, 0x66, 0x85, 0x4f, 0x40, 0x93, 0x2b, 0xd9, 0x46, 0xca, 0xfd, 0x35, 0x1e, + 0xed, 0x08, 0x9b, 0x16, 0xd8, 0x01, 0x9b, 0x81, 0x1c, 0x8f, 0xe9, 0xc4, 0xa9, 0xf7, 0xea, 0x7b, + 0x2d, 0x7c, 0x8d, 0xdd, 0x9f, 0x36, 0x68, 0xa9, 0x3b, 0x23, 0xc2, 0x67, 0x70, 0x1b, 0xd4, 0xe2, + 0x50, 0x49, 0x6b, 0xe1, 0x5a, 0x1c, 0xc2, 0x7d, 0xb0, 0x21, 0x64, 0xd6, 0x2a, 0xb9, 0xf6, 0xe0, + 0xee, 0x72, 0xca, 0xf2, 0x0d, 0xfc, 0xdb, 0x46, 0x9f, 0x6e, 0xc3, 0x7a, 0x91, 0x89, 0x0b, 0x9a, + 0x91, 0x4c, 0x38, 0x75, 0x9d, 0xb8, 0x46, 0x2b, 0xee, 0x1b, 0xff, 0xe5, 0x1e, 0x82, 0x86, 0xe0, + 0x61, 0xe0, 0x6c, 0x28, 0x76, 0xb5, 0x87, 0x1e, 0x68, 0x44, 0x24, 0xe7, 0x4e, 0x53, 0xe5, 0xd1, + 0x59, 0x63, 0x5e, 0x7b, 0x03, 0xac, 0xfa, 0xdc, 0xc8, 0xf8, 0x3e, 0xa2, 0x82, 0x40, 0x07, 0xdc, + 0x4a, 0xa9, 0x20, 0x32, 0x20, 0x6d, 0x7e, 0x09, 0xa1, 0x0b, 0xb6, 0x38, 0x2b, 0x8b, 0x31, 0xe5, + 0xa3, 0xe1, 0x81, 0xaf, 0xe3, 0x6e, 0xe1, 0xb5, 0x1a, 0x7c, 0x00, 0x5a, 0xcb, 0x3c, 0xb9, 0x09, + 0xf8, 0xa6, 0xe0, 0x7e, 0x00, 0x77, 0xae, 0x03, 0xc6, 0x94, 0x97, 0x89, 0x50, 0xf9, 0x10, 0x3e, + 0x7b, 0x79, 0x60, 0xa6, 0x19, 0x04, 0x77, 0xc0, 0x06, 0x2d, 0x0a, 0x56, 0x98, 0x0f, 0x55, 0x03, + 0xb8, 0x0f, 0xb6, 0xc6, 0x05, 0x25, 0x82, 0x86, 0x52, 0xab, 0x9e, 0xd0, 0x1e, 0xdc, 0x5b, 0x73, + 0x28, 0x4f, 0xf0, 0x5a, 0x9b, 0xff, 0xfc, 0xf4, 0x1c, 0x59, 0x67, 0xe7, 0xc8, 0xba, 0x3a, 0x47, + 0xf6, 0xa7, 0x0a, 0xd9, 0x5f, 0x2b, 0x64, 0x9f, 0x54, 0xc8, 0x3e, 0xad, 0x90, 0xfd, 0xbd, 0x42, + 0xf6, 0x8f, 0x0a, 0x59, 0x57, 0x15, 0xb2, 0x3f, 0x5f, 0x20, 0xeb, 0xf4, 0x02, 0x59, 0x67, 0x17, + 0xc8, 0x7a, 0x67, 0x7e, 0x2a, 0x81, 0x5e, 0x9f, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0xf9, 0x38, + 0x02, 0xe1, 0x88, 0x04, 0x00, 0x00, } func (this *ProtoFingerprintBounds) Equal(that interface{}) bool { @@ -490,6 +556,38 @@ func (this *DayTable) Equal(that interface{}) bool { } return true } +func (this *ProtoSeries) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*ProtoSeries) + if !ok { + that2, ok := that.(ProtoSeries) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Fingerprint != that1.Fingerprint { + return false + } + if len(this.Chunks) != len(that1.Chunks) { + return false + } + for i := range this.Chunks { + if !this.Chunks[i].Equal(that1.Chunks[i]) { + return false + } + } + return true +} func (this *ProtoGapWithBlocks) Equal(that interface{}) bool { if that == nil { return this == nil @@ -512,6 +610,14 @@ func (this *ProtoGapWithBlocks) Equal(that interface{}) bool { if !this.Bounds.Equal(&that1.Bounds) { return false } + if len(this.Series) != len(that1.Series) { + return false + } + for i := range this.Series { + if !this.Series[i].Equal(that1.Series[i]) { + return false + } + } if len(this.BlockRef) != len(that1.BlockRef) { return false } @@ -663,13 +769,29 @@ func (this *DayTable) GoString() string { s = append(s, "}") return strings.Join(s, "") } -func (this *ProtoGapWithBlocks) GoString() string { +func (this *ProtoSeries) GoString() string { if this == nil { return "nil" } s := make([]string, 0, 6) + s = append(s, "&protos.ProtoSeries{") + s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n") + if this.Chunks != nil { + s = append(s, "Chunks: "+fmt.Sprintf("%#v", this.Chunks)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *ProtoGapWithBlocks) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) s = append(s, "&protos.ProtoGapWithBlocks{") s = append(s, "Bounds: "+strings.Replace(this.Bounds.GoString(), `&`, ``, 1)+",\n") + if this.Series != nil { + s = append(s, "Series: "+fmt.Sprintf("%#v", this.Series)+",\n") + } s = append(s, "BlockRef: "+fmt.Sprintf("%#v", this.BlockRef)+",\n") s = append(s, "}") return strings.Join(s, "") @@ -793,6 +915,48 @@ func (m *DayTable) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *ProtoSeries) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ProtoSeries) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ProtoSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Chunks) > 0 { + for iNdEx := len(m.Chunks) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Chunks[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if m.Fingerprint != 0 { + i = encodeVarintTypes(dAtA, i, uint64(m.Fingerprint)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *ProtoGapWithBlocks) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -819,6 +983,20 @@ func (m *ProtoGapWithBlocks) MarshalToSizedBuffer(dAtA []byte) (int, error) { copy(dAtA[i:], m.BlockRef[iNdEx]) i = encodeVarintTypes(dAtA, i, uint64(len(m.BlockRef[iNdEx]))) i-- + dAtA[i] = 0x1a + } + } + if len(m.Series) > 0 { + for iNdEx := len(m.Series) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Series[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- dAtA[i] = 0x12 } } @@ -1054,6 +1232,24 @@ func (m *DayTable) Size() (n int) { return n } +func (m *ProtoSeries) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Fingerprint != 0 { + n += 1 + sovTypes(uint64(m.Fingerprint)) + } + if len(m.Chunks) > 0 { + for _, e := range m.Chunks { + l = e.Size() + n += 1 + l + sovTypes(uint64(l)) + } + } + return n +} + func (m *ProtoGapWithBlocks) Size() (n int) { if m == nil { return 0 @@ -1062,6 +1258,12 @@ func (m *ProtoGapWithBlocks) Size() (n int) { _ = l l = m.Bounds.Size() n += 1 + l + sovTypes(uint64(l)) + if len(m.Series) > 0 { + for _, e := range m.Series { + l = e.Size() + n += 1 + l + sovTypes(uint64(l)) + } + } if len(m.BlockRef) > 0 { for _, s := range m.BlockRef { l = len(s) @@ -1178,12 +1380,34 @@ func (this *DayTable) String() string { }, "") return s } +func (this *ProtoSeries) String() string { + if this == nil { + return "nil" + } + repeatedStringForChunks := "[]*ShortRef{" + for _, f := range this.Chunks { + repeatedStringForChunks += strings.Replace(fmt.Sprintf("%v", f), "ShortRef", "logproto.ShortRef", 1) + "," + } + repeatedStringForChunks += "}" + s := strings.Join([]string{`&ProtoSeries{`, + `Fingerprint:` + fmt.Sprintf("%v", this.Fingerprint) + `,`, + `Chunks:` + repeatedStringForChunks + `,`, + `}`, + }, "") + return s +} func (this *ProtoGapWithBlocks) String() string { if this == nil { return "nil" } + repeatedStringForSeries := "[]*ProtoSeries{" + for _, f := range this.Series { + repeatedStringForSeries += strings.Replace(f.String(), "ProtoSeries", "ProtoSeries", 1) + "," + } + repeatedStringForSeries += "}" s := strings.Join([]string{`&ProtoGapWithBlocks{`, `Bounds:` + strings.Replace(strings.Replace(this.Bounds.String(), "ProtoFingerprintBounds", "ProtoFingerprintBounds", 1), `&`, ``, 1) + `,`, + `Series:` + repeatedStringForSeries + `,`, `BlockRef:` + fmt.Sprintf("%v", this.BlockRef) + `,`, `}`, }, "") @@ -1441,6 +1665,112 @@ func (m *DayTable) Unmarshal(dAtA []byte) error { } return nil } +func (m *ProtoSeries) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ProtoSeries: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ProtoSeries: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Fingerprint", wireType) + } + m.Fingerprint = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Fingerprint |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Chunks = append(m.Chunks, &logproto.ShortRef{}) + if err := m.Chunks[len(m.Chunks)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTypes(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *ProtoGapWithBlocks) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -1504,6 +1834,40 @@ func (m *ProtoGapWithBlocks) Unmarshal(dAtA []byte) error { } iNdEx = postIndex case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Series", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Series = append(m.Series, &ProtoSeries{}) + if err := m.Series[len(m.Series)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field BlockRef", wireType) } diff --git a/pkg/bloombuild/protos/types.proto b/pkg/bloombuild/protos/types.proto index 55ae89625abe..9e63dd1adb60 100644 --- a/pkg/bloombuild/protos/types.proto +++ b/pkg/bloombuild/protos/types.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package protos; import "gogoproto/gogo.proto"; +import "pkg/logproto/bloomgateway.proto"; option go_package = "protos"; option (gogoproto.marshaler_all) = true; @@ -27,12 +28,18 @@ message DayTable { string prefix = 2; } +message ProtoSeries { + uint64 fingerprint = 1; + repeated logproto.ShortRef chunks = 2; +} + message ProtoGapWithBlocks { ProtoFingerprintBounds bounds = 1 [ (gogoproto.nullable) = false, (gogoproto.jsontag) = "bounds" ]; - repeated string blockRef = 2; + repeated ProtoSeries series = 2; + repeated string blockRef = 3; } // TODO: Define BlockRef and SingleTenantTSDBIdentifier as messages so we can use them right away