From 4a0641b9a2e1b35c8c2a62b23d4311b0d35a632d Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Fri, 15 Feb 2019 23:43:19 +0000
Subject: [PATCH] storage/bulk: split and scatter during ingest

When many processes are concurrently bulk-ingesting without explicit
pre-splitting, it is easy for them all to suddenly overwhelm a given
range with SSTs before it gets a chance to update its stats and start
to split.

True pre-splitting is sometimes difficult -- absent a pre-read to
sample the data to ingest, or an external source of that information
(like a BACKUP descriptor), we don't know where to split until we have
actually read the data and are ready to ingest it.

However, if we produce an SST large enough that we opt to flush it
based on its size, then, given that we usually chunk at known range
boundaries, we can reasonably assume it will fill a range fairly well.
Thus it is probably helpful to split off, and scatter, a new range for
it before we send it out.

As the target keyspace becomes more split, the range-aware chunking
will mean we start flushing smaller SSTs. When sending a small SST we
have no idea whether it will cause the target range to become over-full
or not, so it doesn't make sense to indiscriminately pre-split for it.
But happily, we probably don't need to -- the fact that we chunked
based on a known split suggests the target keyspace is already well
enough split that we're at least routing work to different ranges, so
we can let the ranges split themselves as they fill up from there on.

Release note: none.
---
 pkg/ccl/storageccl/import.go    |  3 ++-
 pkg/storage/bulk/sst_batcher.go | 34 ++++++++++++++++++++++++++++++---
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/pkg/ccl/storageccl/import.go b/pkg/ccl/storageccl/import.go
index eacbea1d2d99..a4a87af8a52a 100644
--- a/pkg/ccl/storageccl/import.go
+++ b/pkg/ccl/storageccl/import.go
@@ -129,7 +129,8 @@ func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.Impo
 		iters = append(iters, iter)
 	}
 
-	batcher, err := bulk.MakeSSTBatcher(ctx, db, MaxImportBatchSize(cArgs.EvalCtx.ClusterSettings()))
+	const presplit = false // RESTORE does its own split-and-scatter.
+	batcher, err := bulk.MakeSSTBatcher(ctx, db, MaxImportBatchSize(cArgs.EvalCtx.ClusterSettings()), presplit)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/storage/bulk/sst_batcher.go b/pkg/storage/bulk/sst_batcher.go
index 30ffdaaec38a..d2f7874da53d 100644
--- a/pkg/storage/bulk/sst_batcher.go
+++ b/pkg/storage/bulk/sst_batcher.go
@@ -41,7 +41,9 @@ type FixedTimestampSSTBatcher struct {
 func MakeFixedTimestampSSTBatcher(
 	db *client.DB, rangeCache *kv.RangeDescriptorCache, flushBytes int64, timestamp hlc.Timestamp,
 ) (*FixedTimestampSSTBatcher, error) {
-	b := &FixedTimestampSSTBatcher{timestamp, SSTBatcher{db: db, maxSize: flushBytes, rc: rangeCache}}
+	b := &FixedTimestampSSTBatcher{
+		timestamp, SSTBatcher{db: db, maxSize: flushBytes, rc: rangeCache, presplitOnFullFlush: true},
+	}
 	err := b.Reset()
 	return b, err
 }
@@ -66,6 +68,9 @@ type SSTBatcher struct {
 	rc      *kv.RangeDescriptorCache
 	maxSize int64
 
+	// split+scatter at start key before adding a "full" (i.e. >= maxSize) SST.
+	presplitOnFullFlush bool
+
 	// rows written in the current batch.
 	rowCounter RowCounter
 	totalRows  roachpb.BulkOpSummary
@@ -76,8 +81,10 @@
 }
 
 // MakeSSTBatcher makes a ready-to-use SSTBatcher.
-func MakeSSTBatcher(ctx context.Context, db *client.DB, flushBytes int64) (*SSTBatcher, error) {
-	b := &SSTBatcher{db: db, maxSize: flushBytes}
+func MakeSSTBatcher(
+	ctx context.Context, db *client.DB, flushBytes int64, presplit bool,
+) (*SSTBatcher, error) {
+	b := &SSTBatcher{db: db, maxSize: flushBytes, presplitOnFullFlush: presplit}
 	err := b.Reset()
 	return b, err
 }
@@ -168,6 +175,27 @@ func (b *SSTBatcher) Flush(ctx context.Context) error {
 	// currently the largest key in the batch. Increment it.
 	end := roachpb.Key(append([]byte(nil), b.batchEndKey...)).Next()
 
+	if b.presplitOnFullFlush && b.sstWriter.DataSize > b.maxSize {
+		log.VEventf(ctx, 1, "preparing to ingest %db SST by splitting at key %s",
+			b.sstWriter.DataSize, roachpb.PrettyPrintKey(nil, start))
+		if err := b.db.AdminSplit(ctx, start, start); err != nil {
+			return err
+		}
+
+		log.VEventf(ctx, 1, "scattering key %s", roachpb.PrettyPrintKey(nil, start))
+		scatterReq := &roachpb.AdminScatterRequest{
+			RequestHeader: roachpb.RequestHeaderFromSpan(roachpb.Span{Key: start, EndKey: start.Next()}),
+		}
+		if _, pErr := client.SendWrapped(ctx, b.db.NonTransactionalSender(), scatterReq); pErr != nil {
+			// TODO(dan): Unfortunately, Scatter is still too unreliable to
+			// fail the IMPORT when Scatter fails. I'm uncomfortable that
+			// this could break entirely and not start failing the tests,
+			// but on the bright side, it doesn't affect correctness, only
+			// throughput.
+			log.Errorf(ctx, "failed to scatter span %s: %s", roachpb.PrettyPrintKey(nil, start), pErr)
+		}
+	}
+
 	sstBytes, err := b.sstWriter.Finish()
 	if err != nil {
 		return errors.Wrapf(err, "finishing constructed sstable")
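
For reference, the flush-time decision this patch adds can be read in
isolation as the short sketch below. It is a minimal approximation, not
the tree's code: splitScatterer, maybePresplit, and next are hypothetical
stand-ins for *client.DB's AdminSplit, the AdminScatterRequest RPC, and
roachpb.Key.Next(). It shows the asymmetry the commit message argues for:
only a size-triggered ("full") flush splits off and scatters a new range,
while boundary-chunked SSTs are sent as-is.

	// Package sketch illustrates the pre-split-on-full-flush idea; it is
	// not the CockroachDB API.
	package sketch

	import (
		"context"
		"log"
	)

	// splitScatterer is a hypothetical stand-in for the admin RPCs the
	// real batcher issues via *client.DB.
	type splitScatterer interface {
		// Split splits the range containing key at key.
		Split(ctx context.Context, key []byte) error
		// Scatter randomizes replica placement for [start, end).
		Scatter(ctx context.Context, start, end []byte) error
	}

	// maybePresplit mirrors the new Flush behavior: only an SST being
	// flushed because it reached maxSize gets a new range split off and
	// scattered at its start key; smaller SSTs are sent as-is and the
	// target ranges are left to split themselves as they fill.
	func maybePresplit(
		ctx context.Context, admin splitScatterer, start []byte, dataSize, maxSize int64,
	) error {
		if dataSize <= maxSize {
			return nil
		}
		if err := admin.Split(ctx, start); err != nil {
			return err
		}
		// As in the patch, a failed scatter costs throughput, not
		// correctness, so it is logged rather than returned.
		if err := admin.Scatter(ctx, start, next(start)); err != nil {
			log.Printf("failed to scatter at %x: %v", start, err)
		}
		return nil
	}

	// next returns the immediate key successor, like roachpb.Key.Next().
	func next(k []byte) []byte {
		return append(append([]byte(nil), k...), 0)
	}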