Skip to content

Commit

Permalink
Add bounds checking to WBWIIteratorImpl and respect bounds of ReadOpt…
Browse files Browse the repository at this point in the history
…ions in Transaction (#11680)

Summary:
Fix #11607
Fix #11679
Fix #11606
Fix #2343

Add bounds checking to `WBWIIteratorImpl`, which will be reflected in `BaseDeltaIterator::delta_iterator_::Valid()`, just like `BaseDeltaIterator::base_iterator_::Valid()`. In this way, the two sub-iterators become more aligned from `BaseDeltaIterator`'s perspective. Like `DBIter`, the added bounds checking caps the seek target to either bound when seeking, and invalidates the `WBWIIteratorImpl` iterator when the lower bound is passed or the upper bound is reached.

Pull Request resolved: #11680

Test Plan:
- A simple test added to write_batch_with_index_test.cc to exercise the bounds checking in `WBWIIteratorImpl`.
- A sophisticated test added to transaction_test.cc to assert that `Transaction` with different write policies honors the bounds in `ReadOptions`. This should hold as long as the `BaseDeltaIterator` correctly coordinates the two sub-iterators when iterating and checking bounds.

Reviewed By: ajkr

Differential Revision: D48125229

Pulled By: cbi42

fbshipit-source-id: c9acea52595aed1471a63d7ca6ef15d2a2af1367
  • Loading branch information
ywave620 authored and facebook-github-bot committed Oct 20, 2023
1 parent d7567d5 commit 543191f
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add bounds checking in WBWIIteratorImpl and make BaseDeltaIterator, WriteUnpreparedTxn and WritePreparedTxn respect the upper bound and lower bound in ReadOptions. See #11680.
106 changes: 106 additions & 0 deletions utilities/transactions/transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,112 @@ INSTANTIATE_TEST_CASE_P(
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, true)));
#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)

// Regression test for issue 11606: a transaction iterator created with
// ReadOptions::iterate_upper_bound must not surface keys at or beyond that
// bound, even when the key sitting on the bound was written and then deleted
// inside the same transaction.
TEST_P(TransactionTest, TestUpperBoundUponDeletion) {
  // Reproduction from the original bug report, 11606
  // This test does writes without snapshot validation, and then tries to create
  // iterator later, which is unsupported in write unprepared.
  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
    return;
  }

  WriteOptions write_options;
  ReadOptions read_options;

  // Owned through unique_ptr so that a failing ASSERT_* (which returns from
  // the test body early) cannot leak the transaction.
  std::unique_ptr<Transaction> txn(db->BeginTransaction(write_options));
  ASSERT_TRUE(txn != nullptr);

  // Write some keys in a txn
  ASSERT_OK(txn->Put("2", "2"));
  ASSERT_OK(txn->Put("1", "1"));
  ASSERT_OK(txn->Delete("2"));

  // The bound lives on the stack and is declared before the iterator, so it
  // is destroyed only after the iterator that points at it.
  const Slice upper_bound("2", 1);
  read_options.iterate_upper_bound = &upper_bound;

  std::unique_ptr<Iterator> iter(txn->GetIterator(read_options));
  ASSERT_OK(iter->status());

  // Only key "1" lies below the upper bound; count the visited entries so the
  // loop cannot pass vacuously on an empty iterator.
  int num_keys_seen = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ASSERT_EQ("1", iter->key().ToString());
    ++num_keys_seen;
  }
  ASSERT_OK(iter->status());
  ASSERT_EQ(1, num_keys_seen);
}

