Skip to content

Commit

Permalink
SPDB-270: db_bench: add SeekRandomWriteRandom test
Browse files Browse the repository at this point in the history
combination of seek random and overwrite random.
all threads are doing seek and write interchangeably.
  • Loading branch information
Yuval-Ariel committed Jul 12, 2022
1 parent a4e27ce commit 303221a
Showing 1 changed file with 159 additions and 0 deletions.
159 changes: 159 additions & 0 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ DEFINE_string(
"newiterator,"
"newiteratorwhilewriting,"
"seekrandom,"
"seekrandomwriterandom,"
"seekrandomwhilewriting,"
"seekrandomwhilemerging,"
"readseq,"
Expand Down Expand Up @@ -207,6 +208,8 @@ IF_ROCKSDB_LITE("",
"\tnewiterator -- repeated iterator creation\n"
"\tseekrandom -- N random seeks, call Next seek_nexts times "
"per seek\n"
"\tseekrandomwriterandom -- N threads doing random overwrite and "
"random seek\n"
"\tseekrandomwhilewriting -- seekrandom and 1 thread doing "
"overwrite\n"
"\tseekrandomwhilemerging -- seekrandom and 1 thread doing "
Expand Down Expand Up @@ -3291,6 +3294,8 @@ class Benchmark {
method = &Benchmark::IteratorCreationWhileWriting;
} else if (name == "seekrandom") {
method = &Benchmark::SeekRandom;
} else if (name == "seekrandomwriterandom") {
method = &Benchmark::SeekRandomWriteRandom;
} else if (name == "seekrandomwhilewriting") {
num_threads++; // Add extra thread for writing
method = &Benchmark::SeekRandomWhileWriting;
Expand Down Expand Up @@ -6958,6 +6963,160 @@ class Benchmark {
thread->stats.AddMessage(msg);
}

// Each thread does #iterations of either seek or write
// use readwritepercent to set ratio of seek/write
// number of iterations = duration ? duration : readwrites_
// readwrites_ = max(reads_, writes) or num if zero.
// can pass: seek_nexts, reverse_iterator, max_scan_distance and
// use_tailing_iterator. seek was taken from SeekRandom and write from
// ReadRandomWriteRandom
void SeekRandomWriteRandom(ThreadState* thread) {
// Seek preparation
int64_t seeks = 0;
int64_t found = 0;
int64_t bytes = 0;
ReadOptions options(FLAGS_verify_checksum, true);
options.total_order_seek = FLAGS_total_order_seek;
options.prefix_same_as_start = FLAGS_prefix_same_as_start;
options.tailing = FLAGS_use_tailing_iterator;
options.readahead_size = FLAGS_readahead_size;

std::unique_ptr<Iterator> single_iter;
std::vector<std::unique_ptr<Iterator>> multi_iters;
if (db_.db != nullptr) {
single_iter.reset(db_.db->NewIterator(options));
} else {
for (const auto& db_with_cfh : multi_dbs_) {
multi_iters.emplace_back(db_with_cfh.db->NewIterator(options));
}
}

std::unique_ptr<const char[]> upper_bound_key_guard;
Slice upper_bound = AllocateKey(&upper_bound_key_guard);
std::unique_ptr<const char[]> lower_bound_key_guard;
Slice lower_bound = AllocateKey(&lower_bound_key_guard);

// Write preparation
RandomGenerator gen;
int64_t writes_done = 0;
Duration duration(FLAGS_duration, readwrites_);

std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard);

std::unique_ptr<char[]> ts_guard;
if (user_timestamp_size_ > 0) {
ts_guard.reset(new char[user_timestamp_size_]);
}

// the number of iterations is the larger of read_ or write_
while (!duration.Done(1)) {
int prob_op = thread->rand.Uniform(100);

// Seek
if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readwritepercent)) {
Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->GetTimestampForRead(thread->rand,
ts_guard.get());
options.timestamp = &ts;
}

int64_t seek_pos = thread->rand.Next() % FLAGS_num;
GenerateKeyFromIntForSeek(static_cast<uint64_t>(seek_pos), FLAGS_num,
&key);
if (FLAGS_max_scan_distance != 0) {
if (FLAGS_reverse_iterator) {
GenerateKeyFromInt(static_cast<uint64_t>(std::max(
static_cast<int64_t>(0),
seek_pos - FLAGS_max_scan_distance)),
FLAGS_num, &lower_bound);
options.iterate_lower_bound = &lower_bound;
} else {
auto min_num =
std::min(FLAGS_num, seek_pos + FLAGS_max_scan_distance);
GenerateKeyFromInt(static_cast<uint64_t>(min_num), FLAGS_num,
&upper_bound);
options.iterate_upper_bound = &upper_bound;
}
}

if (!FLAGS_use_tailing_iterator) {
if (db_.db != nullptr) {
single_iter.reset(db_.db->NewIterator(options));
} else {
multi_iters.clear();
for (const auto& db_with_cfh : multi_dbs_) {
multi_iters.emplace_back(db_with_cfh.db->NewIterator(options));
}
}
}

// Pick an Iterator to use
Iterator* iter_to_use = single_iter.get();
if (iter_to_use == nullptr) {
iter_to_use =
multi_iters[thread->rand.Next() % multi_iters.size()].get();
}

iter_to_use->Seek(key);
seeks++;
if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
found++;
}

for (int j = 0; j < FLAGS_seek_nexts && iter_to_use->Valid(); ++j) {
// Copy out iterator's value to make sure we read them.
bytes += iter_to_use->key().size() + iter_to_use->value().size();

if (!FLAGS_reverse_iterator) {
iter_to_use->Next();
} else {
iter_to_use->Prev();
}
assert(iter_to_use->status().ok());
}

if (thread->shared->read_rate_limiter.get() != nullptr &&
seeks % 256 == 255) {
thread->shared->read_rate_limiter->Request(
256, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kRead);
}

thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
// Write
} else {
DB* db = SelectDB(thread);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);

Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
write_options_.timestamp = &ts;
}
Status s = db->Put(write_options_, key, gen.Generate());
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
ErrorExit();
}
writes_done++;
thread->stats.FinishedOps(nullptr, db, 1, kWrite);
}
}

char msg[100];
snprintf(msg, sizeof(msg),
"( seeks:%" PRIu64 " writes:%" PRIu64 " found:%" PRIu64 ")", seeks,
writes_done, found);
thread->stats.AddBytes(bytes);
thread->stats.AddMessage(msg);
if (FLAGS_perf_level > ROCKSDB_NAMESPACE::PerfLevel::kDisable) {
thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
get_perf_context()->ToString());
}
}

//
// Read-modify-write for random keys
void UpdateRandom(ThreadState* thread) {
Expand Down

0 comments on commit 303221a

Please sign in to comment.