Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add tracing and analyzing for iterator next/prev count #4561

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
32 changes: 29 additions & 3 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,17 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();

if(tracer_.get() == nullptr) {
InstrumentedMutexLock lock(&trace_mutex_);
if(tracer_.get() == nullptr) {
std::string trace_filename = "/home/zhichao/trace/trace." + std::to_string(env_->NowMicros());
EnvOptions env_opts;
std::unique_ptr<TraceWriter> trace_writer;
NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer);
tracer_.reset(new Tracer(env_, std::move(trace_writer)));
}
}

if (tracer_) {
// TODO: This mutex should be removed later, to improve performance when
// tracing is enabled.
Expand Down Expand Up @@ -3250,29 +3261,44 @@ Status DBImpl::EndTrace() {
return s;
}

Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) {
Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id,
const uint64_t& trace_iter_uid,
const Slice& key) {
Status s;
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
s = tracer_->IteratorSeek(cf_id, key);
s = tracer_->IteratorSeek(cf_id, trace_iter_uid, key);
}
}
return s;
}

Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,
const uint64_t& trace_iter_uid,
const Slice& key) {
Status s;
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
s = tracer_->IteratorSeekForPrev(cf_id, key);
s = tracer_->IteratorSeekForPrev(cf_id, trace_iter_uid, key);
}
}
return s;
}

Status DBImpl::TraceIteratorIterCount(const uint32_t& cf_id,
const uint64_t& trace_iter_uid,
const uint64_t& count) {
Status s;
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
s = tracer_->IteratorIterCount(cf_id, trace_iter_uid, count);
}
}
return s;
}
#endif // ROCKSDB_LITE

} // namespace rocksdb
10 changes: 8 additions & 2 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,14 @@ class DBImpl : public DB {

using DB::EndTrace;
virtual Status EndTrace() override;
Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key);
Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key);
Status TraceIteratorSeek(const uint32_t& cf_id,
const uint64_t& trace_iter_uid, const Slice& key);
Status TraceIteratorSeekForPrev(const uint32_t& cf_id,
const uint64_t& trace_iter_uid,
const Slice& key);
Status TraceIteratorIterCount(const uint32_t& cf_id,
const uint64_t& trace_iter_uid,
const uint64_t& count);
#endif // ROCKSDB_LITE

// Similar to GetSnapshot(), but also lets the db know that this snapshot
Expand Down
12 changes: 12 additions & 0 deletions db/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
SequenceNumber snapshot = versions_->LastSequence();
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();

if(tracer_.get() == nullptr) {
InstrumentedMutexLock lock(&trace_mutex_);
if(tracer_.get() == nullptr) {
std::string trace_filename = "/home/zhichao/trace/trace." + std::to_string(env_->NowMicros());
EnvOptions env_opts;
std::unique_ptr<TraceWriter> trace_writer;
NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer);
tracer_.reset(new Tracer(env_, std::move(trace_writer)));
}
}

if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
Expand Down
12 changes: 12 additions & 0 deletions db/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!");
}

if(tracer_.get() == nullptr) {
InstrumentedMutexLock lock(&trace_mutex_);
if(tracer_.get() == nullptr) {
std::string trace_filename = "/home/zhichao/trace/trace." + std::to_string(env_->NowMicros());
EnvOptions env_opts;
std::unique_ptr<TraceWriter> trace_writer;
NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer);
tracer_.reset(new Tracer(env_, std::move(trace_writer)));
}
}

