Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New API to get all merge operands for a Key #5604

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5e20ad0
V1
vjnadimpalli Jul 16, 2019
acabd73
Using PinnableSlice*
vjnadimpalli Jul 18, 2019
471fac2
modified version_set
vjnadimpalli Jul 19, 2019
6124903
reusing the existing methods in db_impl and version_set
vjnadimpalli Jul 19, 2019
a26bf66
refactored to use existing methods in code
vjnadimpalli Jul 22, 2019
ffc9c49
make clang happy
vjnadimpalli Jul 22, 2019
9219313
removed std::cout
vjnadimpalli Jul 22, 2019
6b638fb
Added unit test and perf test
vjnadimpalli Jul 26, 2019
01906be
Added MergeOperandsInfo struct
vjnadimpalli Jul 26, 2019
e399483
Added perf test and changes based on comments
vjnadimpalli Jul 29, 2019
e1b1109
comments and minor changes
vjnadimpalli Jul 29, 2019
7dc5454
more changes
vjnadimpalli Jul 30, 2019
9a88f75
Added structs
vjnadimpalli Jul 31, 2019
060a191
db_bench works
vjnadimpalli Aug 1, 2019
fd32d4f
more comments and other minor changes
vjnadimpalli Aug 1, 2019
a67a35c
comments and refactoring
vjnadimpalli Aug 1, 2019
bef8408
Fixed an issue where a Put followed by GetMergeOperands was not retur…
vjnadimpalli Aug 1, 2019
4c60537
added more description for benchmark and sortlist
vjnadimpalli Aug 2, 2019
dc31860
changed GetImplOptions constructor so each property can be explicitly…
vjnadimpalli Aug 2, 2019
f51c0de
moved constants out of GetImplOptions
vjnadimpalli Aug 5, 2019
d8d8426
Rebase
vjnadimpalli Aug 5, 2019
6ea2435
Space for Tab, spell correction & removed line added for debugging
vjnadimpalli Aug 6, 2019
7da40c2
fixed bug in TARGET file and general formatting
vjnadimpalli Aug 6, 2019
f7b701b
formatting for fblint
vjnadimpalli Aug 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ set(SOURCES
utilities/merge_operators/bytesxor.cc
utilities/merge_operators/max.cc
utilities/merge_operators/put.cc
utilities/merge_operators/sortlist.cc
utilities/merge_operators/string_append/stringappend.cc
utilities/merge_operators/string_append/stringappend2.cc
utilities/merge_operators/uint64add.cc
Expand Down Expand Up @@ -904,6 +905,7 @@ if(WITH_TESTS)
db/db_log_iter_test.cc
db/db_memtable_test.cc
db/db_merge_operator_test.cc
db/db_merge_operand_test.cc
db/db_options_test.cc
db/db_properties_test.cc
db/db_range_del_test.cc
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ TESTS = \
db_iterator_test \
db_memtable_test \
db_merge_operator_test \
db_merge_operand_test \
db_options_test \
db_range_del_test \
db_secondary_test \
Expand Down Expand Up @@ -1252,6 +1253,9 @@ db_memtable_test: db/db_memtable_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHA
db_merge_operator_test: db/db_merge_operator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_merge_operand_test: db/db_merge_operand_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

Expand Down
8 changes: 8 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ cpp_library(
"utilities/merge_operators/bytesxor.cc",
"utilities/merge_operators/max.cc",
"utilities/merge_operators/put.cc",
"utilities/merge_operators/sortlist.cc",
"utilities/merge_operators/string_append/stringappend.cc",
"utilities/merge_operators/string_append/stringappend2.cc",
"utilities/merge_operators/uint64add.cc",
Expand Down Expand Up @@ -755,6 +756,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"db_merge_operand_test",
"db/db_merge_operand_test.cc",
"parallel",
[],
[],
],
[
"db_options_test",
"db/db_options_test.cc",
Expand Down
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ build:
test:

test_script:
- ps: build_tools\run_ci_db_test.ps1 -SuiteRun db_basic_test,db_test2,db_test,env_basic_test,env_test -Concurrency 8
- ps: build_tools\run_ci_db_test.ps1 -SuiteRun db_basic_test,db_test2,db_test,env_basic_test,env_test,db_merge_operand_test -Concurrency 8

on_failure:
- cmd: 7z a build-failed.zip %APPVEYOR_BUILD_FOLDER%\build\ && appveyor PushArtifact build-failed.zip
Expand Down
4 changes: 2 additions & 2 deletions db/compacted_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
const Slice& key, PinnableSlice* value) {
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, key, value, nullptr, nullptr,
nullptr, nullptr);
true, nullptr, nullptr);
LookupKey lkey(key, kMaxSequenceNumber);
files_.files[FindFile(key)].fd.table_reader->Get(options, lkey.internal_key(),
&get_context, nullptr);
Expand Down Expand Up @@ -70,7 +70,7 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
std::string& value = (*values)[idx];
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, keys[idx], &pinnable_val,
nullptr, nullptr, nullptr, nullptr);
nullptr, nullptr, true, nullptr, nullptr);
LookupKey lkey(keys[idx], kMaxSequenceNumber);
r->Get(options, lkey.internal_key(), &get_context, nullptr);
value.assign(pinnable_val.data(), pinnable_val.size());
Expand Down
8 changes: 5 additions & 3 deletions db/db_blob_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ class DBBlobIndexTest : public DBTestBase {
ReadOptions read_options;
read_options.snapshot = snapshot;
PinnableSlice value;
auto s = dbfull()->GetImpl(read_options, cfh(), key, &value,
nullptr /*value_found*/, nullptr /*callback*/,
is_blob_index);
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = cfh();
get_impl_options.value = &value;
get_impl_options.is_blob_index = is_blob_index;
auto s = dbfull()->GetImpl(read_options, key, get_impl_options);
if (s.IsNotFound()) {
return "NOT_FOUND";
}
Expand Down
112 changes: 81 additions & 31 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1441,27 +1441,30 @@ ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
Status DBImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) {
return GetImpl(read_options, column_family, key, value);
GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.value = value;
return GetImpl(read_options, key, get_impl_options);
}

