Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Merge Operator support to WriteBatchWithIndex #8135

Closed
wants to merge 12 commits into from
2 changes: 2 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
* `CompactFiles()` can no longer compact files from lower level to up level, which has the risk to corrupt DB (details: #8063). The validation is also added to all compactions.
* Fixed some cases in which DB::OpenForReadOnly() could write to the filesystem. If you want a Logger with a read-only DB, you must now set DBOptions::info_log yourself, such as using CreateLoggerFromOptions().
* get_iostats_context() will never return nullptr. If thread-local support is not available, and user does not opt-out iostats context, then compilation will fail. The same applies to perf context as well.
* Added support for WriteBatchWithIndex::NewIteratorWithBase when overwrite_key=false. Previously, this combination was not supported and would assert or return nullptr.
* Improve the behavior of WriteBatchWithIndex for Merge operations. Now more operations may be stored in order to return the correct merged result.

### Bug Fixes
* Use thread-safe `strerror_r()` to get error messages.
Expand Down
28 changes: 20 additions & 8 deletions db/merge_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,29 @@ class MergeContext {
}

// Get the operand at the index.
Slice GetOperand(int index) {
Slice GetOperand(int index) const {
assert(operand_list_);

SetDirectionForward();
return (*operand_list_)[index];
}

// Same as GetOperandsDirectionForward
const std::vector<Slice>& GetOperands() {
//
// Note that the returned reference is only good until another call
// to this MergeContext. If the returned value is needed for longer,
// a copy must be made.
const std::vector<Slice>& GetOperands() const {
return GetOperandsDirectionForward();
}

// Return all the operands in the order as they were merged (passed to
// FullMerge or FullMergeV2)
const std::vector<Slice>& GetOperandsDirectionForward() {
//
// Note that the returned reference is only good until another call
// to this MergeContext. If the returned value is needed for longer,
// a copy must be made.
const std::vector<Slice>& GetOperandsDirectionForward() const {
mrambacher marked this conversation as resolved.
Show resolved Hide resolved
if (!operand_list_) {
return empty_operand_list;
}
Expand All @@ -93,7 +101,11 @@ class MergeContext {

// Return all the operands in the reversed order relative to how they were
// merged (passed to FullMerge or FullMergeV2)
const std::vector<Slice>& GetOperandsDirectionBackward() {
//
// Note that the returned reference is only good until another call
// to this MergeContext. If the returned value is needed for longer,
// a copy must be made.
const std::vector<Slice>& GetOperandsDirectionBackward() const {
if (!operand_list_) {
return empty_operand_list;
}
Expand All @@ -110,25 +122,25 @@ class MergeContext {
}
}

void SetDirectionForward() {
void SetDirectionForward() const {
if (operands_reversed_ == true) {
std::reverse(operand_list_->begin(), operand_list_->end());
operands_reversed_ = false;
}
}

void SetDirectionBackward() {
void SetDirectionBackward() const {
if (operands_reversed_ == false) {
std::reverse(operand_list_->begin(), operand_list_->end());
operands_reversed_ = true;
}
}

// List of operands
std::unique_ptr<std::vector<Slice>> operand_list_;
mutable std::unique_ptr<std::vector<Slice>> operand_list_;
// Copy of operands that are not pinned.
std::unique_ptr<std::vector<std::unique_ptr<std::string>>> copied_operands_;
bool operands_reversed_ = true;
mutable bool operands_reversed_ = true;
};

} // namespace ROCKSDB_NAMESPACE
3 changes: 2 additions & 1 deletion utilities/transactions/transaction_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
Iterator* db_iter = db_->NewIterator(read_options);
assert(db_iter);

return write_batch_.NewIteratorWithBase(db_iter);
return write_batch_.NewIteratorWithBase(db_->DefaultColumnFamily(), db_iter,
&read_options);
}

Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
Expand Down
9 changes: 6 additions & 3 deletions utilities/transactions/transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2814,7 +2814,8 @@ TEST_P(TransactionTest, MultiGetBatchedTest) {
ASSERT_TRUE(statuses[1].IsNotFound());
ASSERT_TRUE(statuses[2].ok());
ASSERT_EQ(values[2], "val3_new");
ASSERT_TRUE(statuses[3].IsMergeInProgress());
ASSERT_TRUE(statuses[3].ok());
ASSERT_EQ(values[3], "foo,bar");
ASSERT_TRUE(statuses[4].ok());
ASSERT_EQ(values[4], "val5");
ASSERT_TRUE(statuses[5].ok());
Expand Down Expand Up @@ -4839,7 +4840,8 @@ TEST_P(TransactionTest, MergeTest) {
ASSERT_OK(s);

s = txn->Get(read_options, "A", &value);
ASSERT_TRUE(s.IsMergeInProgress());
ASSERT_OK(s);
ASSERT_EQ("a0,1,2", value);

s = txn->Put("A", "a");
ASSERT_OK(s);
Expand All @@ -4852,7 +4854,8 @@ TEST_P(TransactionTest, MergeTest) {
ASSERT_OK(s);

s = txn->Get(read_options, "A", &value);
ASSERT_TRUE(s.IsMergeInProgress());
ASSERT_OK(s);
ASSERT_EQ("a,3", value);

TransactionOptions txn_options;
txn_options.lock_timeout = 1; // 1 ms
Expand Down
Loading