Skip to content

Commit

Permalink
Address PR Comments
Browse files Browse the repository at this point in the history
Found a solution that did not require the new API signature.  Updated the code accordingly.  Updated HISTORY
  • Loading branch information
mrambacher committed Apr 11, 2021
1 parent d74970c commit f2665d1
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 59 deletions.
2 changes: 2 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
## Unreleased
### Behavior Changes
* `ColumnFamilyOptions::sample_for_compression` now takes effect for creation of all block-based tables. Previously it only took effect for block-based tables created by flush.
* Added support for WriteBatchWithIndex::NewIteratorWithBase when overwrite_key=false. Previously, this combination was not supported and would assert or return nullptr.
* Improve the behavior of WriteBatchWithIndex for Merge operations. Now more operations may be stored in order to return the correct merged result.

### Bug Fixes
* Use thread-safe `strerror_r()` to get error messages.
Expand Down
5 changes: 0 additions & 5 deletions include/rocksdb/utilities/write_batch_with_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,6 @@ class WriteBatchWithIndex : public WriteBatchBase {
Iterator* NewIteratorWithBase(ColumnFamilyHandle* column_family,
Iterator* base_iterator,
const ReadOptions* opts = nullptr);
Iterator* NewIteratorWithBase(DB* db, Iterator* base_iterator,
const ReadOptions* opts = nullptr);
Iterator* NewIteratorWithBase(DB* db, ColumnFamilyHandle* column_family,
Iterator* base_iterator,
const ReadOptions* opts = nullptr);
// default column family
Iterator* NewIteratorWithBase(Iterator* base_iterator);

Expand Down
5 changes: 3 additions & 2 deletions utilities/transactions/transaction_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,16 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
Iterator* db_iter = db_->NewIterator(read_options);
assert(db_iter);

return write_batch_.NewIteratorWithBase(db_, db_iter);
return write_batch_.NewIteratorWithBase(db_->DefaultColumnFamily(), db_iter,
&read_options);
}

Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) {
Iterator* db_iter = db_->NewIterator(read_options, column_family);
assert(db_iter);

return write_batch_.NewIteratorWithBase(db_, column_family, db_iter,
return write_batch_.NewIteratorWithBase(column_family, db_iter,
&read_options);
}

Expand Down
2 changes: 1 addition & 1 deletion utilities/transactions/write_unprepared_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
assert(db_iter);

auto iter = write_batch_.NewIteratorWithBase(db_, column_family, db_iter);
auto iter = write_batch_.NewIteratorWithBase(column_family, db_iter);
active_iterators_.push_back(iter);
iter->RegisterCleanup(CleanupWriteUnpreparedWBWIIterator, this, iter);
return iter;
Expand Down
20 changes: 3 additions & 17 deletions utilities/write_batch_with_index/write_batch_with_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,36 +284,22 @@ WBWIIterator* WriteBatchWithIndex::NewIterator(
&(rep->comparator));
}

Iterator* WriteBatchWithIndex::NewIteratorWithBase(DB* db,
Iterator* base_iterator,
const ReadOptions* opts) {
return NewIteratorWithBase(db, db->DefaultColumnFamily(), base_iterator,
opts);
}

Iterator* WriteBatchWithIndex::NewIteratorWithBase(
ColumnFamilyHandle* column_family, Iterator* base_iterator,
const ReadOptions* read_options) {
return NewIteratorWithBase(nullptr, column_family, base_iterator,
read_options);
}