// Verifies that an iterator obtained from a transaction honors both
// ReadOptions::iterate_lower_bound and ReadOptions::iterate_upper_bound when
// some keys come from committed data and others from the transaction's own
// uncommitted writes.
TEST_P(TransactionTest, TestTxnRespectBoundsInReadOption) {
  if (txn_db_options.write_policy == WRITE_UNPREPARED) {
    return;
  }

  WriteOptions write_options;

  {
    std::unique_ptr<Transaction> txn(db->BeginTransaction(write_options));
    ASSERT_TRUE(txn != nullptr);
    // writes that should be observed by base_iterator_ in BaseDeltaIterator
    ASSERT_OK(txn->Put("a", "aa"));
    ASSERT_OK(txn->Put("c", "cc"));
    ASSERT_OK(txn->Put("e", "ee"));
    ASSERT_OK(txn->Put("f", "ff"));
    // ASSERT_OK (rather than ASSERT_TRUE(...ok())) reports the status text on
    // failure.
    ASSERT_OK(txn->Commit());
  }

  std::unique_ptr<Transaction> txn2(db->BeginTransaction(write_options));
  ASSERT_TRUE(txn2 != nullptr);
  // writes that should be observed by delta_iterator_ in BaseDeltaIterator
  ASSERT_OK(txn2->Put("b", "bb"));
  ASSERT_OK(txn2->Put("c", "cc"));
  ASSERT_OK(txn2->Put("f", "ff"));

  // delta_iterator_: b c f
  // base_iterator_: a c e f
  //
  // given range [c, f)
  // assert only {c, e} can be seen

  // The bounds are stack objects declared before the iterator, so they
  // outlive it and are released even when an assertion fails.
  const Slice lower_bound("c");
  const Slice upper_bound("f");
  ReadOptions ro;
  ro.iterate_lower_bound = &lower_bound;
  ro.iterate_upper_bound = &upper_bound;
  std::unique_ptr<Iterator> iter(txn2->GetIterator(ro));
  ASSERT_OK(iter->status());

  iter->Seek(Slice("b"));
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("c", iter->key());  // lower bound capping
  iter->Seek(Slice("f"));
  ASSERT_FALSE(iter->Valid());  // out of bound

  iter->SeekForPrev(Slice("f"));
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("e", iter->key());  // upper bound capping
  iter->SeekForPrev(Slice("b"));
  ASSERT_FALSE(iter->Valid());  // out of bound

  // move to the lower bound
  iter->SeekToFirst();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("c", iter->key());
  iter->Prev();
  ASSERT_FALSE(iter->Valid());

  // move to the upper bound
  iter->SeekToLast();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("e", iter->key());
  iter->Next();
  ASSERT_FALSE(iter->Valid());

  // reversely walk to the beginning
  iter->SeekToLast();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("e", iter->key());
  iter->Prev();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("c", iter->key());
  iter->Prev();
  ASSERT_FALSE(iter->Valid());

  ASSERT_OK(iter->status());
}

TEST_P(TransactionTest, DoubleEmptyWrite) {
WriteOptions write_options;
write_options.sync = true;
Expand Down
8 changes: 2 additions & 6 deletions utilities/transactions/write_prepared_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,7 @@ Status WritePreparedTxn::GetImpl(const ReadOptions& options,
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
// Make sure to get iterator from WritePrepareTxnDB, not the root db.
Iterator* db_iter = wpt_db_->NewIterator(options);
assert(db_iter);

return write_batch_.NewIteratorWithBase(db_iter);
return GetIterator(options, wpt_db_->DefaultColumnFamily());
}

Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
Expand All @@ -136,7 +132,7 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
assert(db_iter);

return write_batch_.NewIteratorWithBase(column_family, db_iter);
return write_batch_.NewIteratorWithBase(column_family, db_iter, &options);
}

