diff --git a/pkg/sql/distsqlrun/backfiller.go b/pkg/sql/distsqlrun/backfiller.go
index f52ea6494e4b..a1dd1638b61b 100644
--- a/pkg/sql/distsqlrun/backfiller.go
+++ b/pkg/sql/distsqlrun/backfiller.go
@@ -43,6 +43,9 @@ type chunkBackfiller interface {
 		chunkSize int64,
 		readAsOf hlc.Timestamp,
 	) (roachpb.Key, error)
+
+	// flush writes any pending backfilled data into SSTs and flushes them.
+	flush(ctx context.Context, readAsOf hlc.Timestamp) error
 }
 
 // backfiller is a processor that implements a distributed backfill of
@@ -133,6 +136,11 @@ func (b *backfiller) mainLoop(ctx context.Context) error {
 			break
 		}
 	}
+
+	if err := b.flush(ctx, b.spec.ReadAsOf); err != nil {
+		return err
+	}
+
 	log.VEventf(ctx, 2, "processed %d rows in %d chunks", row, nChunks)
 	return WriteResumeSpan(ctx,
 		b.flowCtx.ClientDB,
diff --git a/pkg/sql/distsqlrun/columnbackfiller.go b/pkg/sql/distsqlrun/columnbackfiller.go
index f8213726cc95..3575d83429fc 100644
--- a/pkg/sql/distsqlrun/columnbackfiller.go
+++ b/pkg/sql/distsqlrun/columnbackfiller.go
@@ -105,3 +105,8 @@ func (cb *columnBackfiller) runChunk(
 	})
 	return key, err
 }
+
+// flush implements the chunkBackfiller interface.
+func (cb *columnBackfiller) flush(ctx context.Context, readAsOf hlc.Timestamp) error {
+	return nil
+}
diff --git a/pkg/sql/distsqlrun/indexbackfiller.go b/pkg/sql/distsqlrun/indexbackfiller.go
index 3c7b05c71d21..08f2850c0d3b 100644
--- a/pkg/sql/distsqlrun/indexbackfiller.go
+++ b/pkg/sql/distsqlrun/indexbackfiller.go
@@ -38,6 +38,9 @@ type indexBackfiller struct {
 	backfill.IndexBackfiller
 
 	desc *sqlbase.ImmutableTableDescriptor
+
+	// pending index entries that haven't yet been flushed.
+	entries []sqlbase.IndexEntry
 }
 
 var _ Processor = &indexBackfiller{}
@@ -112,7 +115,6 @@ func (ib *indexBackfiller) runChunk(
 	}
 	*/
 
-	start := timeutil.Now()
 	var entries []sqlbase.IndexEntry
 	if err := ib.flowCtx.ClientDB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
 		txn.SetFixedTimestamp(ctx, readAsOf)
@@ -124,56 +126,10 @@
 	}); err != nil {
 		return nil, err
 	}
-	prepTime := timeutil.Now().Sub(start)
 
 	enabled := backfill.BulkWriteIndex.Get(&ib.flowCtx.Settings.SV)
 	if enabled {
-		start := timeutil.Now()
-		sort.Slice(entries, func(i, j int) bool {
-			return entries[i].Key.Compare(entries[j].Key) < 0
-		})
-		sortTime := timeutil.Now().Sub(start)
-
-		start = timeutil.Now()
-		adder, err := ib.flowCtx.BulkAdder(ctx, ib.flowCtx.ClientDB, 32<<20, readAsOf)
-		if err != nil {
-			return nil, err
-		}
-		defer adder.Close()
-		containsInvertedIndex := ib.ContainsInvertedIndex()
-		for i := range entries {
-			if err := adder.Add(ctx, entries[i].Key, entries[i].Value.RawBytes); err != nil {
-				// Detect a duplicate within the SST being constructed. This is an
-				// insufficient but useful method for unique constraint enforcement
-				// and the index has to be validated after construction.
-				if i > 0 && entries[i-1].Key.Equal(entries[i].Key) {
-					if containsInvertedIndex {
-						// Depend on post index backfill validation to catch any errors.
-						continue
-					}
-					desc, err := ib.desc.MakeFirstMutationPublic()
-					immutable := sqlbase.NewImmutableTableDescriptor(*desc.TableDesc())
-					if err != nil {
-						return nil, err
-					}
-					entry := entries[i]
-					return nil, row.NewUniquenessConstraintViolationError(
-						ctx, immutable, entry.Key, &entry.Value)
-				}
-				return nil, err
-			}
-		}
-		addTime := timeutil.Now().Sub(start)
-
-		if err := adder.Flush(ctx); err != nil {
-			return nil, err
-		}
-
-		// Don't log perf stats in tests with small indexes.
-		if len(entries) > 1000 {
-			log.Infof(ctx, "index backfill stats: entries %d, prepare %+v, sort %+v, add-sst %+v",
-				len(entries), prepTime, sortTime, addTime)
-		}
+		ib.entries = append(ib.entries, entries...)
 		return key, nil
 	}
 	retried := false
@@ -224,3 +180,56 @@
 
 	return key, nil
 }
+
+func (ib *indexBackfiller) flush(ctx context.Context, readAsOf hlc.Timestamp) error {
+	if len(ib.entries) > 0 {
+		entries := ib.entries
+		ib.entries = nil
+		start := timeutil.Now()
+		sort.Slice(entries, func(i, j int) bool {
+			return entries[i].Key.Compare(entries[j].Key) < 0
+		})
+		sortTime := timeutil.Now().Sub(start)
+
+		start = timeutil.Now()
+		adder, err := ib.flowCtx.BulkAdder(ctx, ib.flowCtx.ClientDB, 32<<20, readAsOf)
+		if err != nil {
+			return err
+		}
+		defer adder.Close()
+		containsInvertedIndex := ib.ContainsInvertedIndex()
+		for i := range entries {
+			if err := adder.Add(ctx, entries[i].Key, entries[i].Value.RawBytes); err != nil {
+				// Detect a duplicate within the SST being constructed. This is an
+				// insufficient but useful method for unique constraint enforcement
+				// and the index has to be validated after construction.
+				if i > 0 && entries[i-1].Key.Equal(entries[i].Key) {
+					if containsInvertedIndex {
+						// Depend on post index backfill validation to catch any errors.
+						continue
+					}
+					desc, err := ib.desc.MakeFirstMutationPublic()
+					immutable := sqlbase.NewImmutableTableDescriptor(*desc.TableDesc())
+					if err != nil {
+						return err
+					}
+					entry := entries[i]
+					return row.NewUniquenessConstraintViolationError(
+						ctx, immutable, entry.Key, &entry.Value)
+				}
+				return err
+			}
+		}
+		addTime := timeutil.Now().Sub(start)
+
+		start = timeutil.Now()
+		if err := adder.Flush(ctx); err != nil {
+			return err
+		}
+		flushTime := timeutil.Now().Sub(start)
+
+		log.Infof(ctx, "index backfill stats: entries %d, sort %+v, add-sst %+v, flush %+v",
+			len(entries), sortTime, addTime, flushTime)
+	}
+	return nil
+}
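
Taken together, the three files move the SST work out of the per-chunk path: runChunk now only buffers index entries, and the sort/duplicate-check/BulkAdder work happens once per processor in flush, driven from the end of mainLoop. The standalone Go sketch below illustrates that buffer-then-flush contract; entry and bulkSink are hypothetical stand-ins for sqlbase.IndexEntry and the BulkAdder, not CockroachDB code, and the real behavior is the patch above.

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// entry is a stand-in for sqlbase.IndexEntry: one key/value pair destined for an SST.
type entry struct {
	key, value []byte
}

// bulkSink is a stand-in for the BulkAdder: it receives sorted KVs and is flushed once.
type bulkSink struct{ added int }

func (s *bulkSink) add(e entry) { s.added++ }
func (s *bulkSink) flush()      { fmt.Printf("flushed %d entries\n", s.added) }

// backfiller buffers entries per chunk and writes them out once at the end,
// mirroring the runChunk/flush split introduced by the patch.
type backfiller struct {
	pending []entry
}

// runChunk only appends the chunk's entries to the in-memory buffer.
func (b *backfiller) runChunk(chunk []entry) {
	b.pending = append(b.pending, chunk...)
}

// flush sorts everything buffered so far, detects adjacent duplicate keys, and
// hands the entries to the sink in one pass before flushing it.
func (b *backfiller) flush(sink *bulkSink) {
	entries := b.pending
	b.pending = nil
	sort.Slice(entries, func(i, j int) bool {
		return bytes.Compare(entries[i].key, entries[j].key) < 0
	})
	for i := range entries {
		if i > 0 && bytes.Equal(entries[i-1].key, entries[i].key) {
			// The patch reports a uniqueness violation here (or skips the
			// entry for inverted indexes); this sketch just skips it.
			continue
		}
		sink.add(entries[i])
	}
	sink.flush()
}

func main() {
	b := &backfiller{}
	b.runChunk([]entry{{key: []byte("b"), value: []byte("1")}})
	b.runChunk([]entry{{key: []byte("a"), value: []byte("2")}, {key: []byte("b"), value: []byte("3")}})
	b.flush(&bulkSink{}) // prints "flushed 2 entries": the duplicate "b" key is skipped
}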