forked from scylladb/seastar
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
tests: Add perf test for shard_token_bucket
The test checks if the token-bucket "rate" is held under various circumstances: - when shards sleep between grabbing tokens - when shards poll the t.b. frequently - when shards are disturbed with CPU hogs So far the test shows four problems: - With few shards tokens deficiency produces zero sleep time, so the "good" user that sleeps between grabs effectively converts into a polling ("bad") user (fixed by scylladb#1722) - Sometimes replenishing rounding errors accumulate and render lower resulting rate than configured (fixed by scylladb#1723) - When run with CPU hogs the individual shard's rates may differ too much (see scylladb#1083). E.g. the bucket configured with the rate of 100k tokens/sec, 48 shards, run 4 seconds. "Slowest" shard vs "fastest" shards get this amount of tokens: no hog: 6931 ... 9631 with hog: 2135 ... 29412 (sum rate is 100k with the aforementioned fixes) - With "capped-release" token bucket and token releasing by-timer with the configured rate and hogs the resulting throughput can be as low as 30% of the configured (see scylladb#1641) Created token-bucket 1000000.0 t/s perf_pure_context.sleeping_throughput_with_hog: 966646.1 t/s perf_capped_context.sleeping_throughput: 838035.2 t/s perf_pure_context.sleeping_throughput_with_hog: 317685.3 t/s Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
- Loading branch information
Showing
3 changed files
with
290 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,286 @@ | ||
/* | ||
* This file is open source software, licensed to you under the terms | ||
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file | ||
* distributed with this work for additional information regarding copyright | ||
* ownership. You may not use this file except in compliance with the License. | ||
* | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
/* | ||
* Copyright (C) 2020 ScyllaDB Ltd. | ||
*/ | ||
|
||
|
||
#include <random> | ||
#include <seastar/testing/perf_tests.hh> | ||
#include <seastar/testing/random.hh> | ||
#include <seastar/core/sharded.hh> | ||
#include <seastar/core/sleep.hh> | ||
#include <seastar/util/later.hh> | ||
#include <seastar/util/shared_token_bucket.hh> | ||
|
||
// The two shared-token-bucket flavors under test. The "capped" variant limits
// replenishment by what was explicitly release()d back (tokens released
// by-timer in this test); the "pure" variant replenishes on elapsed time alone.
using capped_token_bucket_t = internal::shared_token_bucket<uint64_t, std::ratio<1>, internal::capped_release::yes>;
using pure_token_bucket_t = internal::shared_token_bucket<uint64_t, std::ratio<1>, internal::capped_release::no>;
using clock_type = std::chrono::steady_clock;
|
||
// Result of one shard's work() run: how many tokens it managed to grab.
struct work_result {
    uint64_t tokens;
};

// Reduction accumulator for per-shard results: total tokens across shards
// plus the smallest and largest single-shard counts.
struct statistics {
    uint64_t total = 0;
    uint64_t min = std::numeric_limits<uint64_t>::max();
    uint64_t max = std::numeric_limits<uint64_t>::min();
};

// Fold one shard's result into the running statistics (used by map_reduce0).
statistics accumulate(statistics acc, const work_result& val) {
    statistics next;
    next.total = acc.total + val.tokens;
    next.min = (val.tokens < acc.min) ? val.tokens : acc.min;
    next.max = (val.tokens > acc.max) ? val.tokens : acc.max;
    return next;
}
|
||
// Per-shard worker: repeatedly grabs tokens from the shared token bucket and
// records per-tick statistics, mimicking how a fair_queue dispatches requests.
// One instance runs on every shard (peering_sharded_service).
template <typename TokenBucket>
struct worker : public seastar::peering_sharded_service<worker<TokenBucket>> {
    TokenBucket& tb;     // bucket shared by all shards (owned by the context)
    uint64_t tokens = 0; // tokens grabbed during the current work() run
    // Min/max tokens grabbed in a single tick; reset after each run's report.
    uint64_t tokens_per_tick_min = std::numeric_limits<uint64_t>::max();
    uint64_t tokens_per_tick_max = std::numeric_limits<uint64_t>::min();
    // Diagnostics recorded once per tick (one iteration of the grab loop).
    struct tick_data {
        std::chrono::microseconds delay;  // time since work() started
        std::chrono::microseconds sleep;  // sleep duration suggested after this tick
        uint64_t defic;   // deficiency that ended the tick (0 if threshold was reached)
        uint64_t tokens;  // tokens grabbed during this tick
        uint64_t total;   // running total for this run

        template <typename D1, typename D2>
        tick_data(D1 dur, D2 slp, uint64_t def, uint64_t lt, uint64_t tt) noexcept
            : delay(std::chrono::duration_cast<std::chrono::microseconds>(dur))
            , sleep(std::chrono::duration_cast<std::chrono::microseconds>(slp))
            , defic(def)
            , tokens(lt)
            , total(tt)
        {}
    };
    std::deque<tick_data> ticks; // bounded tick history (last ~2048 entries)
    const uint64_t threshold; // this is to mimic the main user of shared-token-bucket -- a fair_queue::dispatch_requests
    // A grab that hit a deficiency: (request size, bucket head value).
    // Retried first on the next tick before grabbing anything new.
    std::optional<std::pair<int, uint64_t>> head;
    std::uniform_int_distribution<int> size; // random per-request token cost [1..128]
    uint64_t available = 0;        // tokens grabbed but not yet release()d back
    uint64_t release_per_tick = 0; // tokens to release per timer tick (capped buckets only)
    timer<> release_tokens;        // periodic releaser; armed for capped buckets only

    worker(TokenBucket& tb_) noexcept
        : tb(tb_)
        , threshold(tb.limit() / smp::count)
        , size(1, 128)
        , release_tokens([this] { do_release(); })
    {
        if constexpr (TokenBucket::is_capped == internal::capped_release::yes) {
            // Release back every 500us (2000 ticks/sec), so each shard returns
            // its fair share of the configured rate: rate / shards / 2000.
            release_tokens.arm_periodic(std::chrono::microseconds(500));
            release_per_tick = double(tb.rate()) / smp::count / 2000;
        }
        fmt::print("{} worker, threshold {}, release-per-tick {}\n", this_shard_id(), threshold, release_per_tick);
    }

    // Timer callback: give back up to release_per_tick of the grabbed tokens.
    void do_release() {
        auto release = std::min(release_per_tick, available);
        available -= release;
        tb.release(release);
    }

    // Run the grab loop for one second. do_sleep decides what to do with the
    // duration the bucket suggests waiting (real sleep vs. just yielding).
    // Returns the number of tokens this shard managed to grab in the run.
    future<work_result> work(std::function<future<>(std::chrono::duration<double> d)> do_sleep) {
        assert(tokens == 0);
        auto start = clock_type::now();
        return do_until([end = start + std::chrono::seconds(1)] { return clock_type::now() >= end; },
            [this, start, do_sleep = std::move(do_sleep)] {
                uint64_t d = 0;        // deficiency that stopped this tick (0 = none)
                uint64_t l_tokens = 0; // tokens grabbed during this tick
                int sz;

                // Grab random-sized requests until `threshold` tokens are
                // taken or the bucket reports a deficiency.
                while (l_tokens < threshold) {
                    if (head) {
                        // Retry the request deferred on a previous tick.
                        tb.replenish(clock_type::now());
                        d = tb.deficiency(head->second);
                        if (d > 0) {
                            break;
                        }
                        sz = head->first;
                        head.reset();
                    } else {
                        sz = size(testing::local_random_engine);
                        auto h = tb.grab(sz);
                        d = tb.deficiency(h);
                        if (d > 0) {
                            // Not enough tokens -- park the request and stop.
                            head = std::make_pair(sz, h);
                            break;
                        }
                    }
                    tokens += sz;
                    l_tokens += sz;
                    available += sz;
                }

                tokens_per_tick_min = std::min(tokens_per_tick_min, l_tokens);
                tokens_per_tick_max = std::max(tokens_per_tick_max, l_tokens);
                auto p = tb.duration_for(d); // how long the bucket says to wait
                ticks.emplace_back(clock_type::now() - start, p, d, l_tokens, tokens);
                if (ticks.size() > 2048) {
                    ticks.pop_front();
                }
                return do_sleep(p);
            }
        ).then([this, start] {
            // Report this shard's effective rate vs. its fair share of the goal.
            auto delay = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() - start).count();
            fmt::print("{} {}t/{:.3f}s, speed is {:.1f}t/s goal {:.1f}t/s, {} ticks, per-tick [{}..{}]\n", this_shard_id(), tokens, delay,
                    double(tokens) / delay, double(tb.rate()) / smp::count,
                    ticks.size(), tokens_per_tick_min, tokens_per_tick_max);
            tokens_per_tick_min = std::numeric_limits<uint64_t>::max();
            tokens_per_tick_max = std::numeric_limits<uint64_t>::min();
            work_result r {
                .tokens = std::exchange(this->tokens, 0),
            };
            return make_ready_future<work_result>(std::move(r));
        });
    }

    // "Good" user: sleeps for the suggested duration between grabs.
    future<work_result> work_sleeping() {
        return work([this] (std::chrono::duration<double> d) {
            return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(d));
        });
    }

    // "Bad" user: ignores the suggested duration and just yields (polls).
    future<work_result> work_yielding() {
        return work([] (std::chrono::duration<double>) {
            return seastar::yield();
        });
    }

    // Dump and clear the tick history shard-by-shard, chaining to the next
    // shard so the output is not interleaved.
    future<> print_and_clear_ticks() {
        fmt::print("{} {} ticks\n", this_shard_id(), ticks.size());
        std::chrono::microseconds p(0);
        for (auto& td : ticks) {
            fmt::print("  {:8} +{:5} us {:5}/{:5} def {:3} sleep {:5} us\n", td.delay.count(), (td.delay - p).count(), td.tokens, td.total, td.defic, td.sleep.count());
            p = td.delay;
        }
        ticks.clear();
        if (this_shard_id() == smp::count - 1) {
            return make_ready_future<>();
        }

        return this->container().invoke_on(this_shard_id() + 1, &worker::print_and_clear_ticks);
    }
};
|
||
struct hog { | ||
std::exponential_distribution<double> busy; | ||
std::exponential_distribution<double> rest; | ||
std::optional<future<>> stopped; | ||
bool keep_going = false; | ||
uint64_t _iterations = 0; | ||
|
||
template <typename T1, typename T2> | ||
hog(T1 b, T2 r) noexcept | ||
: busy(1.0 / std::chrono::duration_cast<std::chrono::duration<double>>(b).count()) | ||
, rest(1.0 / std::chrono::duration_cast<std::chrono::duration<double>>(r).count()) | ||
{} | ||
|
||
void work() { | ||
assert(!stopped.has_value()); | ||
keep_going = true; | ||
stopped = do_until([this] { return !keep_going; }, | ||
[this] { | ||
auto p = std::chrono::duration<double>(rest(testing::local_random_engine)); | ||
return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(p)).then([this] { | ||
_iterations++; | ||
auto until = clock_type::now() + std::chrono::duration<double>(busy(testing::local_random_engine)); | ||
do { | ||
} while (clock_type::now() < until && keep_going); | ||
}); | ||
} | ||
); | ||
} | ||
|
||
future<> terminate() { | ||
assert(stopped.has_value()); | ||
keep_going = false; | ||
auto f = std::move(*stopped); | ||
stopped.reset(); | ||
return f.finally([this] { | ||
fmt::print("{}-hog {} iters\n", this_shard_id(), std::exchange(_iterations, 0)); | ||
}); | ||
} | ||
}; | ||
|
||
// Test fixture: one shared token bucket plus a worker and a CPU hog started
// on every shard. Built and torn down by the perf framework per test.
template <typename TokenBucket>
struct context {
    using worker_t = worker<TokenBucket>;
    TokenBucket tb;
    seastar::sharded<worker_t> w;
    seastar::sharded<hog> h;

