libroach,engine: support pagination of ExportToSst
This commit extends the engine interface to take a targetSize parameter in
the ExportToSst method. Iteration stops if adding the first version of a key
to the SST would cause targetSize to be exceeded. If exportAllRevisions is
false, targetSize will not be exceeded unless the first kv pair by itself
exceeds it.
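
As a sketch of the calling convention this enables (the wrapper below is
hypothetical; the loop mirrors the one added to export_test.go in this
commit):

package example

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/engine"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// exportPaginated keeps exporting from the returned resume key until it is
// nil, collecting one SST per page. Each page respects targetSize per the
// rules above.
func exportPaginated(
	r engine.Reader,
	startKey, endKey roachpb.Key,
	startTS, endTS hlc.Timestamp,
	exportAllRevisions bool,
	targetSize uint64,
	io engine.IterOptions,
) (ssts [][]byte, err error) {
	for start := startKey; start != nil; {
		var sst []byte
		// start is overwritten with the resume key; nil means the range
		// [start, endKey) has been fully exported.
		sst, _, start, err = r.ExportToSst(
			start, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
		if err != nil {
			return nil, err
		}
		ssts = append(ssts, sst)
	}
	return ssts, nil
}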

This commit additionally fixes a bug in the rocksdb implementation of
DBExportToSst whereby the first key in the export request would be skipped.
This case likely never occurred because the key passed to Export was rarely
exactly the first key to be included (see the change related to seek_key in
db.cc).
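
In Go terms, the corrected positioning amounts to the following sketch
(mvccSeeker is a hypothetical stand-in for the iterator interface;
MakeMVCCMetadataKey is the same helper the Pebble implementation below uses):

package example

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

// mvccSeeker is a minimal stand-in for the iterator used by the export.
type mvccSeeker interface {
	SeekGE(key engine.MVCCKey)
}

// seekToStart seeks to the MVCC metadata key for startKey (the key with a
// zero timestamp, which sorts before all of its versions) and lets the
// incremental iterator surface the appropriate version. Seeking with the
// caller's versioned key could land past the first eligible version, which
// is how the old DBExportToSst dropped the first key.
func seekToStart(iter mvccSeeker, startKey roachpb.Key) {
	iter.SeekGE(engine.MakeMVCCMetadataKey(startKey))
}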

The exportccl.TestRandomKeyAndTimestampExport test was extended to exercise
various targetSize limits. Running that test under stress with the tee engine
inspires some confidence, and it did catch the bug mentioned above. More
testing would still be worthwhile.

This commit leaves adoption of the targetSize parameter to follow-up work.

Fixes #39717.

Release note: None
ajwerner committed Jan 28, 2020
1 parent 20908ab commit 05f91e5
Showing 10 changed files with 200 additions and 88 deletions.
48 changes: 40 additions & 8 deletions c-deps/libroach/db.cc
@@ -1075,9 +1075,9 @@ DBStatus DBUnlockFile(DBFileLock lock) {
return ToDBStatus(rocksdb::Env::Default()->UnlockFile((rocksdb::FileLock*)lock));
}

DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIterOptions iter_opts,
DBEngine* engine, DBString* data, DBString* write_intent,
DBString* summary) {
DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, uint64_t target_size,
DBIterOptions iter_opts, DBEngine* engine, DBString* data,
DBString* write_intent, DBString* summary, DBString* resume) {
DBSstFileWriter* writer = DBSstFileWriterNew();
DBStatus status = DBSstFileWriterOpen(writer);
if (status.data != NULL) {
@@ -1092,14 +1092,23 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter
bool skip_current_key_versions = !export_all_revisions;
DBIterState state;
const std::string end_key = EncodeKey(end);
for (state = iter.seek(start);; state = iter.next(skip_current_key_versions)) {
// cur_key is used only when paginated and export_all_revisions are both
// true. In that case we need to keep track of when we've finished adding
// all of the versions of a key to the writer.
const bool paginated = target_size > 0;
std::string cur_key;
std::string resume_key;
// Seek to the MVCC metadata key for the provided start key and let the
// incremental iterator find the appropriate version.
const DBKey seek_key = { .key = start.key };
for (state = iter.seek(seek_key);; state = iter.next(skip_current_key_versions)) {
if (state.status.data != NULL) {
DBSstFileWriterClose(writer);
return state.status;
} else if (!state.valid || kComparator.Compare(iter.key(), end_key) >= 0) {
break;
}

rocksdb::Slice decoded_key;
int64_t wall_time = 0;
int32_t logical_time = 0;
@@ -1109,13 +1118,32 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter
return ToDBString("Unable to decode key");
}

const bool is_new_key = !export_all_revisions || decoded_key.compare(cur_key) != 0;
if (paginated && export_all_revisions && is_new_key) {
// Reuse the underlying buffer in cur_key.
cur_key.clear();
cur_key.reserve(decoded_key.size());
cur_key.assign(decoded_key.data(), decoded_key.size());
}

// Skip tombstone (len=0) records when start time is zero (non-incremental)
// and we are not exporting all versions.
bool is_skipping_deletes = start.wall_time == 0 && start.logical == 0 && !export_all_revisions;
const bool is_skipping_deletes = start.wall_time == 0 && start.logical == 0 && !export_all_revisions;
if (is_skipping_deletes && iter.value().size() == 0) {
continue;
}

// Check to see if this is the first version of the key and whether adding
// it would put us over the limit (we might already be over the limit).
const int64_t cur_size = bulkop_summary.data_size();
const int64_t new_size = cur_size + decoded_key.size() + iter.value().size();
const bool is_over_target = cur_size > 0 && new_size > target_size;
if (paginated && is_new_key && is_over_target) {
resume_key.reserve(decoded_key.size());
resume_key.assign(decoded_key.data(), decoded_key.size());
break;
}

// Insert key into sst and update statistics.
status = DBSstFileWriterAddRaw(writer, iter.key(), iter.value());
if (status.data != NULL) {
@@ -1126,8 +1154,7 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter
if (!row_counter.Count((iter.key()), &bulkop_summary)) {
return ToDBString("Error in row counter");
}
bulkop_summary.set_data_size(bulkop_summary.data_size() + decoded_key.size() +
iter.value().size());
bulkop_summary.set_data_size(new_size);
}
*summary = ToDBString(bulkop_summary.SerializeAsString());

@@ -1139,6 +1166,11 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter
auto res = DBSstFileWriterFinish(writer, data);
DBSstFileWriterClose(writer);

// If we're not returning an error, check to see if we need to return the resume key.
if (res.data == NULL && resume_key.length() > 0) {
*resume = ToDBString(resume_key);
}

return res;
}

15 changes: 12 additions & 3 deletions c-deps/libroach/include/libroach.h
@@ -549,14 +549,23 @@ typedef void* DBFileLock;
// DBLockFile sets a lock on the specified file using RocksDB's file locking interface.
DBStatus DBLockFile(DBSlice filename, DBFileLock* lock);

// DBUnlockFile unlocks the file asscoiated with the specified lock and GCs any allocated memory for
// DBUnlockFile unlocks the file associated with the specified lock and GCs any allocated memory for
// the lock.
DBStatus DBUnlockFile(DBFileLock lock);

// DBExportToSst exports changes over the keyrange and time interval between the
// start and end DBKeys to an SSTable using an IncrementalIterator.
DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIterOptions iter_opts,
DBEngine* engine, DBString* data, DBString* write_intent, DBString* summary);
// If target_size is positive, it indicates that the export should produce
// SSTs which are roughly the target size. Specifically, it will produce SSTs
// which contain all relevant versions of a key and will not add the first
// version of a new key if doing so would lead to the SST exceeding
// target_size. If export_all_revisions is false, the returned SST will be
// smaller than target_size so long as the first kv pair is smaller than
// target_size. If export_all_revisions is true then target_size may be
// exceeded. If SST construction stops due to target_size, resume will be set
// to the value of the resume key.
DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, uint64_t target_size,
DBIterOptions iter_opts, DBEngine* engine, DBString* data,
DBString* write_intent, DBString* summary, DBString* resume);

// DBEnvOpenReadableFile opens a DBReadableFile in the given engine.
DBStatus DBEnvOpenReadableFile(DBEngine* db, DBSlice path, DBReadableFile* file);
12 changes: 9 additions & 3 deletions pkg/ccl/storageccl/export.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
crdberrors "github.com/cockroachdb/errors"
"github.com/pkg/errors"
)

@@ -130,9 +131,14 @@ func evalExport(
}

e := spanset.GetDBEngine(batch, roachpb.Span{Key: args.Key, EndKey: args.EndKey})

data, summary, err := e.ExportToSst(args.Key, args.EndKey, args.StartTime, h.Timestamp, exportAllRevisions, io)

// TODO(ajwerner): Add a constant or internal cluster setting to control the
// target size for files and then paginate the actual export. The external
// API may need to be modified to deal with the case where ReturnSST is true.
const targetSize = 0 // unlimited
data, summary, resume, err := e.ExportToSst(args.Key, args.EndKey, args.StartTime, h.Timestamp, exportAllRevisions, targetSize, io)
if resume != nil {
return result.Result{}, crdberrors.AssertionFailedf("expected nil resume key with unlimited target size")
}
if err != nil {
return result.Result{}, err
}
73 changes: 45 additions & 28 deletions pkg/ccl/storageccl/export_test.go
@@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

func TestExportCmd(t *testing.T) {
@@ -302,6 +303,7 @@ func assertEqualKVs(
startTime, endTime hlc.Timestamp,
exportAllRevisions bool,
enableTimeBoundIteratorOptimization bool,
targetSize uint64,
) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
@@ -328,17 +330,19 @@
io.MaxTimestampHint = endTime
io.MinTimestampHint = startTime.Next()
}
sst, _, err := e.ExportToSst(startKey, endKey, startTime, endTime, exportAllRevisions, io)
if err != nil {
t.Fatal(err)
var kvs []engine.MVCCKeyValue
for start := startKey; start != nil; {
var sst []byte
sst, _, start, err = e.ExportToSst(start, endKey, startTime, endTime, exportAllRevisions, targetSize, io)
require.NoError(t, err)
loaded := loadSST(t, sst, startKey, endKey)
kvs = append(kvs, loaded...)
}

// Compare new C++ implementation against the oracle.
expectedKVS := loadSST(t, expected, startKey, endKey)
kvs := loadSST(t, sst, startKey, endKey)

if len(kvs) != len(expectedKVS) {
t.Fatalf("got %d kvs (%+v) but expected %d (%+v)", len(kvs), kvs, len(expected), expected)
t.Fatalf("got %d kvs but expected %d:\n%v\n%v", len(kvs), len(expectedKVS), kvs, expectedKVS)
}

for i := range kvs {
@@ -430,27 +434,40 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
timestamps[i].Logical < timestamps[j].Logical)
})

testWithTargetSize := func(t *testing.T, targetSize uint64) {
if testing.Short() && targetSize > 0 && targetSize < 1<<15 {
t.Skipf("testing with size %d is slow", targetSize)
}
t.Run("ts (0-∞], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, false, targetSize))
t.Run("ts (0-∞], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, false, targetSize))
t.Run("ts (0-∞], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, true, targetSize))
t.Run("ts (0-∞], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, true, targetSize))

upperBound := randutil.RandIntInRange(rnd, 1, numKeys)
lowerBound := rnd.Intn(upperBound)

// Exercise random key ranges.
t.Run("kv [randLower, randUpper), latest, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, false, targetSize))
t.Run("kv [randLower, randUpper), all, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, false, targetSize))
t.Run("kv [randLower, randUpper), latest, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, true, targetSize))
t.Run("kv [randLower, randUpper), all, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, true, targetSize))

upperBound = randutil.RandIntInRange(rnd, 1, numKeys)
lowerBound = rnd.Intn(upperBound)

// Exercise random timestamps.
t.Run("kv (randLowerTime, randUpperTime], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, false, targetSize))
t.Run("kv (randLowerTime, randUpperTime], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, false, targetSize))
t.Run("kv (randLowerTime, randUpperTime], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, true, targetSize))
t.Run("kv (randLowerTime, randUpperTime], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, true, targetSize))
}
// Exercise min to max time and key ranges.
t.Run("ts (0-∞], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, false))
t.Run("ts (0-∞], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, false))
t.Run("ts (0-∞], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, true))
t.Run("ts (0-∞], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, true))

upperBound := randutil.RandIntInRange(rnd, 1, numKeys)
lowerBound := rnd.Intn(upperBound)

// Exercise random key ranges.
t.Run("kv [randLower, randUpper), latest, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, false))
t.Run("kv [randLower, randUpper), all, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, false))
t.Run("kv [randLower, randUpper), latest, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, true))
t.Run("kv [randLower, randUpper), all, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, true))

upperBound = randutil.RandIntInRange(rnd, 1, numKeys)
lowerBound = rnd.Intn(upperBound)

// Exercise random timestamps.
t.Run("kv (randLowerTime, randUpperTime], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, false))
t.Run("kv (randLowerTime, randUpperTime], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, false))
t.Run("kv (randLowerTime, randUpperTime], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, true))
t.Run("kv (randLowerTime, randUpperTime], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, true))
for _, targetSize := range []uint64{
0 /* unlimited */, 1 << 10, 1 << 16, 1 << 20,
} {
t.Run(fmt.Sprintf("targetSize=%d", targetSize), func(t *testing.T) {
testWithTargetSize(t, targetSize)
})
}

}
14 changes: 12 additions & 2 deletions pkg/storage/engine/engine.go
@@ -200,9 +200,19 @@ type Reader interface {
// within the interval is exported. Deletions are included if all revisions are
// requested or if the start.Timestamp is non-zero. Returns the bytes of an
// SSTable containing the exported keys, the size of exported data, or an error.
// If targetSize is positive, it indicates that the export should produce
// SSTs which are roughly the target size. Specifically, it will produce SSTs
// which contain all relevant versions of a key and will not add the first
// version of a new key if doing so would lead to the SST exceeding the
// targetSize. If exportAllRevisions is false, the returned SST will be
// smaller than targetSize so long as the first kv pair is smaller than
// targetSize. If exportAllRevisions is true then targetSize may be exceeded
// by as much as the size of all of the versions of the last key. If SST
// construction stops due to the targetSize, a non-nil resumeKey will be
// returned.
ExportToSst(
startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, io IterOptions,
) ([]byte, roachpb.BulkOpSummary, error)
startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp,
exportAllRevisions bool, targetSize uint64, io IterOptions,
) (sst []byte, _ roachpb.BulkOpSummary, resumeKey roachpb.Key, _ error)
// Get returns the value for the given key, nil otherwise.
//
// Deprecated: use MVCCGet instead.
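
As a worked illustration of the targetSize rules documented in the Reader
interface above, the stopping predicate shared by the RocksDB and Pebble
implementations can be factored into a small pure function (hypothetical as
written; the logic mirrors db.cc above and pebble.go below):

package example

// wouldStop reports whether export should stop before adding a kv pair: only
// at the first version of a new key, only when the SST already holds data,
// and only when adding the pair would push the SST past targetSize.
// targetSize == 0 disables pagination. Versions of the current key never
// trigger a stop, which is why exportAllRevisions may overshoot targetSize
// by the size of all versions of the last key.
func wouldStop(isNewKey bool, curSize, pairSize, targetSize uint64) bool {
	paginated := targetSize > 0
	return paginated && isNewKey && curSize > 0 && curSize+pairSize > targetSize
}

// Example: with targetSize = 1024, key "a" carrying 900 bytes of versions and
// key "b" starting with a 200-byte pair, all of "a" is written (curSize goes
// 0 -> 900); at "b", wouldStop(true, 900, 200, 1024) is true, so the SST is
// finished and "b" becomes the resume key.
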
47 changes: 33 additions & 14 deletions pkg/storage/engine/pebble.go
@@ -492,9 +492,10 @@ func (p *Pebble) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, io)
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
}

// Get implements the Engine interface.
@@ -938,9 +939,10 @@ func (p *pebbleReadOnly) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, io)
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
}

func (p *pebbleReadOnly) Get(key MVCCKey) ([]byte, error) {
@@ -1059,9 +1061,10 @@ func (p *pebbleSnapshot) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, io)
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
}

// Get implements the Reader interface.
@@ -1113,8 +1116,9 @@ func pebbleExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, error) {
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
sstFile := &MemFile{}
sstWriter := MakeBackupSSTWriter(sstFile)
defer sstWriter.Close()
@@ -1128,12 +1132,15 @@
EndTime: endTS,
})
defer iter.Close()
var curKey roachpb.Key // only used if exportAllRevisions
var resumeKey roachpb.Key
paginated := targetSize > 0
for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; {
ok, err := iter.Valid()
if err != nil {
// The error may be a WriteIntentError. In which case, returning it will
// cause this command to be retried.
return nil, roachpb.BulkOpSummary{}, err
return nil, roachpb.BulkOpSummary{}, nil, err
}
if !ok {
break
@@ -1143,18 +1150,30 @@
break
}
unsafeValue := iter.UnsafeValue()
isNewKey := !exportAllRevisions || !unsafeKey.Key.Equal(curKey)
if paginated && exportAllRevisions && isNewKey {
curKey = append(curKey[:0], unsafeKey.Key...)
}

// Skip tombstone (len=0) records when start time is zero (non-incremental)
// and we are not exporting all versions.
skipTombstones := !exportAllRevisions && startTS.IsEmpty()
if len(unsafeValue) > 0 || !skipTombstones {
if err := rows.Count(unsafeKey.Key); err != nil {
return nil, roachpb.BulkOpSummary{}, errors.Wrapf(err, "decoding %s", unsafeKey)
return nil, roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "decoding %s", unsafeKey)
}
curSize := rows.BulkOpSummary.DataSize
newSize := curSize + int64(len(unsafeKey.Key)+len(unsafeValue))
isOverTarget := paginated && curSize > 0 && uint64(newSize) > targetSize
if isNewKey && isOverTarget {
// Allocate the right size for resumeKey rather than using curKey.
resumeKey = append(make(roachpb.Key, 0, len(unsafeKey.Key)), unsafeKey.Key...)
break
}
rows.BulkOpSummary.DataSize += int64(len(unsafeKey.Key) + len(unsafeValue))
if err := sstWriter.Put(unsafeKey, unsafeValue); err != nil {
return nil, roachpb.BulkOpSummary{}, errors.Wrapf(err, "adding key %s", unsafeKey)
return nil, roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "adding key %s", unsafeKey)
}
rows.BulkOpSummary.DataSize = newSize
}

if exportAllRevisions {
Expand All @@ -1167,12 +1186,12 @@ func pebbleExportToSst(
if rows.BulkOpSummary.DataSize == 0 {
// If no records were added to the sstable, skip completing it and return a
// nil slice – the export code will discard it anyway (based on 0 DataSize).
return nil, roachpb.BulkOpSummary{}, nil
return nil, roachpb.BulkOpSummary{}, nil, nil
}

if err := sstWriter.Finish(); err != nil {
return nil, roachpb.BulkOpSummary{}, err
return nil, roachpb.BulkOpSummary{}, nil, err
}

return sstFile.Data(), rows.BulkOpSummary, nil
return sstFile.Data(), rows.BulkOpSummary, resumeKey, nil
}