Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PCP-21] Titan GC doesn’t affect online write #147

Merged
merged 24 commits into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion db/db_blob_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,12 @@ TEST_F(DBBlobIndexTest, Iterate) {
create_blob_iterator, check_is_blob(false));
verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
create_blob_iterator, check_is_blob(false));
verify(11, Status::kNotSupported, "", "", create_blob_iterator);
verify(11, Status::kOk,
get_value(11, 3) + ", " + get_value(11, 2) + "," + get_value(11, 1) +
"," + get_value(11, 0),
get_value(11, 3) + ", " + get_value(11, 2) + "," + get_value(11, 1) +
"," + get_value(11, 0),
create_blob_iterator);
verify(13, Status::kOk,
get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
Expand Down
168 changes: 114 additions & 54 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ bool DBIter::MergeValuesNewToOld() {

ParsedInternalKey ikey;
Status s;
ValueType base_type = kTypeMerge;
for (iter_.Next(); iter_.Valid(); iter_.Next()) {
TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
if (!ParseKey(&ikey)) {
Expand All @@ -699,14 +700,24 @@ bool DBIter::MergeValuesNewToOld() {
// hit a delete with the same user key, stop right here
// iter_ is positioned after delete
iter_.Next();
base_type = kTypeDeletion;
break;
} else if (kTypeValue == ikey.type) {
} else if (kTypeValue == ikey.type || kTypeBlobIndex == ikey.type) {
if (kTypeBlobIndex == ikey.type && !allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
valid_ = false;
return false;
}
// hit a put, merge the put value with operands and store the
// final result in saved_value_. We are done!
const Slice val = iter_.value();
s = MergeHelper::TimedFullMerge(
merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_, &pinned_value_, true);
s = MergeHelper::TimedFullMerge(merge_operator_, ikey.user_key, ikey.type,
&val, merge_context_.GetOperands(),
&ikey.type, &saved_value_, logger_,
statistics_, env_, &pinned_value_, true);
if (!s.ok()) {
valid_ = false;
status_ = s;
Expand All @@ -718,25 +729,24 @@ bool DBIter::MergeValuesNewToOld() {
valid_ = false;
return false;
}
if (ikey.type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index from merge.");
status_ = Status::NotSupported(
"Encounter unexpected blob index from merge. Please open DB "
"with rocksdb::blob_db::BlobDB instead.");
valid_ = false;
return false;
}
is_blob_ = true;
}
return true;
} else if (kTypeMerge == ikey.type) {
// hit a merge, add the value as an operand and run associative merge.
// when complete, add result to operands and continue.
merge_context_.PushOperand(
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (kTypeBlobIndex == ikey.type) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
} else {
status_ =
Status::NotSupported("Blob DB does not support merge operator.");
}
valid_ = false;
return false;
} else {
assert(false);
}
Expand All @@ -751,15 +761,26 @@ bool DBIter::MergeValuesNewToOld() {
// a deletion marker.
// feed null as the existing value to the merge operator, such that
// client can differentiate this scenario and do things accordingly.
s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
nullptr, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_,
&pinned_value_, true);
s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), base_type, nullptr,
merge_context_.GetOperands(), &base_type, &saved_value_, logger_,
statistics_, env_, &pinned_value_, true);
if (!s.ok()) {
valid_ = false;
status_ = s;
return false;
}
if (base_type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index from merge.");
status_ = Status::NotSupported(
"Encounter unexpected blob index from merge. Please open DB "
"with rocksdb::blob_db::BlobDB instead.");
valid_ = false;
return false;
}
is_blob_ = true;
}

assert(status_.ok());
return true;
Expand Down Expand Up @@ -1016,27 +1037,46 @@ bool DBIter::FindValueForCurrentKey() {
last_not_merge_type == kTypeSingleDeletion ||
last_not_merge_type == kTypeRangeDeletion) {
s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), nullptr,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
env_, &pinned_value_, true);
} else if (last_not_merge_type == kTypeBlobIndex) {
if (!allow_blob_) {
merge_operator_, saved_key_.GetUserKey(), last_not_merge_type,
nullptr, merge_context_.GetOperands(), &last_not_merge_type,
&saved_value_, logger_, statistics_, env_, &pinned_value_, true);
if (last_not_merge_type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index from merge.");
status_ = Status::NotSupported(
"Encounter unexpected blob index from merge. Please open DB "
"with rocksdb::blob_db::BlobDB instead.");
valid_ = false;
return false;
}
is_blob_ = true;
}
} else {
assert(last_not_merge_type == kTypeValue ||
last_not_merge_type == kTypeBlobIndex);
if (last_not_merge_type == kTypeBlobIndex && !allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
} else {
status_ =
Status::NotSupported("Blob DB does not support merge operator.");
valid_ = false;
return false;
}
valid_ = false;
return false;
} else {
assert(last_not_merge_type == kTypeValue);
s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
env_, &pinned_value_, true);
merge_operator_, saved_key_.GetUserKey(), last_not_merge_type,
&pinned_value_, merge_context_.GetOperands(), &last_not_merge_type,
&saved_value_, logger_, statistics_, env_, &pinned_value_, true);
if (last_not_merge_type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index from merge.");
status_ = Status::NotSupported(
"Encounter unexpected blob index from merge. Please open DB "
"with rocksdb::blob_db::BlobDB instead.");
valid_ = false;
return false;
}
is_blob_ = true;
}
}
break;
case kTypeValue:
Expand Down Expand Up @@ -1137,6 +1177,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
merge_context_.Clear();
merge_context_.PushOperand(
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
ValueType base_type = kTypeMerge;
while (true) {
iter_.Next();

Expand All @@ -1157,50 +1198,69 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(
ikey, RangeDelPositioningMode::kForwardTraversal)) {
base_type = kTypeDeletion;
break;
} else if (ikey.type == kTypeValue) {
} else if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
if (ikey.type == kTypeBlobIndex && !allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
valid_ = false;
return false;
}
const Slice val = iter_.value();
Status s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), &val,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
env_, &pinned_value_, true);
merge_operator_, saved_key_.GetUserKey(), ikey.type, &val,
merge_context_.GetOperands(), &ikey.type, &saved_value_, logger_,
statistics_, env_, &pinned_value_, true);
if (!s.ok()) {
valid_ = false;
status_ = s;
return false;
}
if (ikey.type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index from merge.");
status_ = Status::NotSupported(
"Encounter unexpected blob index from merge. Please open DB "
"with rocksdb::blob_db::BlobDB instead.");
valid_ = false;
return false;
}
is_blob_ = true;
}
valid_ = true;
return true;
} else if (ikey.type == kTypeMerge) {
merge_context_.PushOperand(
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (ikey.type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
} else {
status_ =
Status::NotSupported("Blob DB does not support merge operator.");
}
valid_ = false;
return false;
} else {
assert(false);
}
}

