diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 7dfae0dbece3..885cf774eba7 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -55,6 +55,8 @@
kv.transaction.write_pipelining_max_batch_size | integer | 128 | if non-zero, defines that maximum size batch that will be pipelined through Raft consensus |
kv.transaction.write_pipelining_max_outstanding_size | byte size | 256 KiB | maximum number of bytes used to track in-flight pipelined writes before disabling pipelining |
rocksdb.min_wal_sync_interval | duration | 0s | minimum duration between syncs of the RocksDB WAL |
+schemachanger.backfiller.buffer_size | byte size | 196 MiB | amount to buffer in memory during backfills |
+schemachanger.backfiller.max_sst_size | byte size | 16 MiB | target size for ingested files during backfills |
schemachanger.bulk_index_backfill.batch_size | integer | 5000000 | number of rows to process at a time during bulk index backfill |
schemachanger.bulk_index_backfill.enabled | boolean | true | backfill indexes in bulk via addsstable |
schemachanger.lease.duration | duration | 5m0s | the duration of a schema change lease |
diff --git a/pkg/ccl/importccl/read_import_proc.go b/pkg/ccl/importccl/read_import_proc.go
index 11a0b2ede314..4883b9db1926 100644
--- a/pkg/ccl/importccl/read_import_proc.go
+++ b/pkg/ccl/importccl/read_import_proc.go
@@ -9,14 +9,12 @@
package importccl
import (
- "bytes"
"compress/bzip2"
"compress/gzip"
"context"
"io"
"io/ioutil"
"math/rand"
- "sort"
"strings"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
@@ -500,11 +498,12 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error {
defer tracing.FinishSpan(span)
writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos}
- adder, err := cp.flowCtx.BulkAdder(ctx, cp.flowCtx.ClientDB, 32<<20 /* flush at 32mb */, writeTS)
+ const bufferSize, flushSize = 64 << 20, 16 << 20
+ adder, err := cp.flowCtx.BulkAdder(ctx, cp.flowCtx.ClientDB, bufferSize, flushSize, writeTS)
if err != nil {
return err
}
- defer adder.Close()
+ defer adder.Close(ctx)
// Drain the kvCh using the BulkAdder until it closes.
if err := ingestKvs(ctx, adder, kvCh); err != nil {
@@ -655,20 +654,15 @@ func ingestKvs(ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan kvB
if len(buf) == 0 {
return nil
}
- sort.Sort(buf)
for i := range buf {
if err := adder.Add(ctx, buf[i].Key, buf[i].Value.RawBytes); err != nil {
- if i > 0 && bytes.Equal(buf[i].Key, buf[i-1].Key) {
- return pgerror.Wrapf(err, pgerror.CodeDataExceptionError,
- errSSTCreationMaybeDuplicateTemplate, buf[i].Key)
+ if _, ok := err.(storagebase.DuplicateKeyError); ok {
+ return pgerror.Wrap(err, pgerror.CodeDataExceptionError, "")
}
return err
}
}
- if err := adder.Flush(ctx); err != nil {
- return err
- }
- return adder.Reset()
+ return nil
}
for kvBatch := range kvCh {
@@ -702,6 +696,13 @@ func ingestKvs(ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan kvB
return err
}
}
+
+ if err := adder.Flush(ctx); err != nil {
+ if err, ok := err.(storagebase.DuplicateKeyError); ok {
+ return pgerror.Wrap(err, pgerror.CodeDataExceptionError, "")
+ }
+ return err
+ }
return nil
}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 1be40d81b164..b6cf19a41f86 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -525,8 +525,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
ClusterID: &s.rpcContext.ClusterID,
TempStorage: tempEngine,
- BulkAdder: func(ctx context.Context, db *client.DB, size int64, ts hlc.Timestamp) (storagebase.BulkAdder, error) {
- return bulk.MakeFixedTimestampSSTBatcher(db, s.distSender.RangeDescriptorCache(), size, ts)
+ BulkAdder: func(ctx context.Context, db *client.DB, bufferSize, flushSize int64, ts hlc.Timestamp) (storagebase.BulkAdder, error) {
+ return bulk.MakeBulkAdder(db, s.distSender.RangeDescriptorCache(), bufferSize, flushSize, ts)
},
DiskMonitor: s.cfg.TempStorageConfig.Mon,
diff --git a/pkg/sql/distsqlrun/backfiller.go b/pkg/sql/distsqlrun/backfiller.go
index b9c70da777e9..4310edb99ef2 100644
--- a/pkg/sql/distsqlrun/backfiller.go
+++ b/pkg/sql/distsqlrun/backfiller.go
@@ -35,6 +35,12 @@ import (
)
type chunkBackfiller interface {
+ // prepare must be called before runChunk.
+ prepare(ctx context.Context) error
+
+ // close should always be called to close a backfiller if prepare() was called.
+ close(ctx context.Context)
+
// runChunk returns the next-key and an error. next-key is nil
// once the backfill is complete.
runChunk(
@@ -44,12 +50,15 @@ type chunkBackfiller interface {
chunkSize int64,
readAsOf hlc.Timestamp,
) (roachpb.Key, error)
+
+ // flush must be called after the last chunk to finish buffered work.
+ flush(ctx context.Context) error
}
// backfiller is a processor that implements a distributed backfill of
// an entity, like indexes or columns, during a schema change.
type backfiller struct {
- chunkBackfiller
+ chunks chunkBackfiller
// name is the name of the kind of entity this backfiller processes.
name string
// mutationFilter returns true if the mutation should be processed by the
@@ -116,6 +125,11 @@ func (b *backfiller) mainLoop(ctx context.Context) error {
// Backfill the mutations for all the rows.
chunkSize := b.spec.ChunkSize
start := timeutil.Now()
+
+ if err := b.chunks.prepare(ctx); err != nil {
+ return err
+ }
+
var resume roachpb.Span
sp := work
var nChunks, row = 0, int64(0)
@@ -125,7 +139,7 @@ func (b *backfiller) mainLoop(ctx context.Context) error {
b.name, desc.ID, mutationID, row, sp)
}
var err error
- sp.Key, err = b.runChunk(ctx, mutations, sp, chunkSize, b.spec.ReadAsOf)
+ sp.Key, err = b.chunks.runChunk(ctx, mutations, sp, chunkSize, b.spec.ReadAsOf)
if err != nil {
return err
}
@@ -134,6 +148,9 @@ func (b *backfiller) mainLoop(ctx context.Context) error {
break
}
}
+ if err := b.chunks.flush(ctx); err != nil {
+ return err
+ }
log.VEventf(ctx, 2, "processed %d rows in %d chunks", row, nChunks)
return WriteResumeSpan(ctx,
b.flowCtx.ClientDB,
diff --git a/pkg/sql/distsqlrun/columnbackfiller.go b/pkg/sql/distsqlrun/columnbackfiller.go
index f8213726cc95..a9eb42655976 100644
--- a/pkg/sql/distsqlrun/columnbackfiller.go
+++ b/pkg/sql/distsqlrun/columnbackfiller.go
@@ -61,7 +61,7 @@ func newColumnBackfiller(
spec: spec,
},
}
- cb.backfiller.chunkBackfiller = cb
+ cb.backfiller.chunks = cb
if err := cb.ColumnBackfiller.Init(cb.flowCtx.NewEvalCtx(), cb.desc); err != nil {
return nil, err
@@ -70,6 +70,14 @@ func newColumnBackfiller(
return cb, nil
}
+func (cb *columnBackfiller) close(ctx context.Context) {}
+func (cb *columnBackfiller) prepare(ctx context.Context) error {
+ return nil
+}
+func (cb *columnBackfiller) flush(ctx context.Context) error {
+ return nil
+}
+
// runChunk implements the chunkBackfiller interface.
func (cb *columnBackfiller) runChunk(
ctx context.Context,
diff --git a/pkg/sql/distsqlrun/indexbackfiller.go b/pkg/sql/distsqlrun/indexbackfiller.go
index 3c7b05c71d21..5c452fcf6fc7 100644
--- a/pkg/sql/distsqlrun/indexbackfiller.go
+++ b/pkg/sql/distsqlrun/indexbackfiller.go
@@ -16,15 +16,16 @@ package distsqlrun
import (
"context"
- "sort"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/backfill"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
+ "github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -37,12 +38,22 @@ type indexBackfiller struct {
backfill.IndexBackfiller
+ adder storagebase.BulkAdder
+
desc *sqlbase.ImmutableTableDescriptor
}
var _ Processor = &indexBackfiller{}
var _ chunkBackfiller = &indexBackfiller{}
+var backfillerBufferSize = settings.RegisterByteSizeSetting(
+ "schemachanger.backfiller.buffer_size", "amount to buffer in memory during backfills", 196<<20,
+)
+
+var backillerSSTSize = settings.RegisterByteSizeSetting(
+ "schemachanger.backfiller.max_sst_size", "target size for ingested files during backfills", 16<<20,
+)
+
func newIndexBackfiller(
flowCtx *FlowCtx,
processorID int32,
@@ -61,7 +72,7 @@ func newIndexBackfiller(
spec: spec,
},
}
- ib.backfiller.chunkBackfiller = ib
+ ib.backfiller.chunks = ib
if err := ib.IndexBackfiller.Init(ib.desc); err != nil {
return nil, err
@@ -70,6 +81,44 @@ func newIndexBackfiller(
return ib, nil
}
+func (ib *indexBackfiller) prepare(ctx context.Context) error {
+ bufferSize := backfillerBufferSize.Get(&ib.flowCtx.Settings.SV)
+ sstSize := backillerSSTSize.Get(&ib.flowCtx.Settings.SV)
+ adder, err := ib.flowCtx.BulkAdder(ctx, ib.flowCtx.ClientDB, bufferSize, sstSize, ib.spec.ReadAsOf)
+ if err != nil {
+ return err
+ }
+ ib.adder = adder
+ ib.adder.SkipLocalDuplicates(ib.ContainsInvertedIndex())
+ return nil
+}
+
+func (ib indexBackfiller) close(ctx context.Context) {
+ ib.adder.Close(ctx)
+}
+
+func (ib *indexBackfiller) flush(ctx context.Context) error {
+ return ib.wrapDupError(ctx, ib.adder.Flush(ctx))
+}
+
+func (ib *indexBackfiller) wrapDupError(ctx context.Context, orig error) error {
+ if orig == nil {
+ return nil
+ }
+ typed, ok := orig.(storagebase.DuplicateKeyError)
+ if !ok {
+ return orig
+ }
+
+ desc, err := ib.desc.MakeFirstMutationPublic()
+ immutable := sqlbase.NewImmutableTableDescriptor(*desc.TableDesc())
+ if err != nil {
+ return err
+ }
+ v := &roachpb.Value{RawBytes: typed.Value}
+ return row.NewUniquenessConstraintViolationError(ctx, immutable, typed.Key, v)
+}
+
func (ib *indexBackfiller) runChunk(
tctx context.Context,
mutations []sqlbase.DescriptorMutation,
@@ -129,50 +178,23 @@ func (ib *indexBackfiller) runChunk(
enabled := backfill.BulkWriteIndex.Get(&ib.flowCtx.Settings.SV)
if enabled {
start := timeutil.Now()
- sort.Slice(entries, func(i, j int) bool {
- return entries[i].Key.Compare(entries[j].Key) < 0
- })
- sortTime := timeutil.Now().Sub(start)
- start = timeutil.Now()
- adder, err := ib.flowCtx.BulkAdder(ctx, ib.flowCtx.ClientDB, 32<<20, readAsOf)
- if err != nil {
- return nil, err
+ for _, i := range entries {
+ if err := ib.adder.Add(ctx, i.Key, i.Value.RawBytes); err != nil {
+ return nil, ib.wrapDupError(ctx, err)
+ }
}
- defer adder.Close()
- containsInvertedIndex := ib.ContainsInvertedIndex()
- for i := range entries {
- if err := adder.Add(ctx, entries[i].Key, entries[i].Value.RawBytes); err != nil {
- // Detect a duplicate within the SST being constructed. This is an
- // insufficient but useful method for unique constraint enforcement
- // and the index has to be validated after construction.
- if i > 0 && entries[i-1].Key.Equal(entries[i].Key) {
- if containsInvertedIndex {
- // Depend on post index backfill validation to catch any errors.
- continue
- }
- desc, err := ib.desc.MakeFirstMutationPublic()
- immutable := sqlbase.NewImmutableTableDescriptor(*desc.TableDesc())
- if err != nil {
- return nil, err
- }
- entry := entries[i]
- return nil, row.NewUniquenessConstraintViolationError(
- ctx, immutable, entry.Key, &entry.Value)
- }
- return nil, err
+ if ib.flowCtx.testingKnobs.RunAfterBackfillChunk != nil {
+ if err := ib.adder.Flush(ctx); err != nil {
+ return nil, ib.wrapDupError(ctx, err)
}
}
addTime := timeutil.Now().Sub(start)
- if err := adder.Flush(ctx); err != nil {
- return nil, err
- }
-
// Don't log perf stats in tests with small indexes.
if len(entries) > 1000 {
- log.Infof(ctx, "index backfill stats: entries %d, prepare %+v, sort %+v, add-sst %+v",
- len(entries), prepTime, sortTime, addTime)
+ log.Infof(ctx, "index backfill stats: entries %d, prepare %+v, add-sst %+v",
+ len(entries), prepTime, addTime)
}
return key, nil
}
diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go
index eabd05ce0f94..1a45bcb63a40 100644
--- a/pkg/sql/schema_changer_test.go
+++ b/pkg/sql/schema_changer_test.go
@@ -1516,6 +1516,10 @@ func TestSchemaChangePurgeFailure(t *testing.T) {
}
return nil
},
+ // the backfiller flushes after every batch if RunAfterBackfillChunk is
+ // non-nil so this noop fn means we can observe the partial-backfill that
+ // would otherwise just be buffered.
+ RunAfterBackfillChunk: func() {},
},
// Disable backfill migrations, we still need the jobs table migration.
SQLMigrationManager: &sqlmigrations.MigrationManagerTestingKnobs{
diff --git a/pkg/storage/batcheval/cmd_add_sstable.go b/pkg/storage/batcheval/cmd_add_sstable.go
index 364169fd4b6e..de06810c68e6 100644
--- a/pkg/storage/batcheval/cmd_add_sstable.go
+++ b/pkg/storage/batcheval/cmd_add_sstable.go
@@ -66,7 +66,7 @@ func EvalAddSSTable(
return result.Result{}, errors.Wrap(err, "computing existing stats")
}
ms.Subtract(existingStats)
- if existingStats.KeyCount > 0 {
+ if log.V(2) {
log.Infof(ctx, "%s SST covers span containing %d existing keys: [%s, %s)", humanizeutil.IBytes(int64(len(args.Data))), existingStats.KeyCount, args.Key, args.EndKey)
}
diff --git a/pkg/storage/bulk/buffering_adder.go b/pkg/storage/bulk/buffering_adder.go
new file mode 100644
index 000000000000..79a02b069ab8
--- /dev/null
+++ b/pkg/storage/bulk/buffering_adder.go
@@ -0,0 +1,179 @@
+// Copyright 2019 The Cockroach Authors.
+//
+/// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package bulk
+
+import (
+ "bytes"
+ "context"
+ "sort"
+
+ "github.com/cockroachdb/cockroach/pkg/internal/client"
+ "github.com/cockroachdb/cockroach/pkg/kv"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/storage/engine"
+ "github.com/cockroachdb/cockroach/pkg/storage/storagebase"
+ "github.com/cockroachdb/cockroach/pkg/util/hlc"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+)
+
+// BufferingAdder is a wrapper for an SSTBatcher that allows out-of-order calls
+// to Add, buffering them up and then sorting them before then passing them in
+// order into an SSTBatcher
+type BufferingAdder struct {
+ sink SSTBatcher
+ // timestamp applied to mvcc keys created from keys during SST construction.
+ timestamp hlc.Timestamp
+ // skips duplicates (iff they are buffered together).
+ skipDuplicates bool
+
+ // threshold at which buffered entries will be flushed to SSTBatcher.
+ flushSize int64
+
+ // currently buffered kvs.
+ curBuf kvsByKey
+ // estimated memory usage of curBuf.
+ curBufSize int64
+
+ flushCounts struct {
+ total int
+ bufferSize int
+ }
+}
+
+const kvOverhead = 24 + 24 // 2 slice headers, each assuming each is 8 + 8 + 8.
+
+// MakeBulkAdder makes a storagebase.BulkAdder that buffers and sorts K/Vs passed
+// to add into SSTs that are then ingested.
+func MakeBulkAdder(
+ db *client.DB,
+ rangeCache *kv.RangeDescriptorCache,
+ flushBytes, sstBytes int64,
+ timestamp hlc.Timestamp,
+) (*BufferingAdder, error) {
+ b := &BufferingAdder{
+ sink: SSTBatcher{db: db, maxSize: sstBytes, rc: rangeCache},
+ timestamp: timestamp,
+ flushSize: flushBytes,
+ }
+ return b, nil
+}
+
+// SkipLocalDuplicates configures skipping of duplicate keys in local batches.
+func (b *BufferingAdder) SkipLocalDuplicates(skip bool) {
+ b.skipDuplicates = skip
+}
+
+// Close closes the underlying SST builder.
+func (b *BufferingAdder) Close(ctx context.Context) {
+ log.VEventf(ctx, 2,
+ "bulk adder ingested %s, flushed %d times, %d due to buffer size. Flushed %d files, %d due to ranges, %d due to sst size",
+ sz(b.sink.totalRows.DataSize),
+ b.flushCounts.total, b.flushCounts.bufferSize,
+ b.sink.flushCounts.total, b.sink.flushCounts.split, b.sink.flushCounts.sstSize,
+ )
+ b.sink.Close()
+}
+
+// Add adds a key to the buffer and checks if it needs to flush.
+func (b *BufferingAdder) Add(ctx context.Context, key roachpb.Key, value []byte) error {
+ if len(b.curBuf) == 0 {
+ if err := b.sink.Reset(); err != nil {
+ return err
+ }
+ }
+ b.curBuf = append(b.curBuf, kvPair{key, value})
+ b.curBufSize += int64(cap(key)+cap(value)) + kvOverhead
+
+ if b.curBufSize > b.flushSize {
+ b.flushCounts.bufferSize++
+ log.VEventf(ctx, 3, "buffer size triggering flush of %s buffer", sz(b.curBufSize))
+ return b.Flush(ctx)
+ }
+ return nil
+}
+
+// Flush flushes any buffered kvs to the batcher.
+func (b *BufferingAdder) Flush(ctx context.Context) error {
+ if len(b.curBuf) == 0 {
+ return nil
+ }
+ b.flushCounts.total++
+
+ before := b.sink.flushCounts
+ beforeSize := b.sink.totalRows.DataSize
+
+ sort.Sort(b.curBuf)
+ for i, kv := range b.curBuf {
+ if b.skipDuplicates && i > 0 && bytes.Equal(b.curBuf[i-1].key, kv.key) {
+ continue
+ }
+
+ if err := b.sink.AddMVCCKey(ctx, engine.MVCCKey{Key: kv.key, Timestamp: b.timestamp}, kv.value); err != nil {
+ if i > 0 && bytes.Equal(b.curBuf[i-1].key, kv.key) {
+ return storagebase.DuplicateKeyError{Key: kv.key, Value: kv.value}
+ }
+ return err
+ }
+ }
+ if err := b.sink.Flush(ctx); err != nil {
+ return err
+ }
+
+ if log.V(3) {
+ written := b.sink.totalRows.DataSize - beforeSize
+ files := b.sink.flushCounts.total - before.total
+ dueToSplits := b.sink.flushCounts.split - before.split
+ dueToSize := b.sink.flushCounts.sstSize - before.sstSize
+
+ log.Infof(ctx,
+ "flushing %s buffer wrote %d SSTs (avg: %s) with %d for splits, %d for size",
+ sz(b.curBufSize), files, sz(written/int64(files)), dueToSplits, dueToSize,
+ )
+ }
+
+ b.curBufSize = 0
+ b.curBuf = b.curBuf[:0]
+ return nil
+}
+
+// GetSummary returns this batcher's total added rows/bytes/etc.
+func (b *BufferingAdder) GetSummary() roachpb.BulkOpSummary {
+ return b.sink.GetSummary()
+}
+
+// kvPair is a bytes -> bytes kv pair.
+type kvPair struct {
+ key roachpb.Key
+ value []byte
+}
+
+type kvsByKey []kvPair
+
+// Len implements sort.Interface.
+func (kvs kvsByKey) Len() int {
+ return len(kvs)
+}
+
+// Less implements sort.Interface.
+func (kvs kvsByKey) Less(i, j int) bool {
+ return bytes.Compare(kvs[i].key, kvs[j].key) < 0
+}
+
+// Swap implements sort.Interface.
+func (kvs kvsByKey) Swap(i, j int) {
+ kvs[i], kvs[j] = kvs[j], kvs[i]
+}
+
+var _ sort.Interface = kvsByKey{}
diff --git a/pkg/storage/bulk/sst_batcher.go b/pkg/storage/bulk/sst_batcher.go
index 30ffdaaec38a..91fcbddd99d2 100644
--- a/pkg/storage/bulk/sst_batcher.go
+++ b/pkg/storage/bulk/sst_batcher.go
@@ -23,33 +23,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
- "github.com/cockroachdb/cockroach/pkg/util/hlc"
+ "github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)
-// FixedTimestampSSTBatcher is a wrapper for SSTBatcher that assigns a fixed
-// timestamp to all the added keys.
-type FixedTimestampSSTBatcher struct {
- timestamp hlc.Timestamp
- SSTBatcher
-}
-
-// MakeFixedTimestampSSTBatcher makes a ready-to-use SSTBatcher that generates
-// an SST with all keys at the specified MVCC timestamp. If the rangeCache is
-// non-nil, it will be used to minimize retries due to SSTs that span ranges.
-func MakeFixedTimestampSSTBatcher(
- db *client.DB, rangeCache *kv.RangeDescriptorCache, flushBytes int64, timestamp hlc.Timestamp,
-) (*FixedTimestampSSTBatcher, error) {
- b := &FixedTimestampSSTBatcher{timestamp, SSTBatcher{db: db, maxSize: flushBytes, rc: rangeCache}}
- err := b.Reset()
- return b, err
-}
+type sz int64
-// Add a key/value pair with the batcher's timestamp, flushing if needed.
-// Keys must be added in order.
-func (b *FixedTimestampSSTBatcher) Add(ctx context.Context, key roachpb.Key, value []byte) error {
- return b.AddMVCCKey(ctx, engine.MVCCKey{Key: key, Timestamp: b.timestamp}, value)
+func (b sz) String() string {
+ return humanizeutil.IBytes(int64(b))
}
// SSTBatcher is a helper for bulk-adding many KVs in chunks via AddSSTable. An
@@ -73,6 +55,12 @@ type SSTBatcher struct {
sstWriter engine.RocksDBSstFileWriter
batchStartKey []byte
batchEndKey []byte
+
+ flushCounts struct {
+ total int
+ split int
+ sstSize int
+ }
}
// MakeSSTBatcher makes a ready-to-use SSTBatcher.
@@ -144,18 +132,27 @@ func (b *SSTBatcher) shouldFlush(ctx context.Context, nextKey roachpb.Key) bool
log.Warningf(ctx, "failed to determine where to split SST: %v", err)
} else if r != nil {
b.flushKey = r.EndKey.AsRawKey()
- log.VEventf(ctx, 2, "building sstable that will flush before %v", b.flushKey)
+ log.VEventf(ctx, 3, "building sstable that will flush before %v", b.flushKey)
} else {
- log.VEventf(ctx, 2, "no cached range desc available to determine sst flush key")
+ log.VEventf(ctx, 3, "no cached range desc available to determine sst flush key")
}
}
}
+ size := b.sstWriter.DataSize
+
if b.flushKey != nil && b.flushKey.Compare(nextKey) <= 0 {
+ log.VEventf(ctx, 3, "flushing %s SST due to range boundary %s", sz(size), b.flushKey)
+ b.flushCounts.split++
return true
}
- return b.sstWriter.DataSize >= b.maxSize
+ if size >= b.maxSize {
+ log.VEventf(ctx, 3, "flushing %s SST due to size > %s", sz(size), sz(b.maxSize))
+ b.flushCounts.sstSize++
+ return true
+ }
+ return false
}
// Flush sends the current batch, if any.
@@ -163,6 +160,8 @@ func (b *SSTBatcher) Flush(ctx context.Context) error {
if b.sstWriter.DataSize == 0 {
return nil
}
+ b.flushCounts.total++
+
start := roachpb.Key(append([]byte(nil), b.batchStartKey...))
// The end key of the WriteBatch request is exclusive, but batchEndKey is
// currently the largest key in the batch. Increment it.
@@ -197,7 +196,7 @@ func AddSSTable(ctx context.Context, db *client.DB, start, end roachpb.Key, sstB
const maxAddSSTableRetries = 10
var err error
for i := 0; i < maxAddSSTableRetries; i++ {
- log.VEventf(ctx, 2, "sending %d byte AddSSTable [%s,%s)", len(sstBytes), start, end)
+ log.VEventf(ctx, 2, "sending %s AddSSTable [%s,%s)", sz(len(sstBytes)), start, end)
// This will fail if the range has split but we'll check for that below.
err = db.AddSSTable(ctx, start, end, sstBytes)
if err == nil {
diff --git a/pkg/storage/bulk/sst_batcher_test.go b/pkg/storage/bulk/sst_batcher_test.go
index f53f7c33723d..481bf8a8c5f4 100644
--- a/pkg/storage/bulk/sst_batcher_test.go
+++ b/pkg/storage/bulk/sst_batcher_test.go
@@ -129,12 +129,12 @@ func runTestImport(t *testing.T, batchSize int64) {
}
ts := hlc.Timestamp{WallTime: 100}
- b, err := bulk.MakeFixedTimestampSSTBatcher(kvDB, mockCache, batchSize, ts)
+ b, err := bulk.MakeBulkAdder(kvDB, mockCache, batchSize, batchSize, ts)
if err != nil {
t.Fatal(err)
}
- defer b.Close()
+ defer b.Close(ctx)
var expected []client.KeyValue
@@ -149,9 +149,6 @@ func runTestImport(t *testing.T, batchSize int64) {
defer cancel()
expectedSplitRetries := 0
for _, batch := range testCase {
- if err := b.Reset(); err != nil {
- t.Fatal(err)
- }
for idx, x := range batch {
k := key(x)
// if our adds is batching multiple keys and we've previously added
diff --git a/pkg/storage/storagebase/bulk_adder.go b/pkg/storage/storagebase/bulk_adder.go
index cadd69212b7d..6ebdeb8fc427 100644
--- a/pkg/storage/storagebase/bulk_adder.go
+++ b/pkg/storage/storagebase/bulk_adder.go
@@ -16,6 +16,7 @@ package storagebase
import (
"context"
+ "fmt"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -24,7 +25,7 @@ import (
// BulkAdderFactory describes a factory function for BulkAdders.
type BulkAdderFactory func(
- ctx context.Context, db *client.DB, flushBytes int64, timestamp hlc.Timestamp,
+ ctx context.Context, db *client.DB, bufferBytes, flushBytes int64, timestamp hlc.Timestamp,
) (BulkAdder, error)
// BulkAdder describes a bulk-adding helper that can be used to add lots of KVs.
@@ -36,7 +37,20 @@ type BulkAdder interface {
// GetSummary returns a summary of rows/bytes/etc written by this batcher.
GetSummary() roachpb.BulkOpSummary
// Close closes the underlying buffers/writers.
- Close()
- // Reset resets the bulk-adder, returning it to its initial state.
- Reset() error
+ Close(ctx context.Context)
+ // SkipLocalDuplicates configures handling of duplicate keys within a local
+ // sorted batch. Once a batch is flushed – explicitly or automatically – local
+ // duplicate detection does not apply.
+ SkipLocalDuplicates(bool)
+}
+
+// DuplicateKeyError represents a failed attempt to ingest the same key twice
+// using a BulkAdder within the same batch.
+type DuplicateKeyError struct {
+ Key roachpb.Key
+ Value []byte
+}
+
+func (d DuplicateKeyError) Error() string {
+ return fmt.Sprintf("duplicate key: %s", d.Key)
}