pkg/ccl/storageccl/import.go (2 additions & 1 deletion)

@@ -129,7 +129,8 @@ func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.Impo
 		iters = append(iters, iter)
 	}
 
-	batcher, err := bulk.MakeSSTBatcher(ctx, db, MaxImportBatchSize(cArgs.EvalCtx.ClusterSettings()))
+	const presplit = false // RESTORE does its own split-and-scatter.
+	batcher, err := bulk.MakeSSTBatcher(ctx, db, MaxImportBatchSize(cArgs.EvalCtx.ClusterSettings()), presplit)
 	if err != nil {
 		return nil, err
 	}
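For clarity on the call-site contract, a minimal sketch of the new `MakeSSTBatcher` signature follows; `maxBatchSize` is an illustrative placeholder, and the opt-in caller at the end is hypothetical, not code from this PR.

```go
// RESTORE passes presplit=false: its split-and-scatter phase has already
// carved up and distributed the target spans, so presplitting again at
// flush time would be redundant work.
batcher, err := bulk.MakeSSTBatcher(ctx, db, maxBatchSize, false /* presplit */)
if err != nil {
	return nil, err
}

// A hypothetical caller ingesting into un-split keyspace could opt in
// instead, letting the batcher split and scatter before each full flush:
batcher, err = bulk.MakeSSTBatcher(ctx, db, maxBatchSize, true /* presplit */)
```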
pkg/storage/bulk/sst_batcher.go (31 additions & 3 deletions)

@@ -41,7 +41,9 @@ type FixedTimestampSSTBatcher struct {
 func MakeFixedTimestampSSTBatcher(
 	db *client.DB, rangeCache *kv.RangeDescriptorCache, flushBytes int64, timestamp hlc.Timestamp,
 ) (*FixedTimestampSSTBatcher, error) {
-	b := &FixedTimestampSSTBatcher{timestamp, SSTBatcher{db: db, maxSize: flushBytes, rc: rangeCache}}
+	b := &FixedTimestampSSTBatcher{
+		timestamp, SSTBatcher{db: db, maxSize: flushBytes, rc: rangeCache, presplitOnFullFlush: true},
+	}
 	err := b.Reset()
 	return b, err
 }
@@ -66,6 +68,9 @@ type SSTBatcher struct {
 	rc *kv.RangeDescriptorCache
 
 	maxSize int64
+	// split+scatter at start key before adding a "full" (i.e. >= maxSize) SST.
+	presplitOnFullFlush bool
+
 	// rows written in the current batch.
 	rowCounter RowCounter
 	totalRows  roachpb.BulkOpSummary
@@ -76,8 +81,10 @@ type SSTBatcher struct {
 }
 
 // MakeSSTBatcher makes a ready-to-use SSTBatcher.
-func MakeSSTBatcher(ctx context.Context, db *client.DB, flushBytes int64) (*SSTBatcher, error) {
-	b := &SSTBatcher{db: db, maxSize: flushBytes}
+func MakeSSTBatcher(
+	ctx context.Context, db *client.DB, flushBytes int64, presplit bool,
+) (*SSTBatcher, error) {
+	b := &SSTBatcher{db: db, maxSize: flushBytes, presplitOnFullFlush: presplit}
 	err := b.Reset()
 	return b, err
 }
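The two constructors now differ only in how the flag gets set: `MakeFixedTimestampSSTBatcher` hard-codes `presplitOnFullFlush: true`, while `MakeSSTBatcher` exposes it as a parameter. A side-by-side sketch, assuming this PR's signatures (the 32 MiB flush size and the `ts` timestamp are illustrative placeholders):

```go
// Fixed-timestamp batcher: always presplits on a full flush (set above).
fixed, err := bulk.MakeFixedTimestampSSTBatcher(db, rangeCache, 32<<20, ts)
if err != nil {
	return err
}

// Generic batcher: the caller chooses; RESTORE passes false (see import.go).
plain, err := bulk.MakeSSTBatcher(ctx, db, 32<<20, false /* presplit */)
if err != nil {
	return err
}
_, _ = fixed, plain // sketch only: Add/Flush as in the surrounding code
```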
@@ -168,6 +175,27 @@ func (b *SSTBatcher) Flush(ctx context.Context) error {
 	// currently the largest key in the batch. Increment it.
 	end := roachpb.Key(append([]byte(nil), b.batchEndKey...)).Next()
 
+	if b.presplitOnFullFlush && b.sstWriter.DataSize > b.maxSize {
+		log.VEventf(ctx, 1, "preparing to ingest %db SST by splitting at key %s",
+			b.sstWriter.DataSize, roachpb.PrettyPrintKey(nil, start))
+		if err := b.db.AdminSplit(ctx, start, start); err != nil {
+			return err
+		}
+
+		log.VEventf(ctx, 1, "scattering key %s", roachpb.PrettyPrintKey(nil, start))
+		scatterReq := &roachpb.AdminScatterRequest{
+			RequestHeader: roachpb.RequestHeaderFromSpan(roachpb.Span{Key: start, EndKey: start.Next()}),
+		}
+		if _, pErr := client.SendWrapped(ctx, b.db.NonTransactionalSender(), scatterReq); pErr != nil {
+			// TODO(dan): Unfortunately, Scatter is still too unreliable to
+			// fail the IMPORT when Scatter fails. I'm uncomfortable that
+			// this could break entirely and not start failing the tests,
+			// but on the bright side, it doesn't affect correctness, only
+			// throughput.
+			log.Errorf(ctx, "failed to scatter span %s: %s", roachpb.PrettyPrintKey(nil, start), pErr)
+		}
+	}
+
 	sstBytes, err := b.sstWriter.Finish()
 	if err != nil {
 		return errors.Wrapf(err, "finishing constructed sstable")
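One detail in the last hunk's context lines deserves a note: `b.batchEndKey` is the largest key in the batch, inclusive, while CockroachDB spans use exclusive end keys, hence the `.Next()` call before building the span. `roachpb.Key.Next()` returns the immediate successor key by appending a zero byte. A minimal illustration, assuming `roachpb.Key` semantics as elsewhere in the tree:

```go
last := roachpb.Key("apple")
end := last.Next() // "apple\x00": the smallest key strictly greater than last
// The half-open span [start, end) now covers every key in the batch,
// including last itself.
span := roachpb.Span{Key: roachpb.Key("a"), EndKey: end}
_ = span
```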