Status s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), nullptr,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
&pinned_value_, true);
merge_operator_, saved_key_.GetUserKey(), base_type, nullptr,
merge_context_.GetOperands(), &base_type, &saved_value_, logger_,
statistics_, env_, &pinned_value_, true);
if (!s.ok()) {
valid_ = false;
status_ = s;
return false;
}
if (base_type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index from merge.");
status_ = Status::NotSupported(
"Encounter unexpected blob index from merge. Please open DB "
"with rocksdb::blob_db::BlobDB instead.");
valid_ = false;
return false;
}
is_blob_ = true;
}

// Make sure we leave iter_ in a good state. If it's valid and we don't care
// about prefixes, that's already good enough. Otherwise it needs to be
Expand Down
28 changes: 17 additions & 11 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -661,15 +661,13 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->status) = Status::NotSupported(
"Encounter unsupported blob value. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
} else if (*(s->merge_in_progress)) {
*(s->status) =
Status::NotSupported("Blob DB does not support merge operator.");
}
if (!s->status->ok()) {
*(s->found_final_value) = true;
return false;
}
FALLTHROUGH_INTENDED;
// continue as normal value
case kTypeValue: {
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
Expand All @@ -679,8 +677,8 @@ static bool SaveValue(void* arg, const char* entry) {
if (*(s->merge_in_progress)) {
if (s->value != nullptr) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), s->value, s->logger,
merge_operator, s->key->user_key(), type, &v,
merge_context->GetOperands(), &type, s->value, s->logger,
s->statistics, s->env_, nullptr /* result_operand */, true);
}
} else if (s->value != nullptr) {
Expand All @@ -701,14 +699,17 @@ static bool SaveValue(void* arg, const char* entry) {
if (*(s->merge_in_progress)) {
if (s->value != nullptr) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->logger,
merge_operator, s->key->user_key(), type, nullptr,
merge_context->GetOperands(), &type, s->value, s->logger,
s->statistics, s->env_, nullptr /* result_operand */, true);
}
} else {
*(s->status) = Status::NotFound();
}
*(s->found_final_value) = true;
if (s->is_blob_index != nullptr) {
*(s->is_blob_index) = (type == kTypeBlobIndex);
}
return false;
}
case kTypeMerge: {
Expand All @@ -726,12 +727,17 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->merge_in_progress) = true;
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);
if (merge_operator->ShouldMerge(merge_context->GetOperandsDirectionBackward())) {
if (merge_operator->ShouldMerge(
merge_context->GetOperandsDirectionBackward())) {
// only if `ShouldMerge` suggests a proactive partial merge
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value, s->logger, s->statistics,
s->env_, nullptr /* result_operand */, true);
merge_operator, s->key->user_key(), type, nullptr,
merge_context->GetOperands(), &type, s->value, s->logger,
s->statistics, s->env_, nullptr /* result_operand */, true);
*(s->found_final_value) = true;
if (s->is_blob_index != nullptr) {
*(s->is_blob_index) = (type == kTypeBlobIndex);
}
return false;
}
return true;
Expand Down
Loading