Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace MultiGet Keys and CF_IDs to the trace file #8421

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1880,6 +1880,16 @@ std::vector<Status> DBImpl::MultiGet(
}
#endif // NDEBUG

if (tracer_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few different MultiGet overloaded APIs. We should trace all of them.

// TODO: This mutex should be removed later, to improve performance when
// tracing is enabled.
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
// TODO: maybe handle the tracing status?
tracer_->MultiGet(column_family, keys).PermitUncheckedError();
}
}

SequenceNumber consistent_seqnum;

std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
Expand Down Expand Up @@ -2191,6 +2201,16 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
}
#endif // NDEBUG

if (tracer_) {
// TODO: This mutex should be removed later, to improve performance when
// tracing is enabled.
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
// TODO: maybe handle the tracing status?
tracer_->MultiGet(num_keys, column_families, keys).PermitUncheckedError();
}
}

autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(num_keys);
Expand Down Expand Up @@ -2353,6 +2373,15 @@ void DBImpl::MultiGet(const ReadOptions& read_options,
const Slice* keys, PinnableSlice* values,
std::string* timestamps, Status* statuses,
const bool sorted_input) {
if (tracer_) {
// TODO: This mutex should be removed later, to improve performance when
// tracing is enabled.
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
// TODO: maybe handle the tracing status?
tracer_->MultiGet(num_keys, column_family, keys).PermitUncheckedError();
}
}
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(num_keys);
Expand Down
20 changes: 19 additions & 1 deletion tools/trace_analyzer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,26 @@ class TraceAnalyzerTest : public testing::Test {
ASSERT_OK(batch.SingleDelete("d"));
ASSERT_OK(batch.DeleteRange("e", "f"));
ASSERT_OK(db_->Write(wo, &batch));

std::vector<Slice> keys;
keys.push_back("a");
keys.push_back("b");
keys.push_back("df");
keys.push_back("gege");
keys.push_back("hjhjhj");
std::vector<std::string> values;
std::vector<Status> ss = db_->MultiGet(ro, keys, &values);
ASSERT_GE(ss.size(), 0);
ASSERT_OK(ss[0]);
ASSERT_NOK(ss[2]);
std::vector<ColumnFamilyHandle*> cfs(2, db_->DefaultColumnFamily());
std::vector<PinnableSlice> values2(keys.size());
db_->MultiGet(ro, 2, cfs.data(), keys.data(), values2.data(), ss.data(),
false);
ASSERT_OK(ss[0]);
db_->MultiGet(ro, db_->DefaultColumnFamily(), 2, keys.data() + 3,
values2.data(), ss.data(), false);
ASSERT_OK(db_->Get(ro, "a", &value));

single_iter = db_->NewIterator(ro);
single_iter->Seek("a");
ASSERT_OK(single_iter->status());
Expand Down
4 changes: 4 additions & 0 deletions tools/trace_analyzer_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,10 @@ Status TraceAnalyzer::StartProcessing() {
fprintf(stderr, "Cannot process the iterator in the trace\n");
return s;
}
} else if (trace.type == kTraceMultiGet) {
MultiGetPayload multiget_payload;
assert(trace_file_version_ >= 2);
TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload);
} else if (trace.type == kTraceEnd) {
break;
}
Expand Down
112 changes: 112 additions & 0 deletions trace_replay/trace_replay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,47 @@ void TracerHelper::DecodeIterPayload(Trace* trace, IterPayload* iter_payload) {
}
}

void TracerHelper::DecodeMultiGetPayload(Trace* trace,
MultiGetPayload* multiget_payload) {
assert(multiget_payload != nullptr);
Slice cfids_payload;
Slice keys_payload;
Slice buf(trace->payload);
GetFixed64(&buf, &trace->payload_map);
int64_t payload_map = static_cast<int64_t>(trace->payload_map);
while (payload_map) {
// Find the rightmost set bit.
uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
switch (set_pos) {
case TracePayloadType::kMultiGetSize:
GetFixed32(&buf, &(multiget_payload->multiget_size));
break;
case TracePayloadType::kMultiGetCFIDs:
GetLengthPrefixedSlice(&buf, &cfids_payload);
break;
case TracePayloadType::kMultiGetKeys:
GetLengthPrefixedSlice(&buf, &keys_payload);
break;
default:
assert(false);
}
// unset the rightmost bit.
payload_map &= (payload_map - 1);
}

// Decode the cfids_payload and keys_payload
multiget_payload->cf_ids.reserve(multiget_payload->multiget_size);
multiget_payload->multiget_keys.reserve(multiget_payload->multiget_size);
for (uint32_t i = 0; i < multiget_payload->multiget_size; i++) {
uint32_t tmp_cfid;
Slice tmp_key;
GetFixed32(&cfids_payload, &tmp_cfid);
GetLengthPrefixedSlice(&keys_payload, &tmp_key);
multiget_payload->cf_ids.push_back(tmp_cfid);
multiget_payload->multiget_keys.push_back(tmp_key.ToString());
}
}

Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer)
: clock_(clock),
Expand Down Expand Up @@ -302,6 +343,77 @@ Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
return WriteTrace(trace);
}

