libroach,engine: support pagination of ExportToSst #44440

Merged · 1 commit · Jan 28, 2020
48 changes: 40 additions & 8 deletions c-deps/libroach/db.cc
@@ -1075,9 +1075,9 @@ DBStatus DBUnlockFile(DBFileLock lock) {
return ToDBStatus(rocksdb::Env::Default()->UnlockFile((rocksdb::FileLock*)lock));
}

DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIterOptions iter_opts,
DBEngine* engine, DBString* data, DBString* write_intent,
DBString* summary) {
DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, uint64_t target_size,
DBIterOptions iter_opts, DBEngine* engine, DBString* data,
DBString* write_intent, DBString* summary, DBString* resume) {
DBSstFileWriter* writer = DBSstFileWriterNew();
DBStatus status = DBSstFileWriterOpen(writer);
if (status.data != NULL) {
@@ -1092,14 +1092,23 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter
bool skip_current_key_versions = !export_all_revisions;
DBIterState state;
const std::string end_key = EncodeKey(end);
for (state = iter.seek(start);; state = iter.next(skip_current_key_versions)) {
// cur_key is used when paginated is true and export_all_revisions is
// true. If we're exporting all revisions and we're returning a paginated
// SST then we need to keep track of when we've finished adding all of the
// versions of a key to the writer.
const bool paginated = target_size > 0;
std::string cur_key;
std::string resume_key;
// Seek to the MVCC metadata key for the provided start key and let the
// incremental iterator find the appropriate version.
const DBKey seek_key = { .key = start.key };
for (state = iter.seek(seek_key);; state = iter.next(skip_current_key_versions)) {
if (state.status.data != NULL) {
DBSstFileWriterClose(writer);
return state.status;
} else if (!state.valid || kComparator.Compare(iter.key(), end_key) >= 0) {
break;
}

rocksdb::Slice decoded_key;
int64_t wall_time = 0;
int32_t logical_time = 0;
@@ -1109,13 +1118,32 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter
return ToDBString("Unable to decode key");
}

const bool is_new_key = !export_all_revisions || decoded_key.compare(cur_key) != 0;
if (paginated && export_all_revisions && is_new_key) {
// Reuse the underlying buffer in cur_key.
cur_key.clear();
cur_key.reserve(decoded_key.size());
cur_key.assign(decoded_key.data(), decoded_key.size());
}

// Skip tombstone (len=0) records when start time is zero (non-incremental)
// and we are not exporting all versions.
bool is_skipping_deletes = start.wall_time == 0 && start.logical == 0 && !export_all_revisions;
const bool is_skipping_deletes = start.wall_time == 0 && start.logical == 0 && !export_all_revisions;
if (is_skipping_deletes && iter.value().size() == 0) {
continue;
}

// Check to see if this is the first version of the key and if adding it would
// put us over the limit (we might already be over the limit).
const int64_t cur_size = bulkop_summary.data_size();
const int64_t new_size = cur_size + decoded_key.size() + iter.value().size();
const bool is_over_target = cur_size > 0 && new_size > target_size;
if (paginated && is_new_key && is_over_target) {
resume_key.reserve(decoded_key.size());
resume_key.assign(decoded_key.data(), decoded_key.size());
break;
}

// Insert key into sst and update statistics.
status = DBSstFileWriterAddRaw(writer, iter.key(), iter.value());
if (status.data != NULL) {
@@ -1126,8 +1154,7 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter
if (!row_counter.Count((iter.key()), &bulkop_summary)) {
return ToDBString("Error in row counter");
}
bulkop_summary.set_data_size(bulkop_summary.data_size() + decoded_key.size() +
iter.value().size());
bulkop_summary.set_data_size(new_size);
}
*summary = ToDBString(bulkop_summary.SerializeAsString());

@@ -1139,6 +1166,11 @@ DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIter
auto res = DBSstFileWriterFinish(writer, data);
DBSstFileWriterClose(writer);

// If we're not returning an error, check to see if we need to return the resume key.
if (res.data == NULL && resume_key.length() > 0) {
*resume = ToDBString(resume_key);
}

return res;
}

15 changes: 12 additions & 3 deletions c-deps/libroach/include/libroach.h
@@ -549,14 +549,23 @@ typedef void* DBFileLock;
// DBLockFile sets a lock on the specified file using RocksDB's file locking interface.
DBStatus DBLockFile(DBSlice filename, DBFileLock* lock);

// DBUnlockFile unlocks the file asscoiated with the specified lock and GCs any allocated memory for
// DBUnlockFile unlocks the file associated with the specified lock and GCs any allocated memory for
// the lock.
DBStatus DBUnlockFile(DBFileLock lock);

// DBExportToSst exports changes over the keyrange and time interval between the
// start and end DBKeys to an SSTable using an IncrementalIterator.
DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, DBIterOptions iter_opts,
DBEngine* engine, DBString* data, DBString* write_intent, DBString* summary);
// If target_size is positive, it indicates that the export should produce SSTs
// which are roughly the target size. Specifically, it will produce SSTs which
// contain all relevant versions of a key and will not add the first version of
// a new key if doing so would lead to the SST exceeding the target_size. If
// export_all_revisions is false, the returned SST will be smaller than
// target_size so long as the first kv pair is smaller than target_size. If
// export_all_revisions is true then target_size may be exceeded. If SST
// construction stops due to the target_size, then resume will be set to the
// resume key.
DBStatus DBExportToSst(DBKey start, DBKey end, bool export_all_revisions, uint64_t target_size,
DBIterOptions iter_opts, DBEngine* engine, DBString* data,
DBString* write_intent, DBString* summary, DBString* resume);

// DBEnvOpenReadableFile opens a DBReadableFile in the given engine.
DBStatus DBEnvOpenReadableFile(DBEngine* db, DBSlice path, DBReadableFile* file);
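The size rule described in the comment above is implemented identically in the C++ (db.cc) and Go (pebble.go) paths: export stops only at the first version of a new key, and only once the SST already holds data, which is why exporting all revisions can overshoot the target by the versions of a single key. A minimal, self-contained Go sketch of that check (the `shouldStop` helper and its parameters are illustrative, not part of this PR):

```go
package main

import "fmt"

// shouldStop reports whether the export loop should end the current SST
// before writing the next key/value pair: only at the first version of a
// new key, and only if the SST is non-empty and the pair would push it
// past the target size. A target size of zero disables pagination.
func shouldStop(curSize, keyLen, valLen int64, isNewKey bool, targetSize uint64) bool {
	if targetSize == 0 {
		return false
	}
	newSize := curSize + keyLen + valLen
	return isNewKey && curSize > 0 && uint64(newSize) > targetSize
}

func main() {
	// With a 100-byte target and 90 bytes already written, a new 20-byte
	// pair triggers pagination...
	fmt.Println(shouldStop(90, 10, 10, true, 100)) // true
	// ...but another version of the current key is always written, which is
	// how export_all_revisions can exceed the target.
	fmt.Println(shouldStop(90, 10, 10, false, 100)) // false
}
```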
12 changes: 9 additions & 3 deletions pkg/ccl/storageccl/export.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
crdberrors "github.com/cockroachdb/errors"
"github.com/pkg/errors"
)

@@ -130,9 +131,14 @@ func evalExport(
}

e := spanset.GetDBEngine(batch, roachpb.Span{Key: args.Key, EndKey: args.EndKey})

data, summary, err := e.ExportToSst(args.Key, args.EndKey, args.StartTime, h.Timestamp, exportAllRevisions, io)

// TODO(ajwerner): Add a constant or internal cluster setting to control the
// target size for files and then paginate the actual export. The external
// API may need to be modified to deal with the case where ReturnSST is true.
const targetSize = 0 // unlimited
data, summary, resume, err := e.ExportToSst(args.Key, args.EndKey, args.StartTime, h.Timestamp, exportAllRevisions, targetSize, io)
if resume != nil {
return result.Result{}, crdberrors.AssertionFailedf("expected nil resume key with unlimited target size")
}
if err != nil {
return result.Result{}, err
}
73 changes: 45 additions & 28 deletions pkg/ccl/storageccl/export_test.go
@@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

func TestExportCmd(t *testing.T) {
@@ -302,6 +303,7 @@ func assertEqualKVs(
startTime, endTime hlc.Timestamp,
exportAllRevisions bool,
enableTimeBoundIteratorOptimization bool,
targetSize uint64,
) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
@@ -328,17 +330,19 @@ func assertEqualKVs(
io.MaxTimestampHint = endTime
io.MinTimestampHint = startTime.Next()
}
sst, _, err := e.ExportToSst(startKey, endKey, startTime, endTime, exportAllRevisions, io)
if err != nil {
t.Fatal(err)
var kvs []engine.MVCCKeyValue
for start := startKey; start != nil; {
var sst []byte
sst, _, start, err = e.ExportToSst(start, endKey, startTime, endTime, exportAllRevisions, targetSize, io)
require.NoError(t, err)
loaded := loadSST(t, sst, startKey, endKey)
kvs = append(kvs, loaded...)
}

// Compare new C++ implementation against the oracle.
expectedKVS := loadSST(t, expected, startKey, endKey)
kvs := loadSST(t, sst, startKey, endKey)

if len(kvs) != len(expectedKVS) {
t.Fatalf("got %d kvs (%+v) but expected %d (%+v)", len(kvs), kvs, len(expected), expected)
t.Fatalf("got %d kvs but expected %d:\n%v\n%v", len(kvs), len(expectedKVS), kvs, expectedKVS)
}

for i := range kvs {
Expand Down Expand Up @@ -430,27 +434,40 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
timestamps[i].Logical < timestamps[j].Logical)
})

testWithTargetSize := func(t *testing.T, targetSize uint64) {
if testing.Short() && targetSize > 0 && targetSize < 1<<15 {
t.Skipf("testing with size %d is slow", targetSize)
}
t.Run("ts (0-∞], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, false, targetSize))
t.Run("ts (0-∞], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, false, targetSize))
t.Run("ts (0-∞], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, true, targetSize))
t.Run("ts (0-∞], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, true, targetSize))

upperBound := randutil.RandIntInRange(rnd, 1, numKeys)
lowerBound := rnd.Intn(upperBound)

// Exercise random key ranges.
t.Run("kv [randLower, randUpper), latest, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, false, targetSize))
t.Run("kv [randLower, randUpper), all, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, false, targetSize))
t.Run("kv [randLower, randUpper), latest, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, true, targetSize))
t.Run("kv [randLower, randUpper), all, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, true, targetSize))

upperBound = randutil.RandIntInRange(rnd, 1, numKeys)
lowerBound = rnd.Intn(upperBound)

// Exercise random timestamps.
t.Run("kv (randLowerTime, randUpperTime], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, false, targetSize))
t.Run("kv (randLowerTime, randUpperTime], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, false, targetSize))
t.Run("kv (randLowerTime, randUpperTime], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, true, targetSize))
t.Run("kv (randLowerTime, randUpperTime], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, true, targetSize))
}
// Exercise min to max time and key ranges.
t.Run("ts (0-∞], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, false))
t.Run("ts (0-∞], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, false))
t.Run("ts (0-∞], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, true))
t.Run("ts (0-∞], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, true))

upperBound := randutil.RandIntInRange(rnd, 1, numKeys)
lowerBound := rnd.Intn(upperBound)

// Exercise random key ranges.
t.Run("kv [randLower, randUpper), latest, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, false))
t.Run("kv [randLower, randUpper), all, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, false))
t.Run("kv [randLower, randUpper), latest, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, true))
t.Run("kv [randLower, randUpper), all, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, true))

upperBound = randutil.RandIntInRange(rnd, 1, numKeys)
lowerBound = rnd.Intn(upperBound)

// Exercise random timestamps.
t.Run("kv (randLowerTime, randUpperTime], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, false))
t.Run("kv (randLowerTime, randUpperTime], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, false))
t.Run("kv (randLowerTime, randUpperTime], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, true))
t.Run("kv (randLowerTime, randUpperTime], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, true))
for _, targetSize := range []uint64{
0 /* unlimited */, 1 << 10, 1 << 16, 1 << 20,
} {
t.Run(fmt.Sprintf("targetSize=%d", targetSize), func(t *testing.T) {
testWithTargetSize(t, targetSize)
})
}

}
14 changes: 12 additions & 2 deletions pkg/storage/engine/engine.go
@@ -200,9 +200,19 @@ type Reader interface {
// within the interval is exported. Deletions are included if all revisions are
// requested or if the start.Timestamp is non-zero. Returns the bytes of an
// SSTable containing the exported keys, the size of exported data, or an error.
// If targetSize is positive, it indicates that the export should produce SSTs
// which are roughly the target size. Specifically, it will produce SSTs which
// contain all relevant versions of a key and will not add the first version of
// a new key if doing so would lead to the SST exceeding the targetSize. If
// exportAllRevisions is false, the returned SST will be smaller than targetSize
// so long as the first kv pair is smaller than targetSize. If exportAllRevisions
// is true then targetSize may be exceeded by as much as the size of all of the
// versions of the last key. If SST construction stops due to the targetSize,
// then a non-nil resumeKey will be returned.
ExportToSst(
startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, exportAllRevisions bool, io IterOptions,
) ([]byte, roachpb.BulkOpSummary, error)
startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp,
exportAllRevisions bool, targetSize uint64, io IterOptions,
) (sst []byte, _ roachpb.BulkOpSummary, resumeKey roachpb.Key, _ error)
// Get returns the value for the given key, nil otherwise.
//
// Deprecated: use MVCCGet instead.
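The non-nil resumeKey turns ExportToSst into a paginated API; the loop added to export_test.go above is the intended calling pattern. A hedged sketch of that pattern as a standalone helper (only the ExportToSst signature comes from this PR; the helper itself is illustrative):

```go
import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/engine"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// exportAll drives a paginated export: each call produces one SST of roughly
// targetSize bytes, and the returned resume key becomes the next start key.
// A nil resume key means the key range is exhausted.
func exportAll(
	r engine.Reader,
	startKey, endKey roachpb.Key,
	startTS, endTS hlc.Timestamp,
	exportAllRevisions bool,
	targetSize uint64,
	io engine.IterOptions,
) ([][]byte, error) {
	var ssts [][]byte
	for start := startKey; start != nil; {
		sst, _, resume, err := r.ExportToSst(
			start, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
		if err != nil {
			return nil, err
		}
		if len(sst) > 0 { // an empty export returns a nil SST
			ssts = append(ssts, sst)
		}
		start = resume
	}
	return ssts, nil
}
```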
47 changes: 33 additions & 14 deletions pkg/storage/engine/pebble.go
@@ -492,9 +492,10 @@ func (p *Pebble) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, io)
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
}

// Get implements the Engine interface.
@@ -938,9 +939,10 @@ func (p *pebbleReadOnly) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, io)
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
}

func (p *pebbleReadOnly) Get(key MVCCKey) ([]byte, error) {
@@ -1059,9 +1061,10 @@ func (p *pebbleSnapshot) ExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, io)
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, io)
}

// Get implements the Reader interface.
@@ -1113,8 +1116,9 @@ func pebbleExportToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize uint64,
io IterOptions,
) ([]byte, roachpb.BulkOpSummary, error) {
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
sstFile := &MemFile{}
sstWriter := MakeBackupSSTWriter(sstFile)
defer sstWriter.Close()
@@ -1128,12 +1132,15 @@ func pebbleExportToSst(
EndTime: endTS,
})
defer iter.Close()
var curKey roachpb.Key // only used if exportAllRevisions
var resumeKey roachpb.Key
paginated := targetSize > 0
for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; {
ok, err := iter.Valid()
if err != nil {
// The error may be a WriteIntentError. In which case, returning it will
// cause this command to be retried.
return nil, roachpb.BulkOpSummary{}, err
return nil, roachpb.BulkOpSummary{}, nil, err
}
if !ok {
break
@@ -1143,18 +1150,30 @@ func pebbleExportToSst(
break
}
unsafeValue := iter.UnsafeValue()
isNewKey := !exportAllRevisions || !unsafeKey.Key.Equal(curKey)
if paginated && exportAllRevisions && isNewKey {
curKey = append(curKey[:0], unsafeKey.Key...)
}

// Skip tombstone (len=0) records when start time is zero (non-incremental)
// and we are not exporting all versions.
skipTombstones := !exportAllRevisions && startTS.IsEmpty()
if len(unsafeValue) > 0 || !skipTombstones {
if err := rows.Count(unsafeKey.Key); err != nil {
return nil, roachpb.BulkOpSummary{}, errors.Wrapf(err, "decoding %s", unsafeKey)
return nil, roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "decoding %s", unsafeKey)
}
curSize := rows.BulkOpSummary.DataSize
newSize := curSize + int64(len(unsafeKey.Key)+len(unsafeValue))
isOverTarget := paginated && curSize > 0 && uint64(newSize) > targetSize
if isNewKey && isOverTarget {
// Allocate the right size for resumeKey rather than using curKey.
resumeKey = append(make(roachpb.Key, 0, len(unsafeKey.Key)), unsafeKey.Key...)
break
}
rows.BulkOpSummary.DataSize += int64(len(unsafeKey.Key) + len(unsafeValue))
if err := sstWriter.Put(unsafeKey, unsafeValue); err != nil {
return nil, roachpb.BulkOpSummary{}, errors.Wrapf(err, "adding key %s", unsafeKey)
return nil, roachpb.BulkOpSummary{}, nil, errors.Wrapf(err, "adding key %s", unsafeKey)
}
rows.BulkOpSummary.DataSize = newSize
}

if exportAllRevisions {
@@ -1167,12 +1186,12 @@ func pebbleExportToSst(
if rows.BulkOpSummary.DataSize == 0 {
// If no records were added to the sstable, skip completing it and return a
// nil slice – the export code will discard it anyway (based on 0 DataSize).
return nil, roachpb.BulkOpSummary{}, nil
return nil, roachpb.BulkOpSummary{}, nil, nil
}

if err := sstWriter.Finish(); err != nil {
return nil, roachpb.BulkOpSummary{}, err
return nil, roachpb.BulkOpSummary{}, nil, err
}

return sstFile.Data(), rows.BulkOpSummary, nil
return sstFile.Data(), rows.BulkOpSummary, resumeKey, nil
}