Skip to content

Commit

Permalink
storage/engine: fix DeleteRange+ApplyBatchRepr
Browse files Browse the repository at this point in the history
Added support for DeleteRange to DBBatchInserter. This missing support
was causing test failures and various data corruption when enabling
COCKROACH_ENABLE_FAST_CLEAR_RANGE after #14138 started using
ApplyBatchRepr on batches containing DeleteRange operations.

See #14391
  • Loading branch information
petermattis committed Mar 28, 2017
1 parent 0f0677b commit 0ffaf1a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 5 deletions.
22 changes: 18 additions & 4 deletions pkg/storage/engine/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,16 @@ class DBBatchInserter : public rocksdb::WriteBatch::Handler {
virtual void Merge(const rocksdb::Slice& key, const rocksdb::Slice& value) {
batch_->Merge(key, value);
}
// NB: we don't support DeleteRangeCF yet which causes us to pick up
// the default implementation that returns an error.
virtual rocksdb::Status DeleteRangeCF(
uint32_t column_family_id,
const rocksdb::Slice& begin_key,
const rocksdb::Slice& end_key) {
if (column_family_id == 0) {
batch_->DeleteRange(begin_key, end_key);
return rocksdb::Status::OK();
}
return rocksdb::Status::InvalidArgument("DeleteRangeCF not implemented");
}

private:
rocksdb::WriteBatchBase* const batch_;
Expand Down Expand Up @@ -1884,7 +1892,10 @@ DBStatus DBBatch::ApplyBatchRepr(DBSlice repr, bool sync) {
// repr directly instead of first converting it to a string.
DBBatchInserter inserter(&batch);
rocksdb::WriteBatch batch(ToString(repr));
batch.Iterate(&inserter);
rocksdb::Status status = batch.Iterate(&inserter);
if (!status.ok()) {
return ToDBStatus(status);
}
updates += batch.Count();
return kSuccess;
}
Expand All @@ -1897,7 +1908,10 @@ DBStatus DBWriteOnlyBatch::ApplyBatchRepr(DBSlice repr, bool sync) {
// repr directly instead of first converting it to a string.
DBBatchInserter inserter(&batch);
rocksdb::WriteBatch batch(ToString(repr));
batch.Iterate(&inserter);
rocksdb::Status status = batch.Iterate(&inserter);
if (!status.ok()) {
return ToDBStatus(status);
}
updates += batch.Count();
return kSuccess;
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/storage/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ func testEngineDeleteRange(t *testing.T, clearRange func(engine Engine, start, e

// Delete a range of keys
if err := clearRange(engine, mvccKey("aa"), mvccKey("abc")); err != nil {
t.Error("Not expecting an error")
t.Fatal(err)
}
// Verify what's left
verifyScan(mvccKey(roachpb.RKeyMin), mvccKey(roachpb.RKeyMax), 10,
Expand All @@ -610,6 +610,23 @@ func TestEngineDeleteRange(t *testing.T) {
})
}

func TestEngineDeleteRangeBatch(t *testing.T) {
defer leaktest.AfterTest(t)()
testEngineDeleteRange(t, func(engine Engine, start, end MVCCKey) error {
batch := engine.NewWriteOnlyBatch()
defer batch.Close()
if err := batch.ClearRange(start, end); err != nil {
return err
}
batch2 := engine.NewWriteOnlyBatch()
defer batch2.Close()
if err := batch2.ApplyBatchRepr(batch.Repr(), false); err != nil {
return err
}
return batch2.Commit(false)
})
}

func TestEngineDeleteIterRange(t *testing.T) {
defer leaktest.AfterTest(t)()
testEngineDeleteRange(t, func(engine Engine, start, end MVCCKey) error {
Expand Down

0 comments on commit 0ffaf1a

Please sign in to comment.