Status Tracer::MultiGet(const size_t num_keys,
ColumnFamilyHandle** column_families,
const Slice* keys) {
if (num_keys == 0) {
return Status::OK();
}
std::vector<ColumnFamilyHandle*> v_column_families;
std::vector<Slice> v_keys;
v_column_families.resize(num_keys);
v_keys.resize(num_keys);
for (size_t i = 0; i < num_keys; i++) {
v_column_families[i] = column_families[i];
v_keys[i] = keys[i];
}
return MultiGet(v_column_families, v_keys);
}

Status Tracer::MultiGet(const size_t num_keys,
ColumnFamilyHandle* column_family, const Slice* keys) {
if (num_keys == 0) {
return Status::OK();
}
std::vector<ColumnFamilyHandle*> column_families;
std::vector<Slice> v_keys;
column_families.resize(num_keys);
v_keys.resize(num_keys);
for (size_t i = 0; i < num_keys; i++) {
column_families[i] = column_family;
v_keys[i] = keys[i];
}
return MultiGet(column_families, v_keys);
}

Status Tracer::MultiGet(const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Slice>& keys) {
if (column_families.size() != keys.size()) {
return Status::Corruption("the CFs size and keys size does not match!");
}
TraceType trace_type = kTraceMultiGet;
if (ShouldSkipTrace(trace_type)) {
return Status::OK();
}
uint32_t multiget_size = static_cast<uint32_t>(keys.size());
Trace trace;
trace.ts = clock_->NowMicros();
trace.type = trace_type;
// Set the payloadmap of the struct member that will be encoded in the
// payload.
TracerHelper::SetPayloadMap(trace.payload_map,
TracePayloadType::kMultiGetSize);
TracerHelper::SetPayloadMap(trace.payload_map,
TracePayloadType::kMultiGetCFIDs);
TracerHelper::SetPayloadMap(trace.payload_map,
TracePayloadType::kMultiGetKeys);
// Encode the CFIDs inorder
std::string cfids_payload;
std::string keys_payload;
for (uint32_t i = 0; i < multiget_size; i++) {
assert(i < column_families.size());
assert(i < keys.size());
PutFixed32(&cfids_payload, column_families[i]->GetID());
PutLengthPrefixedSlice(&keys_payload, keys[i]);
}
// Encode the Get struct members into payload. Make sure add them in order.
PutFixed64(&trace.payload, trace.payload_map);
PutFixed32(&trace.payload, multiget_size);
PutLengthPrefixedSlice(&trace.payload, cfids_payload);
PutLengthPrefixedSlice(&trace.payload, keys_payload);
return WriteTrace(trace);
}

bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
if (IsTraceFileOverMax()) {
return true;
Expand Down
26 changes: 26 additions & 0 deletions trace_replay/trace_replay.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ enum TraceType : char {
kBlockTraceRangeDeletionBlock = 11,
// For IOTracing.
kIOTracer = 12,
// For query tracing
kTraceMultiGet = 13,
// All trace types should be added before kTraceMax
kTraceMax,
};
Expand Down Expand Up @@ -98,6 +100,9 @@ enum TracePayloadType : char {
kIterKey = 5,
kIterLowerBound = 6,
kIterUpperBound = 7,
kMultiGetSize = 8,
kMultiGetCFIDs = 9,
kMultiGetKeys = 10,
};

struct WritePayload {
Expand All @@ -116,6 +121,12 @@ struct IterPayload {
Slice upper_bound;
};

struct MultiGetPayload {
uint32_t multiget_size;
std::vector<uint32_t> cf_ids;
std::vector<std::string> multiget_keys;
};

class TracerHelper {
public:
// Parse the string with major and minor version only
Expand Down Expand Up @@ -143,6 +154,10 @@ class TracerHelper {

// Decode the iter payload and store in WrteiPayload
static void DecodeIterPayload(Trace* trace, IterPayload* iter_payload);

// Decode the multiget payload and store in MultiGetPayload
static void DecodeMultiGetPayload(Trace* trace,
MultiGetPayload* multiget_payload);
};

// Tracer captures all RocksDB operations using a user-provided TraceWriter.
Expand All @@ -166,6 +181,17 @@ class Tracer {
Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
const Slice& lower_bound, const Slice upper_bound);

// Trace MultiGet

Status MultiGet(const size_t num_keys, ColumnFamilyHandle** column_families,
const Slice* keys);

Status MultiGet(const size_t num_keys, ColumnFamilyHandle* column_family,
const Slice* keys);

Status MultiGet(const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys);

// Returns true if the trace is over the configured max trace file limit.
// False otherwise.
bool IsTraceFileOverMax();
Expand Down