Status WritePreparedTxn::PrepareInternal() {
Expand Down
3 changes: 2 additions & 1 deletion utilities/transactions/write_unprepared_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,8 @@ Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
assert(db_iter);

auto iter = write_batch_.NewIteratorWithBase(column_family, db_iter);
auto iter =
write_batch_.NewIteratorWithBase(column_family, db_iter, &options);
active_iterators_.push_back(iter);
iter->RegisterCleanup(CleanupWriteUnpreparedWBWIIterator, this, iter);
return iter;
Expand Down
18 changes: 13 additions & 5 deletions utilities/write_batch_with_index/write_batch_with_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,20 @@ WBWIIterator* WriteBatchWithIndex::NewIterator(
Iterator* WriteBatchWithIndex::NewIteratorWithBase(
ColumnFamilyHandle* column_family, Iterator* base_iterator,
const ReadOptions* read_options) {
auto wbwiii =
new WBWIIteratorImpl(GetColumnFamilyID(column_family), &(rep->skip_list),
&rep->write_batch, &rep->comparator);
WBWIIteratorImpl* wbwiii;
if (read_options != nullptr) {
wbwiii = new WBWIIteratorImpl(
GetColumnFamilyID(column_family), &(rep->skip_list), &rep->write_batch,
&rep->comparator, read_options->iterate_lower_bound,
read_options->iterate_upper_bound);
} else {
wbwiii = new WBWIIteratorImpl(GetColumnFamilyID(column_family),
&(rep->skip_list), &rep->write_batch,
&rep->comparator);
}

return new BaseDeltaIterator(column_family, base_iterator, wbwiii,
GetColumnFamilyUserComparator(column_family),
read_options);
GetColumnFamilyUserComparator(column_family));
}

Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,15 @@ namespace ROCKSDB_NAMESPACE {
BaseDeltaIterator::BaseDeltaIterator(ColumnFamilyHandle* column_family,
Iterator* base_iterator,
WBWIIteratorImpl* delta_iterator,
const Comparator* comparator,
const ReadOptions* read_options)
const Comparator* comparator)
: forward_(true),
current_at_base_(true),
equal_keys_(false),
status_(Status::OK()),
column_family_(column_family),
base_iterator_(base_iterator),
delta_iterator_(delta_iterator),
comparator_(comparator),
iterate_upper_bound_(read_options ? read_options->iterate_upper_bound
: nullptr) {
comparator_(comparator) {
assert(base_iterator_);
assert(delta_iterator_);
assert(comparator_);
Expand Down Expand Up @@ -332,14 +329,6 @@ void BaseDeltaIterator::UpdateCurrent() {
// Finished
return;
}
if (iterate_upper_bound_) {
if (comparator_->CompareWithoutTimestamp(
delta_entry.key, /*a_has_ts=*/false, *iterate_upper_bound_,
/*b_has_ts=*/false) >= 0) {
// out of upper bound -> finished.
return;
}
}
if (delta_result == WBWIIteratorImpl::kDeleted &&
merge_context_.GetNumOperands() == 0) {
AdvanceDelta();
Expand Down
125 changes: 105 additions & 20 deletions utilities/write_batch_with_index/write_batch_with_index_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class BaseDeltaIterator : public Iterator {
public:
BaseDeltaIterator(ColumnFamilyHandle* column_family, Iterator* base_iterator,
WBWIIteratorImpl* delta_iterator,
const Comparator* comparator,
const ReadOptions* read_options = nullptr);
const Comparator* comparator);

~BaseDeltaIterator() override {}

Expand Down Expand Up @@ -72,7 +71,6 @@ class BaseDeltaIterator : public Iterator {
std::unique_ptr<Iterator> base_iterator_;
std::unique_ptr<WBWIIteratorImpl> delta_iterator_;
const Comparator* comparator_; // not owned
const Slice* iterate_upper_bound_;
MergeContext merge_context_;
std::string merge_result_;
Slice value_;
Expand Down Expand Up @@ -200,59 +198,107 @@ class WBWIIteratorImpl : public WBWIIterator {
WBWIIteratorImpl(uint32_t column_family_id,
WriteBatchEntrySkipList* skip_list,
const ReadableWriteBatch* write_batch,
WriteBatchEntryComparator* comparator)
WriteBatchEntryComparator* comparator,
const Slice* iterate_lower_bound = nullptr,
const Slice* iterate_upper_bound = nullptr)
: column_family_id_(column_family_id),
skip_list_iter_(skip_list),
write_batch_(write_batch),
comparator_(comparator) {}
comparator_(comparator),
iterate_lower_bound_(iterate_lower_bound),
iterate_upper_bound_(iterate_upper_bound) {}

~WBWIIteratorImpl() override {}

bool Valid() const override {
if (!skip_list_iter_.Valid()) {
return false;
}
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
return (iter_entry != nullptr &&
iter_entry->column_family == column_family_id_);
return !out_of_bound_ && ValidRegardlessOfBoundLimit();
}

void SeekToFirst() override {
WriteBatchIndexEntry search_entry(
nullptr /* search_key */, column_family_id_,
true /* is_forward_direction */, true /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
if (iterate_lower_bound_ != nullptr) {
WriteBatchIndexEntry search_entry(
iterate_lower_bound_ /* search_key */, column_family_id_,
true /* is_forward_direction */, false /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
} else {
WriteBatchIndexEntry search_entry(
nullptr /* search_key */, column_family_id_,
true /* is_forward_direction */, true /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);
}

if (ValidRegardlessOfBoundLimit()) {
out_of_bound_ = TestOutOfBound();
}
}

void SeekToLast() override {
WriteBatchIndexEntry search_entry(
nullptr /* search_key */, column_family_id_ + 1,
true /* is_forward_direction */, true /* is_seek_to_first */);
WriteBatchIndexEntry search_entry =
(iterate_upper_bound_ != nullptr)
? WriteBatchIndexEntry(
iterate_upper_bound_ /* search_key */, column_family_id_,
true /* is_forward_direction */, false /* is_seek_to_first */)
: WriteBatchIndexEntry(
nullptr /* search_key */, column_family_id_ + 1,
true /* is_forward_direction */, true /* is_seek_to_first */);

skip_list_iter_.Seek(&search_entry);
if (!skip_list_iter_.Valid()) {
skip_list_iter_.SeekToLast();
} else {
skip_list_iter_.Prev();
}

if (ValidRegardlessOfBoundLimit()) {
out_of_bound_ = TestOutOfBound();
}
}

void Seek(const Slice& key) override {
if (BeforeLowerBound(&key)) { // cap to prevent out of bound
SeekToFirst();
return;
}

WriteBatchIndexEntry search_entry(&key, column_family_id_,
true /* is_forward_direction */,
false /* is_seek_to_first */);
skip_list_iter_.Seek(&search_entry);

if (ValidRegardlessOfBoundLimit()) {
out_of_bound_ = TestOutOfBound();
}
}

void SeekForPrev(const Slice& key) override {
if (AtOrAfterUpperBound(&key)) { // cap to prevent out of bound
SeekToLast();
return;
}

WriteBatchIndexEntry search_entry(&key, column_family_id_,
false /* is_forward_direction */,
false /* is_seek_to_first */);
skip_list_iter_.SeekForPrev(&search_entry);

if (ValidRegardlessOfBoundLimit()) {
out_of_bound_ = TestOutOfBound();
}
}

void Next() override { skip_list_iter_.Next(); }
void Next() override {
skip_list_iter_.Next();
if (ValidRegardlessOfBoundLimit()) {
out_of_bound_ = TestOutOfBound();
}
}

void Prev() override { skip_list_iter_.Prev(); }
void Prev() override {
skip_list_iter_.Prev();
if (ValidRegardlessOfBoundLimit()) {
out_of_bound_ = TestOutOfBound();
}
}

WriteEntry Entry() const override;

Expand Down Expand Up @@ -293,6 +339,45 @@ class WBWIIteratorImpl : public WBWIIterator {
WriteBatchEntrySkipList::Iterator skip_list_iter_;
const ReadableWriteBatch* write_batch_;
WriteBatchEntryComparator* comparator_;
const Slice* iterate_lower_bound_;
const Slice* iterate_upper_bound_;
bool out_of_bound_ = false;

bool TestOutOfBound() const {
const Slice& curKey = Entry().key;
return AtOrAfterUpperBound(&curKey) || BeforeLowerBound(&curKey);
}

bool ValidRegardlessOfBoundLimit() const {
if (!skip_list_iter_.Valid()) {
return false;
}
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
return iter_entry != nullptr &&
iter_entry->column_family == column_family_id_;
}

bool AtOrAfterUpperBound(const Slice* k) const {
if (iterate_upper_bound_ == nullptr) {
return false;
}

return comparator_->GetComparator(column_family_id_)
->CompareWithoutTimestamp(*k, /*a_has_ts=*/false,
*iterate_upper_bound_,
/*b_has_ts=*/false) >= 0;
}

bool BeforeLowerBound(const Slice* k) const {
if (iterate_lower_bound_ == nullptr) {
return false;
}

return comparator_->GetComparator(column_family_id_)
->CompareWithoutTimestamp(*k, /*a_has_ts=*/false,
*iterate_lower_bound_,
/*b_has_ts=*/false) < 0;
}
};

class WriteBatchWithIndexInternal {
Expand Down
Loading

0 comments on commit 543191f

Please sign in to comment.