Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/sql/distsqlrun/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type chunkBackfiller interface {
chunkSize int64,
readAsOf hlc.Timestamp,
) (roachpb.Key, error)

// flush writes any pending backfilled data into SSTs and flushes them.
flush(ctx context.Context, readAsOf hlc.Timestamp) error
}

// backfiller is a processor that implements a distributed backfill of
Expand Down Expand Up @@ -133,6 +136,11 @@ func (b *backfiller) mainLoop(ctx context.Context) error {
break
}
}

if err := b.flush(ctx, b.spec.ReadAsOf); err != nil {
return err
}

log.VEventf(ctx, 2, "processed %d rows in %d chunks", row, nChunks)
return WriteResumeSpan(ctx,
b.flowCtx.ClientDB,
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/distsqlrun/columnbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,8 @@ func (cb *columnBackfiller) runChunk(
})
return key, err
}

// flush implements the chunkBackfiller interface.
func (cb *columnBackfiller) flush(ctx context.Context, readAsOf hlc.Timestamp) error {
return nil
}
105 changes: 57 additions & 48 deletions pkg/sql/distsqlrun/indexbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type indexBackfiller struct {
backfill.IndexBackfiller

desc *sqlbase.ImmutableTableDescriptor

// pending index entries that haven't yet been flushed.
entries []sqlbase.IndexEntry
}

var _ Processor = &indexBackfiller{}
Expand Down Expand Up @@ -112,7 +115,6 @@ func (ib *indexBackfiller) runChunk(
}
*/

start := timeutil.Now()
var entries []sqlbase.IndexEntry
if err := ib.flowCtx.ClientDB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
txn.SetFixedTimestamp(ctx, readAsOf)
Expand All @@ -124,56 +126,10 @@ func (ib *indexBackfiller) runChunk(
}); err != nil {
return nil, err
}
prepTime := timeutil.Now().Sub(start)

enabled := backfill.BulkWriteIndex.Get(&ib.flowCtx.Settings.SV)
if enabled {
start := timeutil.Now()
sort.Slice(entries, func(i, j int) bool {
return entries[i].Key.Compare(entries[j].Key) < 0
})
sortTime := timeutil.Now().Sub(start)

start = timeutil.Now()
adder, err := ib.flowCtx.BulkAdder(ctx, ib.flowCtx.ClientDB, 32<<20, readAsOf)
if err != nil {
return nil, err
}
defer adder.Close()
containsInvertedIndex := ib.ContainsInvertedIndex()
for i := range entries {
if err := adder.Add(ctx, entries[i].Key, entries[i].Value.RawBytes); err != nil {
// Detect a duplicate within the SST being constructed. This is an
// insufficient but useful method for unique constraint enforcement
// and the index has to be validated after construction.
if i > 0 && entries[i-1].Key.Equal(entries[i].Key) {
if containsInvertedIndex {
// Depend on post index backfill validation to catch any errors.
continue
}
desc, err := ib.desc.MakeFirstMutationPublic()
immutable := sqlbase.NewImmutableTableDescriptor(*desc.TableDesc())
if err != nil {
return nil, err
}
entry := entries[i]
return nil, row.NewUniquenessConstraintViolationError(
ctx, immutable, entry.Key, &entry.Value)
}
return nil, err
}
}
addTime := timeutil.Now().Sub(start)

if err := adder.Flush(ctx); err != nil {
return nil, err
}

// Don't log perf stats in tests with small indexes.
if len(entries) > 1000 {
log.Infof(ctx, "index backfill stats: entries %d, prepare %+v, sort %+v, add-sst %+v",
len(entries), prepTime, sortTime, addTime)
}
ib.entries = append(ib.entries, entries...)
return key, nil
}
retried := false
Expand Down Expand Up @@ -224,3 +180,56 @@ func (ib *indexBackfiller) runChunk(

return key, nil
}

func (ib *indexBackfiller) flush(ctx context.Context, readAsOf hlc.Timestamp) error {
if len(ib.entries) > 0 {
entries := ib.entries
ib.entries = nil
start := timeutil.Now()
sort.Slice(entries, func(i, j int) bool {
return entries[i].Key.Compare(entries[j].Key) < 0
})
sortTime := timeutil.Now().Sub(start)

start = timeutil.Now()
adder, err := ib.flowCtx.BulkAdder(ctx, ib.flowCtx.ClientDB, 32<<20, readAsOf)
if err != nil {
return err
}
defer adder.Close()
containsInvertedIndex := ib.ContainsInvertedIndex()
for i := range entries {
if err := adder.Add(ctx, entries[i].Key, entries[i].Value.RawBytes); err != nil {
// Detect a duplicate within the SST being constructed. This is an
// insufficient but useful method for unique constraint enforcement
// and the index has to be validated after construction.
if i > 0 && entries[i-1].Key.Equal(entries[i].Key) {
if containsInvertedIndex {
// Depend on post index backfill validation to catch any errors.
continue
}
desc, err := ib.desc.MakeFirstMutationPublic()
immutable := sqlbase.NewImmutableTableDescriptor(*desc.TableDesc())
if err != nil {
return err
}
entry := entries[i]
return row.NewUniquenessConstraintViolationError(
ctx, immutable, entry.Key, &entry.Value)
}
return err
}
}
addTime := timeutil.Now().Sub(start)

start = timeutil.Now()
if err := adder.Flush(ctx); err != nil {
return err
}
flushTime := timeutil.Now().Sub(start)

log.Infof(ctx, "index backfill stats: entries %d, sort %+v, add-sst %+v, flush %+v",
len(entries), sortTime, addTime, flushTime)
}
return nil
}