Iterator* WriteBatchWithIndex::NewIteratorWithBase(
DB* db, ColumnFamilyHandle* column_family, Iterator* base_iterator,
const ReadOptions* opts) {
auto wbwiii =
new WBWIIteratorImpl(GetColumnFamilyID(column_family), &(rep->skip_list),
&rep->write_batch, &rep->comparator);
return new BaseDeltaIterator(db, column_family, base_iterator, wbwiii,
return new BaseDeltaIterator(column_family, base_iterator, wbwiii,
GetColumnFamilyUserComparator(column_family),
opts);
read_options);
}

Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
// default column family's comparator
auto wbwiii = new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch,
&rep->comparator);
return new BaseDeltaIterator(nullptr, nullptr, base_iterator, wbwiii,
return new BaseDeltaIterator(nullptr, base_iterator, wbwiii,
rep->comparator.default_comparator());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {
BaseDeltaIterator::BaseDeltaIterator(DB* db, ColumnFamilyHandle* column_family,
BaseDeltaIterator::BaseDeltaIterator(ColumnFamilyHandle* column_family,
Iterator* base_iterator,
WBWIIteratorImpl* delta_iterator,
const Comparator* comparator,
Expand All @@ -33,7 +33,7 @@ BaseDeltaIterator::BaseDeltaIterator(DB* db, ColumnFamilyHandle* column_family,
comparator_(comparator),
iterate_upper_bound_(read_options ? read_options->iterate_upper_bound
: nullptr) {
wbwii_.reset(new WriteBatchWithIndexInternal(db, column_family));
wbwii_.reset(new WriteBatchWithIndexInternal(column_family));
}

void BaseDeltaIterator::SeekToFirst() {
Expand Down Expand Up @@ -596,6 +596,10 @@ bool WBWIIteratorImpl::MatchesKey(uint32_t cf_id, const Slice& key) {
}
}

WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
ColumnFamilyHandle* column_family)
: db_(nullptr), db_options_(nullptr), column_family_(column_family) {}

WriteBatchWithIndexInternal::WriteBatchWithIndexInternal(
DB* db, ColumnFamilyHandle* column_family)
: db_(db), db_options_(nullptr), column_family_(column_family) {
Expand Down Expand Up @@ -638,9 +642,11 @@ Status WriteBatchWithIndexInternal::MergeKey(const Slice& key,
context.GetOperands(), result, logger,
statistics, clock, result_operand);
} else {
return MergeHelper::TimedFullMerge(
merge_operator, key, value, context.GetOperands(), result, nullptr,
nullptr, SystemClock::Default().get(), result_operand);
const auto cf_opts = cfh->cfd()->ioptions();
return MergeHelper::TimedFullMerge(merge_operator, key, value,
context.GetOperands(), result,
cf_opts->info_log, cf_opts->statistics,
cf_opts->clock, result_operand);
}
} else {
return Status::InvalidArgument("Must provide a column_family");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ struct Options;
// * equal_keys_ <=> base_iterator == delta_iterator
class BaseDeltaIterator : public Iterator {
public:
BaseDeltaIterator(DB* db, ColumnFamilyHandle* column_family,
Iterator* base_iterator, WBWIIteratorImpl* delta_iterator,
BaseDeltaIterator(ColumnFamilyHandle* column_family, Iterator* base_iterator,
WBWIIteratorImpl* delta_iterator,
const Comparator* comparator,
const ReadOptions* read_options = nullptr);

Expand Down Expand Up @@ -296,6 +296,8 @@ class WriteBatchWithIndexInternal {
// For GetFromBatchAndDB or similar
explicit WriteBatchWithIndexInternal(DB* db,
ColumnFamilyHandle* column_family);
// For GetFromBatchAndDB or similar
explicit WriteBatchWithIndexInternal(ColumnFamilyHandle* column_family);
// For GetFromBatch or similar
explicit WriteBatchWithIndexInternal(const DBOptions* db_options,
ColumnFamilyHandle* column_family);
Expand Down
27 changes: 0 additions & 27 deletions utilities/write_batch_with_index/write_batch_with_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1927,33 +1927,6 @@ TEST_P(WriteBatchWithIndexTest, IteratorMergeTest) {
AssertItersEqual(iter.get(), &kvi);
}

TEST_P(WriteBatchWithIndexTest, IteratorDBMergeTest) {
ASSERT_OK(OpenDB());
ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily();

KVMap result = {
{"m", "m0"}, // Merge
{"mm", "mm0,mm1"}, // Merge, Merge
{"dm", "dm1"}, // Delete, Merge
{"dmm", "dmm1,dmm2"}, // Delete, Merge, Merge
{"mdm", "mdm2"}, // Merge, Delete, Merge
{"mpm", "mpm1,mpm2"}, // Merge, Put, Merge
{"pm", "pm0,pm1"}, // Put, Merge
{"pmm", "pmm0,pmm1,pmm2"}, // Put, Merge, Merge
};

for (auto& iter : result) {
EXPECT_EQ(AddToBatch(cf0, iter.first), iter.second);
}

KVIter kvi(&result);
// First try just the batch
KVMap empty_map;
std::unique_ptr<Iterator> iter(
batch_->NewIteratorWithBase(db_, new KVIter(&empty_map)));
AssertItersEqual(iter.get(), &kvi);
}

TEST_P(WriteBatchWithIndexTest, IteratorMergeTestWithOrig) {
ASSERT_OK(OpenDB());
ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily();
Expand Down

0 comments on commit f2665d1

Please sign in to comment.