Status DBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val, bool* value_found,
ReadCallback* callback, bool* is_blob_index) {
assert(pinnable_val != nullptr);
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
GetImplOptions get_impl_options) {
assert(get_impl_options.value != nullptr ||
get_impl_options.merge_operands != nullptr);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
StopWatch sw(env_, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);

auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfh =
reinterpret_cast<ColumnFamilyHandleImpl*>(get_impl_options.column_family);
auto cfd = cfh->cfd();

if (tracer_) {
// TODO: This mutex should be removed later, to improve performance when
// tracing is enabled.
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
tracer_->Get(column_family, key);
tracer_->Get(get_impl_options.column_family, key);
}
}

Expand All @@ -1473,9 +1476,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,

SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
if (callback) {
if (get_impl_options.callback) {
// Already calculated based on read_options.snapshot
snapshot = callback->max_visible_seq();
snapshot = get_impl_options.callback->max_visible_seq();
} else {
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
Expand All @@ -1489,12 +1492,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
if (callback) {
if (get_impl_options.callback) {
// The unprep_seqs are not published for write unprepared, so it could be
// that max_visible_seq is larger. Seek to the std::max of the two.
// However, we still want our callback to contain the actual snapshot so
// that it can do the correct visibility filtering.
callback->Refresh(snapshot);
get_impl_options.callback->Refresh(snapshot);

// Internally, WriteUnpreparedTxnReadCallback::Refresh would set
// max_visible_seq = max(max_visible_seq, snapshot)
Expand All @@ -1505,7 +1508,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
// be needed.
//
// assert(callback->max_visible_seq() >= snapshot);
snapshot = callback->max_visible_seq();
snapshot = get_impl_options.callback->max_visible_seq();
}
}
TEST_SYNC_POINT("DBImpl::GetImpl:3");
Expand All @@ -1526,19 +1529,39 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
if (!skip_memtable) {
if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
&max_covering_tombstone_seq, read_options, callback,
is_blob_index)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
&max_covering_tombstone_seq, read_options, callback,
is_blob_index)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
// Get value associated with key
if (get_impl_options.get_value) {
if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s,
&merge_context, &max_covering_tombstone_seq,
read_options, get_impl_options.callback,
get_impl_options.is_blob_index)) {
done = true;
get_impl_options.value->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s,
&merge_context, &max_covering_tombstone_seq,
read_options, get_impl_options.callback,
get_impl_options.is_blob_index)) {
done = true;
get_impl_options.value->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
}
} else {
// Get Merge Operands associated with key, Merge Operands should not be
// merged and raw values should be returned to the user.
if (sv->mem->Get(lkey, nullptr, &s, &merge_context,
&max_covering_tombstone_seq, read_options, nullptr,
nullptr, false)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->GetMergeOperands(lkey, &s, &merge_context,
&max_covering_tombstone_seq,
read_options)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
}
}
if (!done && !s.ok() && !s.IsMergeInProgress()) {
ReturnAndCleanupSuperVersion(cfd, sv);
Expand All @@ -1547,9 +1570,14 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
}
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
&max_covering_tombstone_seq, value_found, nullptr, nullptr,
callback, is_blob_index);
sv->current->Get(
read_options, lkey, get_impl_options.value, &s, &merge_context,
&max_covering_tombstone_seq,
get_impl_options.get_value ? get_impl_options.value_found : nullptr,
nullptr, nullptr,
get_impl_options.get_value ? get_impl_options.callback : nullptr,
get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
get_impl_options.get_value);
RecordTick(stats_, MEMTABLE_MISS);
}

