Skip to content

Commit

Permalink
db_bench: align used tests (#325)
Browse files Browse the repository at this point in the history
Make sure all the tests used:
1. work on all cfs if there's more than one.
2. use a single random number to pick the db, the cf and the key, which makes it possible to find a key in a cf later on.
3. can apply the read and write rate limiters wherever relevant.

tests:
seekrandomwriterandom, BGWriter, readrandom, seekrandom and readrandomwriterandom
  • Loading branch information
Yuval-Ariel authored and ayulas committed Feb 26, 2023
1 parent d90abec commit 5cf8eec
Showing 1 changed file with 102 additions and 66 deletions.
168 changes: 102 additions & 66 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2923,6 +2923,32 @@ class Benchmark {
std::mutex mutex_;
bool seek_started_;

// Throttles the calling benchmark thread against the shared read or write
// rate limiter, when one is configured.
//
// op_type          - selects the limiter: kRead uses
//                    thread->shared->read_rate_limiter, kWrite uses
//                    thread->shared->write_rate_limiter.
// thread           - benchmark thread whose shared state holds the limiters
//                    and whose stats get their last-op time reset.
// bytes_to_request - amount handed to RateLimiter::Request().
//
// Does nothing when the selected limiter is null, or (in builds where
// assert() is compiled out) when op_type is neither kRead nor kWrite.
inline void LimitReadOrWriteRate(RateLimiter::OpType op_type,
                                 ThreadState* thread,
                                 int64_t bytes_to_request) {
  // Start from nullptr so the default branch below cannot leave the pointer
  // indeterminate when assert() is a no-op (NDEBUG builds).
  RateLimiter* rate_limiter_to_use = nullptr;
  switch (op_type) {
    case RateLimiter::OpType::kRead: {
      rate_limiter_to_use = thread->shared->read_rate_limiter.get();
      break;
    }
    case RateLimiter::OpType::kWrite: {
      rate_limiter_to_use = thread->shared->write_rate_limiter.get();
      break;
    }
    default:
      // Only read and write requests are expected here.
      assert(false);
      break;
  }
  if (rate_limiter_to_use != nullptr) {
    rate_limiter_to_use->Request(bytes_to_request, Env::IO_HIGH,
                                 nullptr /* stats */, op_type);
    // Set time at which last op finished to Now() to hide latency and
    // sleep from rate limiter. Also, do the check once per batch, not
    // once per write.
    thread->stats.ResetLastOpTime();
  }
}

// Use this to access the DB when context requires a Single-DB mode
DBWithColumnFamilies& SingleDb() {
if (IsSingleDb() == false) {
Expand Down Expand Up @@ -6361,7 +6387,6 @@ class Benchmark {

Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
// We use same key_rand as seed for key and column family so that we can
// deterministically find the cfh corresponding to a particular key, as it
// is done in DoWrite method.
Expand All @@ -6379,6 +6404,7 @@ class Benchmark {
} else {
key_rand = GetRandomKey(&thread->rand);
}
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(key_rand);
GenerateKeyFromInt(key_rand, FLAGS_num, &key);
read++;
std::string ts_ret;
Expand Down Expand Up @@ -7066,10 +7092,10 @@ class Benchmark {
options.timestamp = &ts;
}

std::vector<Iterator*> tailing_iters;
std::vector<std::unique_ptr<Iterator>> tailing_iters;
if (FLAGS_use_tailing_iterator) {
for (const auto& db_with_cfh : dbs_to_use_) {
tailing_iters.push_back(db_with_cfh.db->NewIterator(options));
tailing_iters.emplace_back(db_with_cfh.db->NewIterator(options));
}
}
options.auto_prefix_mode = FLAGS_auto_prefix_mode;
Expand Down Expand Up @@ -7115,12 +7141,12 @@ class Benchmark {
}

// Pick an Iterator to use
uint64_t db_idx_to_use =
static_cast<uint64_t>(key_rand) % dbs_to_use_.size();
std::unique_ptr<Iterator> single_iter;
Iterator* iter_to_use;
if (FLAGS_use_tailing_iterator) {
iter_to_use = tailing_iters[db_idx_to_use];
uint64_t db_idx_to_use =
static_cast<uint64_t>(key_rand) % dbs_to_use_.size();
iter_to_use = tailing_iters[db_idx_to_use].get();
} else {
single_iter.reset(db_with_cfh->db->NewIterator(
options, db_with_cfh->GetCfh(key_rand)));
Expand Down Expand Up @@ -7158,10 +7184,7 @@ class Benchmark {
thread->stats.ResetLastOpTime();
}

thread->stats.FinishedOps(&FirstDb(), FirstDb().db, 1, kSeek);
}
for (auto iter : tailing_iters) {
delete iter;
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kSeek);
}

char msg[100];
Expand Down Expand Up @@ -7255,6 +7278,7 @@ class Benchmark {
// Special thread that keeps writing until other threads are done.
RandomGenerator gen;
int64_t bytes = 0;
int64_t key_rand = 0;

std::unique_ptr<RateLimiter> write_rate_limiter;
if (FLAGS_benchmark_write_rate_limit > 0) {
Expand Down Expand Up @@ -7288,7 +7312,9 @@ class Benchmark {
bool hint_printed = false;

while (true) {
DB* db = SelectDB(thread);
key_rand = GetRandomKey(&thread->rand);
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(key_rand);
DB* db = db_with_cfh->db;
{
MutexLock l(&thread->shared->mu);
if (FLAGS_finish_after_writes && written == writes_) {
Expand All @@ -7311,22 +7337,23 @@ class Benchmark {
}
}

GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
GenerateKeyFromInt(key_rand, FLAGS_num, &key);
Status s;

Slice val = gen.Generate();
Slice ts;
if (user_timestamp_size_ > 0) {
ts = mock_app_clock_->Allocate(ts_guard.get());
}
ColumnFamilyHandle* cfh = db_with_cfh->GetCfh(key_rand);
if (write_merge == kWrite) {
if (user_timestamp_size_ == 0) {
s = db->Put(write_options_, key, val);
s = db->Put(write_options_, cfh, key, val);
} else {
s = db->Put(write_options_, key, ts, val);
s = db->Put(write_options_, cfh, key, ts, val);
}
} else {
s = db->Merge(write_options_, key, val);
s = db->Merge(write_options_, cfh, key, val);
}
// Restore write_options_
written++;
Expand All @@ -7335,7 +7362,7 @@ class Benchmark {
ErrorExit("put or merge error: %s", s.ToString().c_str());
}
bytes += key.size() + val.size() + user_timestamp_size_;
thread->stats.FinishedOps(&FirstDb(), FirstDb().db, 1, kWrite);
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);

if (FLAGS_benchmark_write_rate_limit > 0) {
write_rate_limiter->Request(
Expand All @@ -7356,7 +7383,7 @@ class Benchmark {
writes_per_range_tombstone_ ==
0) {
num_range_deletions++;
int64_t begin_num = thread->rand.Next() % FLAGS_num;
int64_t begin_num = key_rand;
if (FLAGS_expand_range_tombstones) {
for (int64_t offset = 0; offset < range_tombstone_width_; ++offset) {
GenerateKeyFromInt(begin_num + offset, FLAGS_num,
Expand All @@ -7369,13 +7396,11 @@ class Benchmark {
GenerateKeyFromInt(begin_num, FLAGS_num, &begin_key);
GenerateKeyFromInt(begin_num + range_tombstone_width_, FLAGS_num,
&end_key);
if (!db->DeleteRange(write_options_, db->DefaultColumnFamily(),
begin_key, end_key)
.ok()) {
if (!db->DeleteRange(write_options_, cfh, begin_key, end_key).ok()) {
ErrorExit("deleterange error: %s\n", s.ToString().c_str());
}
}
thread->stats.FinishedOps(&FirstDb(), FirstDb().db, 1, kWrite);
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
// TODO: DeleteRange is not included in calculation of bytes/rate
// limiter request
}
Expand Down Expand Up @@ -7882,25 +7907,36 @@ class Benchmark {
}
get_weight--;
reads_done++;
thread->stats.FinishedOps(db_with_cfh, db, 1, kRead);

if (reads_done % 256 == 255) {
LimitReadOrWriteRate(RateLimiter::OpType::kRead, thread, 256);
}

thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
} else if (put_weight > 0) {
// then do all the corresponding number of puts
// for all the gets we have done earlier
Slice put_val = gen.Generate();
size_t size_to_request =
put_val.size() + key.size() + user_timestamp_size_;
LimitReadOrWriteRate(RateLimiter::OpType::kWrite, thread,
size_to_request);

Status s;
if (user_timestamp_size_ > 0) {
Slice ts = mock_app_clock_->Allocate(ts_guard.get());
s = db->Put(write_options_, db_with_cfh->GetCfh(key_rand), key, ts,
gen.Generate());
put_val);
} else {
s = db->Put(write_options_, db_with_cfh->GetCfh(key_rand), key,
gen.Generate());
put_val);
}
if (!s.ok()) {
ErrorExit("put error: %s", s.ToString().c_str());
}
put_weight--;
writes_done++;
thread->stats.FinishedOps(db_with_cfh, db, 1, kWrite);
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
}
}
char msg[100];
Expand All @@ -7922,19 +7958,17 @@ class Benchmark {
int64_t seeks = 0;
int64_t found = 0;
int64_t bytes = 0;
int64_t key_rand = 0;
ReadOptions options(FLAGS_verify_checksum, true);
options.total_order_seek = FLAGS_total_order_seek;
options.prefix_same_as_start = FLAGS_prefix_same_as_start;
options.tailing = FLAGS_use_tailing_iterator;
options.readahead_size = FLAGS_readahead_size;

std::unique_ptr<Iterator> single_iter;
std::vector<std::unique_ptr<Iterator>> multi_iters;
if (IsSingleDb()) {
single_iter.reset(SingleDb().db->NewIterator(options));
} else {
std::vector<std::unique_ptr<Iterator>> tailing_iters;
if (FLAGS_use_tailing_iterator) {
for (const auto& db_with_cfh : dbs_to_use_) {
multi_iters.emplace_back(db_with_cfh.db->NewIterator(options));
tailing_iters.emplace_back(db_with_cfh.db->NewIterator(options));
}
}

Expand All @@ -7959,6 +7993,9 @@ class Benchmark {
// the number of iterations is the larger of read_ or write_
while (!duration.Done(1)) {
int prob_op = static_cast<int>(thread->rand.Uniform(100));
key_rand = GetRandomKey(&thread->rand);
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(key_rand);
DB* db = db_with_cfh->db;

// Seek
if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readwritepercent)) {
Expand All @@ -7969,7 +8006,7 @@ class Benchmark {
options.timestamp = &ts;
}

int64_t seek_pos = thread->rand.Next() % FLAGS_num;
int64_t seek_pos = key_rand;
GenerateKeyFromIntForSeek(static_cast<uint64_t>(seek_pos), FLAGS_num,
&key);
if (FLAGS_max_scan_distance != 0) {
Expand All @@ -7988,68 +8025,67 @@ class Benchmark {
}
}

if (!FLAGS_use_tailing_iterator) {
if (IsSingleDb()) {
single_iter.reset(SingleDb().db->NewIterator(options));
} else {
multi_iters.clear();
for (const auto& db_with_cfh : dbs_to_use_) {
multi_iters.emplace_back(db_with_cfh.db->NewIterator(options));
}
}
}

// Pick an Iterator to use
Iterator* iter_to_use = single_iter.get();
if (iter_to_use == nullptr) {
iter_to_use =
multi_iters[thread->rand.Next() % multi_iters.size()].get();
Iterator* iter_to_use;
std::unique_ptr<Iterator> single_iter;
if (FLAGS_use_tailing_iterator) {
uint64_t db_idx_to_use =
static_cast<uint64_t>(key_rand) % dbs_to_use_.size();
iter_to_use = tailing_iters[db_idx_to_use].get();
} else {
single_iter.reset(
db->NewIterator(options, db_with_cfh->GetCfh(key_rand)));
iter_to_use = single_iter.get();
}

iter_to_use->Seek(key);
seeks++;
if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
found++;
if (iter_to_use->Valid()) {
bytes += iter_to_use->key().size() + iter_to_use->value().size();
if (iter_to_use->key().compare(key) == 0) {
found++;
}
}

for (int j = 0; j < FLAGS_seek_nexts && iter_to_use->Valid(); ++j) {
// Copy out iterator's value to make sure we read them.
bytes += iter_to_use->key().size() + iter_to_use->value().size();

if (!FLAGS_reverse_iterator) {
iter_to_use->Next();
} else {
iter_to_use->Prev();
}
assert(iter_to_use->status().ok());
bytes += iter_to_use->key().size() + iter_to_use->value().size();
}

if (thread->shared->read_rate_limiter.get() != nullptr &&
seeks % 256 == 255) {
thread->shared->read_rate_limiter->Request(
256, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kRead);
if (seeks % 256 == 255) {
LimitReadOrWriteRate(RateLimiter::OpType::kRead, thread, 256);
}

thread->stats.FinishedOps(&FirstDb(), FirstDb().db, 1, kSeek);
// Write
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kSeek);
} else {
DB* db = SelectDB(thread);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
// Write Operation
GenerateKeyFromInt(key_rand, FLAGS_num, &key);
Slice value = gen.Generate();
size_t size_to_request =
value.size() + key.size() + user_timestamp_size_;

LimitReadOrWriteRate(RateLimiter::OpType::kWrite, thread,
size_to_request);

Status s;
if (user_timestamp_size_ > 0) {
Slice ts = mock_app_clock_->Allocate(ts_guard.get());
s = db->Put(write_options_, key, ts, gen.Generate());
s = db->Put(write_options_, db_with_cfh->GetCfh(key_rand), key, ts,
value);
} else {
s = db->Put(write_options_, key, gen.Generate());
s = db->Put(write_options_, db_with_cfh->GetCfh(key_rand), key,
value);
}

bytes += size_to_request;
if (!s.ok()) {
ErrorExit("put error: %s", s.ToString().c_str());
}
writes_done++;
thread->stats.FinishedOps(nullptr, db, 1, kWrite);
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
}
}

Expand Down

0 comments on commit 5cf8eec

Please sign in to comment.