From f7cf5118cc7da501a4e1d1595df6072f5b805a9c Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 26 Mar 2019 14:44:28 +0000 Subject: [PATCH 1/2] storage/bulk: push buffer-sort down to BulkAdder This pushes the buffering and sorting of out-of-order keys down to the BulkAdder. This pattern is common everywhere we produce out-of-order KVs that we want to ingest in bulk -- we need to sort them, both for the ordering property of SSTs but also for grouping. Evidence suggests that changes in buffering and splitting could lead to significant changes in overall ingest performance -- buffering more or even just buffering/splitting smarter affects the spans of produced SSTs which in turn directly affects how expensive overall ingestion is -- due to both some cockroach-specific factors like stats recomputation as well as other factors like compaction-derived write-amplification. While I'm here: Embedding `backfiller` in the `chunkbackfiller` implementations, and in turn embedding `chunkbackfiller` in `backfiller` created a loop, whereby a stuct with no methods could implement `chunkbackfiller` (leading to a NPE at runtime?). Release note: none. --- pkg/ccl/importccl/read_import_proc.go | 22 +-- pkg/server/server.go | 2 +- pkg/sql/distsqlrun/backfiller.go | 21 ++- pkg/sql/distsqlrun/columnbackfiller.go | 10 +- pkg/sql/distsqlrun/indexbackfiller.go | 87 ++++++----- pkg/sql/schema_changer_test.go | 4 + pkg/storage/batcheval/cmd_add_sstable.go | 2 +- pkg/storage/bulk/buffering_adder.go | 179 +++++++++++++++++++++++ pkg/storage/bulk/sst_batcher.go | 51 ++++--- pkg/storage/bulk/sst_batcher_test.go | 7 +- pkg/storage/storagebase/bulk_adder.go | 20 ++- 11 files changed, 317 insertions(+), 88 deletions(-) create mode 100644 pkg/storage/bulk/buffering_adder.go diff --git a/pkg/ccl/importccl/read_import_proc.go b/pkg/ccl/importccl/read_import_proc.go index 11a0b2ede314..792224428206 100644 --- a/pkg/ccl/importccl/read_import_proc.go +++ b/pkg/ccl/importccl/read_import_proc.go @@ -9,14 +9,12 @@ package importccl import ( - "bytes" "compress/bzip2" "compress/gzip" "context" "io" "io/ioutil" "math/rand" - "sort" "strings" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" @@ -504,7 +502,7 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error { if err != nil { return err } - defer adder.Close() + defer adder.Close(ctx) // Drain the kvCh using the BulkAdder until it closes. if err := ingestKvs(ctx, adder, kvCh); err != nil { @@ -655,20 +653,15 @@ func ingestKvs(ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan kvB if len(buf) == 0 { return nil } - sort.Sort(buf) for i := range buf { if err := adder.Add(ctx, buf[i].Key, buf[i].Value.RawBytes); err != nil { - if i > 0 && bytes.Equal(buf[i].Key, buf[i-1].Key) { - return pgerror.Wrapf(err, pgerror.CodeDataExceptionError, - errSSTCreationMaybeDuplicateTemplate, buf[i].Key) + if _, ok := err.(storagebase.DuplicateKeyError); ok { + return pgerror.Wrap(err, pgerror.CodeDataExceptionError, "") } return err } } - if err := adder.Flush(ctx); err != nil { - return err - } - return adder.Reset() + return nil } for kvBatch := range kvCh { @@ -702,6 +695,13 @@ func ingestKvs(ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan kvB return err } } + + if err := adder.Flush(ctx); err != nil { + if err, ok := err.(storagebase.DuplicateKeyError); ok { + return pgerror.Wrap(err, pgerror.CodeDataExceptionError, "") + } + return err + } return nil } diff --git a/pkg/server/server.go b/pkg/server/server.go index 1be40d81b164..48fd13772cb8 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -526,7 +526,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { TempStorage: tempEngine, BulkAdder: func(ctx context.Context, db *client.DB, size int64, ts hlc.Timestamp) (storagebase.BulkAdder, error) { - return bulk.MakeFixedTimestampSSTBatcher(db, s.distSender.RangeDescriptorCache(), size, ts) + return bulk.MakeBulkAdder(db, s.distSender.RangeDescriptorCache(), size, size, ts) }, DiskMonitor: s.cfg.TempStorageConfig.Mon, diff --git a/pkg/sql/distsqlrun/backfiller.go b/pkg/sql/distsqlrun/backfiller.go index b9c70da777e9..4310edb99ef2 100644 --- a/pkg/sql/distsqlrun/backfiller.go +++ b/pkg/sql/distsqlrun/backfiller.go @@ -35,6 +35,12 @@ import ( ) type chunkBackfiller interface { + // prepare must be called before runChunk. + prepare(ctx context.Context) error + + // close should always be called to close a backfiller if prepare() was called. + close(ctx context.Context) + // runChunk returns the next-key and an error. next-key is nil // once the backfill is complete. runChunk( @@ -44,12 +50,15 @@ type chunkBackfiller interface { chunkSize int64, readAsOf hlc.Timestamp, ) (roachpb.Key, error) + + // flush must be called after the last chunk to finish buffered work. + flush(ctx context.Context) error } // backfiller is a processor that implements a distributed backfill of // an entity, like indexes or columns, during a schema change. type backfiller struct { - chunkBackfiller + chunks chunkBackfiller // name is the name of the kind of entity this backfiller processes. name string // mutationFilter returns true if the mutation should be processed by the @@ -116,6 +125,11 @@ func (b *backfiller) mainLoop(ctx context.Context) error { // Backfill the mutations for all the rows. chunkSize := b.spec.ChunkSize start := timeutil.Now() + + if err := b.chunks.prepare(ctx); err != nil { + return err + } + var resume roachpb.Span sp := work var nChunks, row = 0, int64(0) @@ -125,7 +139,7 @@ func (b *backfiller) mainLoop(ctx context.Context) error { b.name, desc.ID, mutationID, row, sp) } var err error - sp.Key, err = b.runChunk(ctx, mutations, sp, chunkSize, b.spec.ReadAsOf) + sp.Key, err = b.chunks.runChunk(ctx, mutations, sp, chunkSize, b.spec.ReadAsOf) if err != nil { return err } @@ -134,6 +148,9 @@ func (b *backfiller) mainLoop(ctx context.Context) error { break } } + if err := b.chunks.flush(ctx); err != nil { + return err + } log.VEventf(ctx, 2, "processed %d rows in %d chunks", row, nChunks) return WriteResumeSpan(ctx, b.flowCtx.ClientDB, diff --git a/pkg/sql/distsqlrun/columnbackfiller.go b/pkg/sql/distsqlrun/columnbackfiller.go index f8213726cc95..a9eb42655976 100644 --- a/pkg/sql/distsqlrun/columnbackfiller.go +++ b/pkg/sql/distsqlrun/columnbackfiller.go @@ -61,7 +61,7 @@ func newColumnBackfiller( spec: spec, }, } - cb.backfiller.chunkBackfiller = cb + cb.backfiller.chunks = cb if err := cb.ColumnBackfiller.Init(cb.flowCtx.NewEvalCtx(), cb.desc); err != nil { return nil, err @@ -70,6 +70,14 @@ func newColumnBackfiller( return cb, nil } +func (cb *columnBackfiller) close(ctx context.Context) {} +func (cb *columnBackfiller) prepare(ctx context.Context) error { + return nil +} +func (cb *columnBackfiller) flush(ctx context.Context) error { + return nil +} + // runChunk implements the chunkBackfiller interface. func (cb *columnBackfiller) runChunk( ctx context.Context, diff --git a/pkg/sql/distsqlrun/indexbackfiller.go b/pkg/sql/distsqlrun/indexbackfiller.go index 3c7b05c71d21..db2fce75c0e2 100644 --- a/pkg/sql/distsqlrun/indexbackfiller.go +++ b/pkg/sql/distsqlrun/indexbackfiller.go @@ -16,7 +16,6 @@ package distsqlrun import ( "context" - "sort" "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -25,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -37,6 +37,8 @@ type indexBackfiller struct { backfill.IndexBackfiller + adder storagebase.BulkAdder + desc *sqlbase.ImmutableTableDescriptor } @@ -61,7 +63,7 @@ func newIndexBackfiller( spec: spec, }, } - ib.backfiller.chunkBackfiller = ib + ib.backfiller.chunks = ib if err := ib.IndexBackfiller.Init(ib.desc); err != nil { return nil, err @@ -70,6 +72,42 @@ func newIndexBackfiller( return ib, nil } +func (ib *indexBackfiller) prepare(ctx context.Context) error { + adder, err := ib.flowCtx.BulkAdder(ctx, ib.flowCtx.ClientDB, 32<<20, ib.spec.ReadAsOf) + if err != nil { + return err + } + ib.adder = adder + ib.adder.SkipLocalDuplicates(ib.ContainsInvertedIndex()) + return nil +} + +func (ib indexBackfiller) close(ctx context.Context) { + ib.adder.Close(ctx) +} + +func (ib *indexBackfiller) flush(ctx context.Context) error { + return ib.wrapDupError(ctx, ib.adder.Flush(ctx)) +} + +func (ib *indexBackfiller) wrapDupError(ctx context.Context, orig error) error { + if orig == nil { + return nil + } + typed, ok := orig.(storagebase.DuplicateKeyError) + if !ok { + return orig + } + + desc, err := ib.desc.MakeFirstMutationPublic() + immutable := sqlbase.NewImmutableTableDescriptor(*desc.TableDesc()) + if err != nil { + return err + } + v := &roachpb.Value{RawBytes: typed.Value} + return row.NewUniquenessConstraintViolationError(ctx, immutable, typed.Key, v) +} + func (ib *indexBackfiller) runChunk( tctx context.Context, mutations []sqlbase.DescriptorMutation, @@ -129,50 +167,23 @@ func (ib *indexBackfiller) runChunk( enabled := backfill.BulkWriteIndex.Get(&ib.flowCtx.Settings.SV) if enabled { start := timeutil.Now() - sort.Slice(entries, func(i, j int) bool { - return entries[i].Key.Compare(entries[j].Key) < 0 - }) - sortTime := timeutil.Now().Sub(start) - start = timeutil.Now() - adder, err := ib.flowCtx.BulkAdder(ctx, ib.flowCtx.ClientDB, 32<<20, readAsOf) - if err != nil { - return nil, err + for _, i := range entries { + if err := ib.adder.Add(ctx, i.Key, i.Value.RawBytes); err != nil { + return nil, ib.wrapDupError(ctx, err) + } } - defer adder.Close() - containsInvertedIndex := ib.ContainsInvertedIndex() - for i := range entries { - if err := adder.Add(ctx, entries[i].Key, entries[i].Value.RawBytes); err != nil { - // Detect a duplicate within the SST being constructed. This is an - // insufficient but useful method for unique constraint enforcement - // and the index has to be validated after construction. - if i > 0 && entries[i-1].Key.Equal(entries[i].Key) { - if containsInvertedIndex { - // Depend on post index backfill validation to catch any errors. - continue - } - desc, err := ib.desc.MakeFirstMutationPublic() - immutable := sqlbase.NewImmutableTableDescriptor(*desc.TableDesc()) - if err != nil { - return nil, err - } - entry := entries[i] - return nil, row.NewUniquenessConstraintViolationError( - ctx, immutable, entry.Key, &entry.Value) - } - return nil, err + if ib.flowCtx.testingKnobs.RunAfterBackfillChunk != nil { + if err := ib.adder.Flush(ctx); err != nil { + return nil, ib.wrapDupError(ctx, err) } } addTime := timeutil.Now().Sub(start) - if err := adder.Flush(ctx); err != nil { - return nil, err - } - // Don't log perf stats in tests with small indexes. if len(entries) > 1000 { - log.Infof(ctx, "index backfill stats: entries %d, prepare %+v, sort %+v, add-sst %+v", - len(entries), prepTime, sortTime, addTime) + log.Infof(ctx, "index backfill stats: entries %d, prepare %+v, add-sst %+v", + len(entries), prepTime, addTime) } return key, nil } diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index eabd05ce0f94..1a45bcb63a40 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -1516,6 +1516,10 @@ func TestSchemaChangePurgeFailure(t *testing.T) { } return nil }, + // the backfiller flushes after every batch if RunAfterBackfillChunk is + // non-nil so this noop fn means we can observe the partial-backfill that + // would otherwise just be buffered. + RunAfterBackfillChunk: func() {}, }, // Disable backfill migrations, we still need the jobs table migration. SQLMigrationManager: &sqlmigrations.MigrationManagerTestingKnobs{ diff --git a/pkg/storage/batcheval/cmd_add_sstable.go b/pkg/storage/batcheval/cmd_add_sstable.go index 364169fd4b6e..de06810c68e6 100644 --- a/pkg/storage/batcheval/cmd_add_sstable.go +++ b/pkg/storage/batcheval/cmd_add_sstable.go @@ -66,7 +66,7 @@ func EvalAddSSTable( return result.Result{}, errors.Wrap(err, "computing existing stats") } ms.Subtract(existingStats) - if existingStats.KeyCount > 0 { + if log.V(2) { log.Infof(ctx, "%s SST covers span containing %d existing keys: [%s, %s)", humanizeutil.IBytes(int64(len(args.Data))), existingStats.KeyCount, args.Key, args.EndKey) } diff --git a/pkg/storage/bulk/buffering_adder.go b/pkg/storage/bulk/buffering_adder.go new file mode 100644 index 000000000000..79a02b069ab8 --- /dev/null +++ b/pkg/storage/bulk/buffering_adder.go @@ -0,0 +1,179 @@ +// Copyright 2019 The Cockroach Authors. +// +/// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package bulk + +import ( + "bytes" + "context" + "sort" + + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// BufferingAdder is a wrapper for an SSTBatcher that allows out-of-order calls +// to Add, buffering them up and then sorting them before then passing them in +// order into an SSTBatcher +type BufferingAdder struct { + sink SSTBatcher + // timestamp applied to mvcc keys created from keys during SST construction. + timestamp hlc.Timestamp + // skips duplicates (iff they are buffered together). + skipDuplicates bool + + // threshold at which buffered entries will be flushed to SSTBatcher. + flushSize int64 + + // currently buffered kvs. + curBuf kvsByKey + // estimated memory usage of curBuf. + curBufSize int64 + + flushCounts struct { + total int + bufferSize int + } +} + +const kvOverhead = 24 + 24 // 2 slice headers, each assuming each is 8 + 8 + 8. + +// MakeBulkAdder makes a storagebase.BulkAdder that buffers and sorts K/Vs passed +// to add into SSTs that are then ingested. +func MakeBulkAdder( + db *client.DB, + rangeCache *kv.RangeDescriptorCache, + flushBytes, sstBytes int64, + timestamp hlc.Timestamp, +) (*BufferingAdder, error) { + b := &BufferingAdder{ + sink: SSTBatcher{db: db, maxSize: sstBytes, rc: rangeCache}, + timestamp: timestamp, + flushSize: flushBytes, + } + return b, nil +} + +// SkipLocalDuplicates configures skipping of duplicate keys in local batches. +func (b *BufferingAdder) SkipLocalDuplicates(skip bool) { + b.skipDuplicates = skip +} + +// Close closes the underlying SST builder. +func (b *BufferingAdder) Close(ctx context.Context) { + log.VEventf(ctx, 2, + "bulk adder ingested %s, flushed %d times, %d due to buffer size. Flushed %d files, %d due to ranges, %d due to sst size", + sz(b.sink.totalRows.DataSize), + b.flushCounts.total, b.flushCounts.bufferSize, + b.sink.flushCounts.total, b.sink.flushCounts.split, b.sink.flushCounts.sstSize, + ) + b.sink.Close() +} + +// Add adds a key to the buffer and checks if it needs to flush. +func (b *BufferingAdder) Add(ctx context.Context, key roachpb.Key, value []byte) error { + if len(b.curBuf) == 0 { + if err := b.sink.Reset(); err != nil { + return err + } + } + b.curBuf = append(b.curBuf, kvPair{key, value}) + b.curBufSize += int64(cap(key)+cap(value)) + kvOverhead + + if b.curBufSize > b.flushSize { + b.flushCounts.bufferSize++ + log.VEventf(ctx, 3, "buffer size triggering flush of %s buffer", sz(b.curBufSize)) + return b.Flush(ctx) + } + return nil +} + +// Flush flushes any buffered kvs to the batcher. +func (b *BufferingAdder) Flush(ctx context.Context) error { + if len(b.curBuf) == 0 { + return nil + } + b.flushCounts.total++ + + before := b.sink.flushCounts + beforeSize := b.sink.totalRows.DataSize + + sort.Sort(b.curBuf) + for i, kv := range b.curBuf { + if b.skipDuplicates && i > 0 && bytes.Equal(b.curBuf[i-1].key, kv.key) { + continue + } + + if err := b.sink.AddMVCCKey(ctx, engine.MVCCKey{Key: kv.key, Timestamp: b.timestamp}, kv.value); err != nil { + if i > 0 && bytes.Equal(b.curBuf[i-1].key, kv.key) { + return storagebase.DuplicateKeyError{Key: kv.key, Value: kv.value} + } + return err + } + } + if err := b.sink.Flush(ctx); err != nil { + return err + } + + if log.V(3) { + written := b.sink.totalRows.DataSize - beforeSize + files := b.sink.flushCounts.total - before.total + dueToSplits := b.sink.flushCounts.split - before.split + dueToSize := b.sink.flushCounts.sstSize - before.sstSize + + log.Infof(ctx, + "flushing %s buffer wrote %d SSTs (avg: %s) with %d for splits, %d for size", + sz(b.curBufSize), files, sz(written/int64(files)), dueToSplits, dueToSize, + ) + } + + b.curBufSize = 0 + b.curBuf = b.curBuf[:0] + return nil +} + +// GetSummary returns this batcher's total added rows/bytes/etc. +func (b *BufferingAdder) GetSummary() roachpb.BulkOpSummary { + return b.sink.GetSummary() +} + +// kvPair is a bytes -> bytes kv pair. +type kvPair struct { + key roachpb.Key + value []byte +} + +type kvsByKey []kvPair + +// Len implements sort.Interface. +func (kvs kvsByKey) Len() int { + return len(kvs) +} + +// Less implements sort.Interface. +func (kvs kvsByKey) Less(i, j int) bool { + return bytes.Compare(kvs[i].key, kvs[j].key) < 0 +} + +// Swap implements sort.Interface. +func (kvs kvsByKey) Swap(i, j int) { + kvs[i], kvs[j] = kvs[j], kvs[i] +} + +var _ sort.Interface = kvsByKey{} diff --git a/pkg/storage/bulk/sst_batcher.go b/pkg/storage/bulk/sst_batcher.go index 30ffdaaec38a..91fcbddd99d2 100644 --- a/pkg/storage/bulk/sst_batcher.go +++ b/pkg/storage/bulk/sst_batcher.go @@ -23,33 +23,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/pkg/errors" ) -// FixedTimestampSSTBatcher is a wrapper for SSTBatcher that assigns a fixed -// timestamp to all the added keys. -type FixedTimestampSSTBatcher struct { - timestamp hlc.Timestamp - SSTBatcher -} - -// MakeFixedTimestampSSTBatcher makes a ready-to-use SSTBatcher that generates -// an SST with all keys at the specified MVCC timestamp. If the rangeCache is -// non-nil, it will be used to minimize retries due to SSTs that span ranges. -func MakeFixedTimestampSSTBatcher( - db *client.DB, rangeCache *kv.RangeDescriptorCache, flushBytes int64, timestamp hlc.Timestamp, -) (*FixedTimestampSSTBatcher, error) { - b := &FixedTimestampSSTBatcher{timestamp, SSTBatcher{db: db, maxSize: flushBytes, rc: rangeCache}} - err := b.Reset() - return b, err -} +type sz int64 -// Add a key/value pair with the batcher's timestamp, flushing if needed. -// Keys must be added in order. -func (b *FixedTimestampSSTBatcher) Add(ctx context.Context, key roachpb.Key, value []byte) error { - return b.AddMVCCKey(ctx, engine.MVCCKey{Key: key, Timestamp: b.timestamp}, value) +func (b sz) String() string { + return humanizeutil.IBytes(int64(b)) } // SSTBatcher is a helper for bulk-adding many KVs in chunks via AddSSTable. An @@ -73,6 +55,12 @@ type SSTBatcher struct { sstWriter engine.RocksDBSstFileWriter batchStartKey []byte batchEndKey []byte + + flushCounts struct { + total int + split int + sstSize int + } } // MakeSSTBatcher makes a ready-to-use SSTBatcher. @@ -144,18 +132,27 @@ func (b *SSTBatcher) shouldFlush(ctx context.Context, nextKey roachpb.Key) bool log.Warningf(ctx, "failed to determine where to split SST: %v", err) } else if r != nil { b.flushKey = r.EndKey.AsRawKey() - log.VEventf(ctx, 2, "building sstable that will flush before %v", b.flushKey) + log.VEventf(ctx, 3, "building sstable that will flush before %v", b.flushKey) } else { - log.VEventf(ctx, 2, "no cached range desc available to determine sst flush key") + log.VEventf(ctx, 3, "no cached range desc available to determine sst flush key") } } } + size := b.sstWriter.DataSize + if b.flushKey != nil && b.flushKey.Compare(nextKey) <= 0 { + log.VEventf(ctx, 3, "flushing %s SST due to range boundary %s", sz(size), b.flushKey) + b.flushCounts.split++ return true } - return b.sstWriter.DataSize >= b.maxSize + if size >= b.maxSize { + log.VEventf(ctx, 3, "flushing %s SST due to size > %s", sz(size), sz(b.maxSize)) + b.flushCounts.sstSize++ + return true + } + return false } // Flush sends the current batch, if any. @@ -163,6 +160,8 @@ func (b *SSTBatcher) Flush(ctx context.Context) error { if b.sstWriter.DataSize == 0 { return nil } + b.flushCounts.total++ + start := roachpb.Key(append([]byte(nil), b.batchStartKey...)) // The end key of the WriteBatch request is exclusive, but batchEndKey is // currently the largest key in the batch. Increment it. @@ -197,7 +196,7 @@ func AddSSTable(ctx context.Context, db *client.DB, start, end roachpb.Key, sstB const maxAddSSTableRetries = 10 var err error for i := 0; i < maxAddSSTableRetries; i++ { - log.VEventf(ctx, 2, "sending %d byte AddSSTable [%s,%s)", len(sstBytes), start, end) + log.VEventf(ctx, 2, "sending %s AddSSTable [%s,%s)", sz(len(sstBytes)), start, end) // This will fail if the range has split but we'll check for that below. err = db.AddSSTable(ctx, start, end, sstBytes) if err == nil { diff --git a/pkg/storage/bulk/sst_batcher_test.go b/pkg/storage/bulk/sst_batcher_test.go index f53f7c33723d..481bf8a8c5f4 100644 --- a/pkg/storage/bulk/sst_batcher_test.go +++ b/pkg/storage/bulk/sst_batcher_test.go @@ -129,12 +129,12 @@ func runTestImport(t *testing.T, batchSize int64) { } ts := hlc.Timestamp{WallTime: 100} - b, err := bulk.MakeFixedTimestampSSTBatcher(kvDB, mockCache, batchSize, ts) + b, err := bulk.MakeBulkAdder(kvDB, mockCache, batchSize, batchSize, ts) if err != nil { t.Fatal(err) } - defer b.Close() + defer b.Close(ctx) var expected []client.KeyValue @@ -149,9 +149,6 @@ func runTestImport(t *testing.T, batchSize int64) { defer cancel() expectedSplitRetries := 0 for _, batch := range testCase { - if err := b.Reset(); err != nil { - t.Fatal(err) - } for idx, x := range batch { k := key(x) // if our adds is batching multiple keys and we've previously added diff --git a/pkg/storage/storagebase/bulk_adder.go b/pkg/storage/storagebase/bulk_adder.go index cadd69212b7d..da8134c81e60 100644 --- a/pkg/storage/storagebase/bulk_adder.go +++ b/pkg/storage/storagebase/bulk_adder.go @@ -16,6 +16,7 @@ package storagebase import ( "context" + "fmt" "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -36,7 +37,20 @@ type BulkAdder interface { // GetSummary returns a summary of rows/bytes/etc written by this batcher. GetSummary() roachpb.BulkOpSummary // Close closes the underlying buffers/writers. - Close() - // Reset resets the bulk-adder, returning it to its initial state. - Reset() error + Close(ctx context.Context) + // SkipLocalDuplicates configures handling of duplicate keys within a local + // sorted batch. Once a batch is flushed – explicitly or automatically – local + // duplicate detection does not apply. + SkipLocalDuplicates(bool) +} + +// DuplicateKeyError represents a failed attempt to ingest the same key twice +// using a BulkAdder within the same batch. +type DuplicateKeyError struct { + Key roachpb.Key + Value []byte +} + +func (d DuplicateKeyError) Error() string { + return fmt.Sprintf("duplicate key: %s", d.Key) } From b2433c4541110840419442531782d4fff85934ce Mon Sep 17 00:00:00 2001 From: David Taylor Date: Fri, 29 Mar 2019 15:33:03 +0000 Subject: [PATCH 2/2] distsqlrun: make backfiller buffer and sst sizes cluster settings Release note (sql change): add settings to control buffering in index backfiller. --- docs/generated/settings/settings.html | 2 ++ pkg/ccl/importccl/read_import_proc.go | 3 ++- pkg/server/server.go | 4 ++-- pkg/sql/distsqlrun/indexbackfiller.go | 13 ++++++++++++- pkg/storage/storagebase/bulk_adder.go | 2 +- 5 files changed, 19 insertions(+), 5 deletions(-) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 7dfae0dbece3..885cf774eba7 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -55,6 +55,8 @@ kv.transaction.write_pipelining_max_batch_sizeinteger128if non-zero, defines that maximum size batch that will be pipelined through Raft consensus kv.transaction.write_pipelining_max_outstanding_sizebyte size256 KiBmaximum number of bytes used to track in-flight pipelined writes before disabling pipelining rocksdb.min_wal_sync_intervalduration0sminimum duration between syncs of the RocksDB WAL +schemachanger.backfiller.buffer_sizebyte size196 MiBamount to buffer in memory during backfills +schemachanger.backfiller.max_sst_sizebyte size16 MiBtarget size for ingested files during backfills schemachanger.bulk_index_backfill.batch_sizeinteger5000000number of rows to process at a time during bulk index backfill schemachanger.bulk_index_backfill.enabledbooleantruebackfill indexes in bulk via addsstable schemachanger.lease.durationduration5m0sthe duration of a schema change lease diff --git a/pkg/ccl/importccl/read_import_proc.go b/pkg/ccl/importccl/read_import_proc.go index 792224428206..4883b9db1926 100644 --- a/pkg/ccl/importccl/read_import_proc.go +++ b/pkg/ccl/importccl/read_import_proc.go @@ -498,7 +498,8 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error { defer tracing.FinishSpan(span) writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos} - adder, err := cp.flowCtx.BulkAdder(ctx, cp.flowCtx.ClientDB, 32<<20 /* flush at 32mb */, writeTS) + const bufferSize, flushSize = 64 << 20, 16 << 20 + adder, err := cp.flowCtx.BulkAdder(ctx, cp.flowCtx.ClientDB, bufferSize, flushSize, writeTS) if err != nil { return err } diff --git a/pkg/server/server.go b/pkg/server/server.go index 48fd13772cb8..b6cf19a41f86 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -525,8 +525,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ClusterID: &s.rpcContext.ClusterID, TempStorage: tempEngine, - BulkAdder: func(ctx context.Context, db *client.DB, size int64, ts hlc.Timestamp) (storagebase.BulkAdder, error) { - return bulk.MakeBulkAdder(db, s.distSender.RangeDescriptorCache(), size, size, ts) + BulkAdder: func(ctx context.Context, db *client.DB, bufferSize, flushSize int64, ts hlc.Timestamp) (storagebase.BulkAdder, error) { + return bulk.MakeBulkAdder(db, s.distSender.RangeDescriptorCache(), bufferSize, flushSize, ts) }, DiskMonitor: s.cfg.TempStorageConfig.Mon, diff --git a/pkg/sql/distsqlrun/indexbackfiller.go b/pkg/sql/distsqlrun/indexbackfiller.go index db2fce75c0e2..5c452fcf6fc7 100644 --- a/pkg/sql/distsqlrun/indexbackfiller.go +++ b/pkg/sql/distsqlrun/indexbackfiller.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/backfill" "github.com/cockroachdb/cockroach/pkg/sql/distsqlpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -45,6 +46,14 @@ type indexBackfiller struct { var _ Processor = &indexBackfiller{} var _ chunkBackfiller = &indexBackfiller{} +var backfillerBufferSize = settings.RegisterByteSizeSetting( + "schemachanger.backfiller.buffer_size", "amount to buffer in memory during backfills", 196<<20, +) + +var backillerSSTSize = settings.RegisterByteSizeSetting( + "schemachanger.backfiller.max_sst_size", "target size for ingested files during backfills", 16<<20, +) + func newIndexBackfiller( flowCtx *FlowCtx, processorID int32, @@ -73,7 +82,9 @@ func newIndexBackfiller( } func (ib *indexBackfiller) prepare(ctx context.Context) error { - adder, err := ib.flowCtx.BulkAdder(ctx, ib.flowCtx.ClientDB, 32<<20, ib.spec.ReadAsOf) + bufferSize := backfillerBufferSize.Get(&ib.flowCtx.Settings.SV) + sstSize := backillerSSTSize.Get(&ib.flowCtx.Settings.SV) + adder, err := ib.flowCtx.BulkAdder(ctx, ib.flowCtx.ClientDB, bufferSize, sstSize, ib.spec.ReadAsOf) if err != nil { return err } diff --git a/pkg/storage/storagebase/bulk_adder.go b/pkg/storage/storagebase/bulk_adder.go index da8134c81e60..6ebdeb8fc427 100644 --- a/pkg/storage/storagebase/bulk_adder.go +++ b/pkg/storage/storagebase/bulk_adder.go @@ -25,7 +25,7 @@ import ( // BulkAdderFactory describes a factory function for BulkAdders. type BulkAdderFactory func( - ctx context.Context, db *client.DB, flushBytes int64, timestamp hlc.Timestamp, + ctx context.Context, db *client.DB, bufferBytes, flushBytes int64, timestamp hlc.Timestamp, ) (BulkAdder, error) // BulkAdder describes a bulk-adding helper that can be used to add lots of KVs.