if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
Expand Down
88 changes: 86 additions & 2 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ class DBIter final: public Iterator {
read_callback_(read_callback),
db_impl_(db_impl),
cfd_(cfd),
trace_iter_uid_(0),
last_next_count_(0),
last_prev_count_(0),
allow_blob_(allow_blob),
is_blob_(false),
start_seqnum_(read_options.iter_start_seqnum) {
Expand All @@ -158,6 +161,15 @@ class DBIter final: public Iterator {
if (pinned_iters_mgr_.PinningEnabled()) {
pinned_iters_mgr_.ReleasePinnedData();
}

#ifndef ROCKSDB_LITE
if (db_impl_ != nullptr && cfd_ != nullptr && trace_iter_uid_ != 0) {
uint64_t cur_iter_count = GetIterCount();
db_impl_->TraceIteratorIterCount(cfd_->GetID(), trace_iter_uid_,
cur_iter_count);
}
#endif // ROCKSDB_LITE

// Compiler warning issue filed:
// https://github.com/facebook/rocksdb/issues/3013
RecordTick(statistics_, NO_ITERATORS, uint64_t(-1));
Expand Down Expand Up @@ -276,6 +288,12 @@ class DBIter final: public Iterator {
// have a higher sequence number.
inline SequenceNumber MaxVisibleSequenceNumber();

// Count the number of Next and Prev that are called in previous
// Seek or SeekForPrev
uint64_t GetIterCount();
// Rest the trace_iter_uid_ for a new one
void ResetIterUid();

// Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
// is called
void TempPinData() {
Expand Down Expand Up @@ -350,6 +368,11 @@ class DBIter final: public Iterator {
ReadCallback* read_callback_;
DBImpl* db_impl_;
ColumnFamilyData* cfd_;
// to identify the Seek or SeekForProv() in an iterator.
// timestamp+iter_ pointer as the value
uint64_t trace_iter_uid_;
uint64_t last_next_count_;
uint64_t last_prev_count_;
bool allow_blob_;
bool is_blob_;
// for diff snapshots we want the lower bound on the seqnum;
Expand Down Expand Up @@ -395,6 +418,12 @@ void DBIter::Next() {
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
}

#ifndef ROCKSDB_LITE
if (db_impl_ != nullptr && cfd_ != nullptr) {
last_next_count_++;
}
#endif //ROCSDBlite

if (statistics_ != nullptr) {
local_stats_.next_count_++;
}
Expand Down Expand Up @@ -745,6 +774,13 @@ void DBIter::Prev() {
if (ok) {
PrevInternal();
}

#ifndef ROCKSDB_LITE
if (db_impl_ != nullptr && cfd_ != nullptr) {
last_prev_count_++;
}
#endif //ROCSDBlite

if (statistics_ != nullptr) {
local_stats_.prev_count_++;
if (valid_) {
Expand Down Expand Up @@ -1263,6 +1299,19 @@ SequenceNumber DBIter::MaxVisibleSequenceNumber() {
return std::max(sequence_, read_callback_->MaxUnpreparedSequenceNumber());
}

uint64_t DBIter::GetIterCount() {
uint64_t ret;
ret = last_next_count_ + last_prev_count_;
last_next_count_ = 0;
last_prev_count_ = 0;
return ret;
}

void DBIter::ResetIterUid() {
trace_iter_uid_ = env_->NowMicros();
trace_iter_uid_ += reinterpret_cast<uintptr_t>(iter_);
}

void DBIter::Seek(const Slice& target) {
StopWatch sw(env_, statistics_, DB_SEEK);
status_ = Status::OK();
Expand All @@ -1275,7 +1324,13 @@ void DBIter::Seek(const Slice& target) {

#ifndef ROCKSDB_LITE
if (db_impl_ != nullptr && cfd_ != nullptr) {
db_impl_->TraceIteratorSeek(cfd_->GetID(), target);
if (trace_iter_uid_ != 0) {
uint64_t cur_iter_count = GetIterCount();
db_impl_->TraceIteratorIterCount(cfd_->GetID(), trace_iter_uid_,
cur_iter_count);
}
ResetIterUid();
db_impl_->TraceIteratorSeek(cfd_->GetID(), trace_iter_uid_, target);
}
#endif // ROCKSDB_LITE

Expand Down Expand Up @@ -1345,7 +1400,13 @@ void DBIter::SeekForPrev(const Slice& target) {

#ifndef ROCKSDB_LITE
if (db_impl_ != nullptr && cfd_ != nullptr) {
db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target);
if (trace_iter_uid_ != 0) {
uint64_t cur_iter_count = GetIterCount();
db_impl_->TraceIteratorIterCount(cfd_->GetID(), trace_iter_uid_,
cur_iter_count);
}
ResetIterUid();
db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), trace_iter_uid_, target);
}
#endif // ROCKSDB_LITE

Expand Down Expand Up @@ -1398,6 +1459,17 @@ void DBIter::SeekToFirst() {
range_del_agg_.InvalidateRangeDelMapPositions();
}

#ifndef ROCKSDB_LITE
if (db_impl_ != nullptr && cfd_ != nullptr) {
if (trace_iter_uid_ != 0) {
uint64_t cur_iter_count = GetIterCount();
db_impl_->TraceIteratorIterCount(cfd_->GetID(), trace_iter_uid_,
cur_iter_count);
}
trace_iter_uid_ = 0;
}
#endif // ROCKSDB_LITE

RecordTick(statistics_, NUMBER_DB_SEEK);
if (iter_->Valid()) {
saved_key_.SetUserKey(
Expand Down Expand Up @@ -1448,6 +1520,18 @@ void DBIter::SeekToLast() {
iter_->SeekToLast();
range_del_agg_.InvalidateRangeDelMapPositions();
}

#ifndef ROCKSDB_LITE
if (db_impl_ != nullptr && cfd_ != nullptr) {
if (trace_iter_uid_ != 0) {
uint64_t cur_iter_count = GetIterCount();
db_impl_->TraceIteratorIterCount(cfd_->GetID(), trace_iter_uid_,
cur_iter_count);
}
trace_iter_uid_ = 0;
}
#endif // ROCKSDB_LITE

PrevInternal();
if (statistics_ != nullptr) {
RecordTick(statistics_, NUMBER_DB_SEEK);
Expand Down
19 changes: 5 additions & 14 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -640,11 +640,9 @@ DEFINE_bool(optimize_filters_for_hits, false,
DEFINE_uint64(delete_obsolete_files_period_micros, 0,
"Ignored. Left here for backward compatibility");

DEFINE_int64(writes_before_delete_range, 0,
"Number of writes before DeleteRange is called regularly.");

DEFINE_int64(writes_per_range_tombstone, 0,
"Number of writes between range tombstones");
"Number of writes between range "
"tombstones");

DEFINE_int64(range_tombstone_width, 100, "Number of keys in tombstone's range");

Expand Down Expand Up @@ -1970,7 +1968,6 @@ class Benchmark {
int prefix_size_;
int64_t keys_per_prefix_;
int64_t entries_per_batch_;
int64_t writes_before_delete_range_;
int64_t writes_per_range_tombstone_;
int64_t range_tombstone_width_;
int64_t max_num_range_tombstones_;
Expand Down Expand Up @@ -2498,7 +2495,6 @@ void VerifyDBFromDB(std::string& truth_db_name) {
value_size_ = FLAGS_value_size;
key_size_ = FLAGS_key_size;
entries_per_batch_ = FLAGS_batch_size;
writes_before_delete_range_ = FLAGS_writes_before_delete_range;
writes_per_range_tombstone_ = FLAGS_writes_per_range_tombstone;
range_tombstone_width_ = FLAGS_range_tombstone_width;
max_num_range_tombstones_ = FLAGS_max_num_range_tombstones;
Expand Down Expand Up @@ -2853,7 +2849,6 @@ void VerifyDBFromDB(std::string& truth_db_name) {
}

SetPerfLevel(static_cast<PerfLevel> (shared->perf_level));
perf_context.EnablePerLevelPerfContext();
thread->stats.Start(thread->tid);
(arg->bm->*(arg->method))(thread);
thread->stats.Stop();
Expand Down Expand Up @@ -3880,13 +3875,9 @@ void VerifyDBFromDB(std::string& truth_db_name) {
bytes += value_size_ + key_size_;
++num_written;
if (writes_per_range_tombstone_ > 0 &&
num_written > writes_before_delete_range_ &&
(num_written - writes_before_delete_range_) /
writes_per_range_tombstone_ <=
num_written / writes_per_range_tombstone_ <=
max_num_range_tombstones_ &&
(num_written - writes_before_delete_range_) %
writes_per_range_tombstone_ ==
0) {
num_written % writes_per_range_tombstone_ == 0) {
int64_t begin_num = key_gens[id]->Next();
if (FLAGS_expand_range_tombstones) {
for (int64_t offset = 0; offset < range_tombstone_width_;
Expand Down Expand Up @@ -4237,7 +4228,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
}
if (levelMeta.level == 0) {
for (auto& fileMeta : levelMeta.files) {
fprintf(stdout, "Level[%d]: %s(size: %" ROCKSDB_PRIszt " bytes)\n",
fprintf(stdout, "Level[%d]: %s(size: %" PRIu64 " bytes)\n",
levelMeta.level, fileMeta.name.c_str(), fileMeta.size);
}
} else {
Expand Down
7 changes: 7 additions & 0 deletions tools/trace_analyzer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,14 @@ class TraceAnalyzerTest : public testing::Test {
ASSERT_OK(db_->Get(ro, "a", &value));
single_iter = db_->NewIterator(ro);
single_iter->Seek("a");
single_iter->Next();
single_iter->Prev();
single_iter->Next();
single_iter->Prev();
single_iter->Next();
single_iter->Prev();
single_iter->SeekForPrev("b");
single_iter->Prev();
delete single_iter;
std::this_thread::sleep_for (std::chrono::seconds(1));

Expand Down
Loading