    static constexpr uint64_t rate = 1000000;      // tokens per second
    static constexpr uint64_t limit = rate / 2000; // bucket size: 0.5ms worth of tokens
    static constexpr uint64_t threshold = 1;

    // NOTE(review): blocking .get() in ctor/dtor -- presumably PERF_TEST_F
    // runs fixtures inside a seastar thread where this is allowed; confirm
    // against the perf framework.
    context() : tb(rate, limit, threshold)
    {
        w.start(std::ref(tb)).get();
        // Hogs: ~500us mean busy time, ~10us mean rest time per cycle.
        h.start(std::chrono::microseconds(500), std::chrono::microseconds(10)).get();
        fmt::print("Created tb {}t/s (limit {} threshold {})\n", tb.rate(), tb.limit(), tb.threshold());
    }

    ~context() {
        h.stop().get();
        w.stop().get();
    }

    // Run fn on every shard's worker, fold the per-shard results with
    // accumulate() and print the aggregated effective rate plus the
    // min/max per-shard token counts.
    template <typename Fn>
    future<> run_workers(Fn&& fn) {
        auto start = clock_type::now();
        return w.map_reduce0(std::forward<Fn>(fn), statistics{}, accumulate).then([this, start] (statistics st) {
            auto delay = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() - start).count();
            fmt::print("effective rate is {:.1f}t/s, [{} ... {}]\n", st.total / delay, st.min, st.max);
        });
    }

    // Workers sleep between grabs ("good" users).
    future<> test_sleeping() {
        fmt::print("---8<---\n");
        return run_workers(&worker_t::work_sleeping);
    }

    // Workers poll the bucket by yielding ("bad" users).
    future<> test_yielding() {
        fmt::print("---8<---\n");
        return run_workers(&worker_t::work_yielding);
    }

    // Sleeping workers running alongside CPU hogs on every shard.
    future<> test_sleeping_with_hog() {
        fmt::print("---8<---\n");
        return h.invoke_on_all(&hog::work).then([this] {
            return run_workers(&worker_t::work_sleeping).then([this] {
                return h.invoke_on_all(&hog::terminate);
            });
        });
    }
};
|
||
// Concrete fixtures for the two bucket flavors; the struct names become the
// test-group names in the perf output.
struct perf_capped_context : public context<capped_token_bucket_t> {};
struct perf_pure_context : public context<pure_token_bucket_t> {};
|
||
// Register each scenario against both fixtures; the fixture constructor
// prints the bucket parameters and each run prints its effective rate.
PERF_TEST_F(perf_capped_context, sleeping_throughput) { return test_sleeping(); }
PERF_TEST_F(perf_capped_context, yielding_throughput) { return test_yielding(); }
PERF_TEST_F(perf_capped_context, sleeping_throughput_with_hog) { return test_sleeping_with_hog(); }

PERF_TEST_F(perf_pure_context, sleeping_throughput) { return test_sleeping(); }
PERF_TEST_F(perf_pure_context, yielding_throughput) { return test_yielding(); }
PERF_TEST_F(perf_pure_context, sleeping_throughput_with_hog) { return test_sleeping_with_hog(); }