Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db bench enhancements (SPDB-270) #6

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ DEFINE_string(
"newiterator,"
"newiteratorwhilewriting,"
"seekrandom,"
"seekrandomwriterandom,"
"seekrandomwhilewriting,"
"seekrandomwhilemerging,"
"readseq,"
Expand Down Expand Up @@ -207,6 +208,8 @@ IF_ROCKSDB_LITE("",
"\tnewiterator -- repeated iterator creation\n"
"\tseekrandom -- N random seeks, call Next seek_nexts times "
"per seek\n"
"\tseekrandomwriterandom -- N threads doing random overwrite and "
"random seek\n"
"\tseekrandomwhilewriting -- seekrandom and 1 thread doing "
"overwrite\n"
"\tseekrandomwhilemerging -- seekrandom and 1 thread doing "
Expand Down Expand Up @@ -3276,6 +3279,8 @@ class Benchmark {
method = &Benchmark::IteratorCreationWhileWriting;
} else if (name == "seekrandom") {
method = &Benchmark::SeekRandom;
} else if (name == "seekrandomwriterandom") {
method = &Benchmark::SeekRandomWriteRandom;
} else if (name == "seekrandomwhilewriting") {
num_threads++; // Add extra thread for writing
method = &Benchmark::SeekRandomWhileWriting;
Expand Down Expand Up @@ -6932,6 +6937,160 @@ class Benchmark {
thread->stats.AddMessage(msg);
}

// Each thread does #iterations of either seek or write
// use readwritepercent to set ratio of seek/write
// number of iterations = duration ? duration : readwrites_
// readwrites_ = max(reads_, writes) or num if zero.
// can pass: seek_nexts, reverse_iterator, max_scan_distance and
// use_tailing_iterator. seek was taken from SeekRandom and write from
// ReadRandomWriteRandom
void SeekRandomWriteRandom(ThreadState* thread) {
// Seek preparation
int64_t seeks = 0;
int64_t found = 0;
int64_t bytes = 0;
ReadOptions options(FLAGS_verify_checksum, true);
options.total_order_seek = FLAGS_total_order_seek;
options.prefix_same_as_start = FLAGS_prefix_same_as_start;
options.tailing = FLAGS_use_tailing_iterator;
options.readahead_size = FLAGS_readahead_size;

std::unique_ptr<Iterator> single_iter;
std::vector<std::unique_ptr<Iterator>> multi_iters;
if (db_.db != nullptr) {
single_iter.reset(db_.db->NewIterator(options));
} else {
for (const auto& db_with_cfh : multi_dbs_) {
multi_iters.emplace_back(db_with_cfh.db->NewIterator(options));
}
}

std::unique_ptr<const char[]> upper_bound_key_guard;
Slice upper_bound = AllocateKey(&upper_bound_key_guard);
std::unique_ptr<const char[]> lower_bound_key_guard;
Slice lower_bound = AllocateKey(&lower_bound_key_guard);

// Write preparation
RandomGenerator gen;
int64_t writes_done = 0;
Duration duration(FLAGS_duration, readwrites_);

std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard);

std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}

// the number of iterations is the larger of read_ or write_
while (!duration.Done(1)) {
int prob_op = thread->rand.Uniform(100);

// Seek
if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readwritepercent)) {
Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->GetTimestampForRead(thread->rand,
ts_guard.get());
options.timestamp = &ts;
}

int64_t seek_pos = thread->rand.Next() % FLAGS_num;
GenerateKeyFromIntForSeek(static_cast<uint64_t>(seek_pos), FLAGS_num,
&key);
if (FLAGS_max_scan_distance != 0) {
if (FLAGS_reverse_iterator) {
GenerateKeyFromInt(static_cast<uint64_t>(std::max(
static_cast<int64_t>(0),
seek_pos - FLAGS_max_scan_distance)),
FLAGS_num, &lower_bound);
options.iterate_lower_bound = &lower_bound;
} else {
auto min_num =
std::min(FLAGS_num, seek_pos + FLAGS_max_scan_distance);
GenerateKeyFromInt(static_cast<uint64_t>(min_num), FLAGS_num,
&upper_bound);
options.iterate_upper_bound = &upper_bound;
}
}

if (!FLAGS_use_tailing_iterator) {
if (db_.db != nullptr) {
single_iter.reset(db_.db->NewIterator(options));
} else {
multi_iters.clear();
for (const auto& db_with_cfh : multi_dbs_) {
multi_iters.emplace_back(db_with_cfh.db->NewIterator(options));
}
}
}

// Pick an Iterator to use
Iterator* iter_to_use = single_iter.get();
if (iter_to_use == nullptr) {
iter_to_use =
multi_iters[thread->rand.Next() % multi_iters.size()].get();
}

iter_to_use->Seek(key);
seeks++;
if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
found++;
}

for (int j = 0; j < FLAGS_seek_nexts && iter_to_use->Valid(); ++j) {
// Copy out iterator's value to make sure we read them.
bytes += iter_to_use->key().size() + iter_to_use->value().size();

if (!FLAGS_reverse_iterator) {
iter_to_use->Next();
} else {
iter_to_use->Prev();
}
assert(iter_to_use->status().ok());
}

if (thread->shared->read_rate_limiter.get() != nullptr &&
seeks % 256 == 255) {
thread->shared->read_rate_limiter->Request(
256, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kRead);
}

thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
// Write
} else {
DB* db = SelectDB(thread);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);

Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
write_options_.timestamp = &ts;
}
Status s = db->Put(write_options_, key, gen.Generate());
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
ErrorExit();
}
writes_done++;
thread->stats.FinishedOps(nullptr, db, 1, kWrite);
}
}

char msg[100];
snprintf(msg, sizeof(msg),
"( seeks:%" PRIu64 " writes:%" PRIu64 " found:%" PRIu64 ")", seeks,
writes_done, found);
thread->stats.AddBytes(bytes);
thread->stats.AddMessage(msg);
if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
get_perf_context()->ToString());
}
}

//
// Read-modify-write for random keys
void UpdateRandom(ThreadState* thread) {
Expand Down