Expand All @@ -1561,7 +1589,25 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
RecordTick(stats_, NUMBER_KEYS_READ);
size_t size = 0;
if (s.ok()) {
size = pinnable_val->size();
if (get_impl_options.get_value) {
size = get_impl_options.value->size();
} else {
// Return all merge operands for get_impl_options.key
*get_impl_options.number_of_operands =
static_cast<int>(merge_context.GetNumOperands());
if (*get_impl_options.number_of_operands >
get_impl_options.get_merge_operands_options
->expected_max_number_of_operands) {
s = Status::Incomplete(
Status::SubCode::KMergeOperandsInsufficientCapacity);
} else {
for (const Slice& sl : merge_context.GetOperands()) {
size += sl.size();
get_impl_options.merge_operands->PinSelf(sl);
get_impl_options.merge_operands++;
}
}
}
RecordTick(stats_, BYTES_READ, size);
PERF_COUNTER_ADD(get_read_bytes, size);
}
Expand Down Expand Up @@ -2222,7 +2268,11 @@ bool DBImpl::KeyMayExist(const ReadOptions& read_options,
ReadOptions roptions = read_options;
roptions.read_tier = kBlockCacheTier; // read from block cache only
PinnableSlice pinnable_val;
auto s = GetImpl(roptions, column_family, key, &pinnable_val, value_found);
GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.value = &pinnable_val;
get_impl_options.value_found = value_found;
auto s = GetImpl(roptions, key, get_impl_options);
value->assign(pinnable_val.data(), pinnable_val.size());

// If block_cache is enabled and the index block of the table didn't
Expand Down
43 changes: 39 additions & 4 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,21 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;

using DB::GetMergeOperands;
Status GetMergeOperands(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* merge_operands,
GetMergeOperandsOptions* get_merge_operands_options,
int* number_of_operands) override {
GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.merge_operands = merge_operands;
get_impl_options.get_merge_operands_options = get_merge_operands_options;
get_impl_options.number_of_operands = number_of_operands;
get_impl_options.get_value = false;
return GetImpl(options, key, get_impl_options);
}

using DB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
Expand Down Expand Up @@ -395,12 +410,32 @@ class DBImpl : public DB {

// ---- End of implementations of the DB interface ----

struct GetImplOptions {
ColumnFamilyHandle* column_family = nullptr;
PinnableSlice* value = nullptr;
bool* value_found = nullptr;
ReadCallback* callback = nullptr;
bool* is_blob_index = nullptr;
// If true return value associated with key via value pointer else return
// all merge operands for key via merge_operands pointer
bool get_value = true;
// Pointer to an array of size
// get_merge_operands_options.expected_max_number_of_operands allocated by
// user
PinnableSlice* merge_operands = nullptr;
GetMergeOperandsOptions* get_merge_operands_options = nullptr;
int* number_of_operands = nullptr;
};

// Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
bool* value_found = nullptr, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
// This function is also called by GetMergeOperands
// If get_impl_options.get_value = true get value associated with
// get_impl_options.key via get_impl_options.value
// If get_impl_options.get_value = false get merge operands associated with
// get_impl_options.key via get_impl_options.merge_operands
Status GetImpl(const ReadOptions& options, const Slice& key,
GetImplOptions get_impl_options);

ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd,
Expand Down
3 changes: 1 addition & 2 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
// We may ignore the dbname when generating the file names.
for (auto& file : state.sst_delete_files) {
candidate_files.emplace_back(
MakeTableFileName(file.metadata->fd.GetNumber()),
file.path);
MakeTableFileName(file.metadata->fd.GetNumber()), file.path);
if (file.metadata->table_reader_handle) {
table_cache_->Release(file.metadata->table_reader_handle);
}
Expand Down
Loading