forked from scylladb/seastar
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
tests: Add perf test for shard_token_bucket
The test checks if the token-bucket "rate" is held under various circumstances: - when shards sleep between grabbing tokens - when shards poll the t.b. frequently - when shards are disturbed with CPU hogs So far the test shows three problems: - With few shards tokens deficiency produces zero sleep time, so the "good" user that sleeps between grabs effectively converts into a polling ("bad") user (fixed by scylladb#1722) - Sometimes replenishing rounding errors accumulate and render lower resulting rate than configured (fixed by scylladb#1723) - When run with CPU hogs the individual shard's rates may differ too much (see scylladb#1083). E.g. the bucket configured with the rate of 100k tokens/sec, 48 shards, run 4 seconds. "Slowest" shard vs "fastest" shards get this amount of tokens: no hog: 6931 ... 9631 with hog: 2135 ... 29412 (sum rate is 100k with the aforementioned fixes) Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
- Loading branch information
Showing
2 changed files
with
199 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
/* | ||
* This file is open source software, licensed to you under the terms | ||
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file | ||
* distributed with this work for additional information regarding copyright | ||
* ownership. You may not use this file except in compliance with the License. | ||
* | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
/* | ||
* Copyright (C) 2020 ScyllaDB Ltd. | ||
*/ | ||
|
||
|
||
#include <seastar/testing/perf_tests.hh> | ||
#include <seastar/core/sharded.hh> | ||
#include <seastar/core/sleep.hh> | ||
#include <seastar/util/later.hh> | ||
#include <seastar/util/shared_token_bucket.hh> | ||
|
||
using token_bucket_t = internal::shared_token_bucket<uint64_t, std::ratio<1>, internal::capped_release::no>; | ||
using clock_type = std::chrono::steady_clock; | ||
|
||
struct worker { | ||
token_bucket_t& tb; | ||
uint64_t tokens = 0; | ||
const uint64_t threshold; // this is to mimic the main user of shared-token-bucket -- a fair_queue::dispatch_requests | ||
std::optional<uint64_t> head; | ||
|
||
worker(token_bucket_t& tb_) noexcept | ||
: tb(tb_) | ||
, threshold(tb.limit() / smp::count) | ||
{} | ||
|
||
future<uint64_t> work(std::function<future<>(uint64_t d)> do_sleep) { | ||
assert(tokens == 0); | ||
auto start = clock_type::now(); | ||
return do_until([end = start + std::chrono::seconds(4)] { return clock_type::now() >= end; }, | ||
[this, start, do_sleep = std::move(do_sleep)] { | ||
uint64_t d = 0; | ||
uint64_t l_tokens = 0; | ||
|
||
while (l_tokens < threshold) { | ||
if (head) { | ||
tb.replenish(clock_type::now()); | ||
d = tb.deficiency(*head); | ||
if (d > 0) { | ||
break; | ||
} | ||
head.reset(); | ||
} else { | ||
auto h = tb.grab(1); | ||
auto d = tb.deficiency(h); | ||
if (d > 0) { | ||
head = h; | ||
break; | ||
} | ||
} | ||
tokens++; | ||
l_tokens++; | ||
} | ||
|
||
return do_sleep(d); | ||
} | ||
).then([this, start] { | ||
auto delay = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() - start).count(); | ||
fmt::print("{} {}t/{:.3f}s, speed is {:.1f}t/s\n", this_shard_id(), tokens, delay, double(tokens) / delay); | ||
return make_ready_future<uint64_t>(std::exchange(tokens, 0)); | ||
}); | ||
} | ||
|
||
future<uint64_t> work_sleeping() { | ||
return work([this] (uint64_t d) { | ||
return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(tb.duration_for(d))); | ||
}); | ||
} | ||
|
||
future<uint64_t> work_yielding() { | ||
return work([] (uint64_t) { | ||
return seastar::yield(); | ||
}); | ||
} | ||
}; | ||
|
||
struct hog { | ||
std::chrono::duration<double> busy; | ||
std::chrono::duration<double> rest; | ||
std::optional<future<>> stopped; | ||
bool keep_going = false; | ||
|
||
template <typename T1, typename T2> | ||
hog(T1 b, T2 r) noexcept | ||
: busy(std::chrono::duration_cast<decltype(busy)>(b)) | ||
, rest(std::chrono::duration_cast<decltype(rest)>(r)) | ||
{} | ||
|
||
void work() { | ||
assert(!stopped.has_value()); | ||
keep_going = true; | ||
stopped = do_until([this] { return !keep_going; }, | ||
[this] { | ||
return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(rest)).then([this] { | ||
auto until = clock_type::now() + busy; | ||
do { | ||
} while (clock_type::now() < until && keep_going); | ||
}); | ||
} | ||
); | ||
} | ||
|
||
future<> terminate() { | ||
assert(stopped.has_value()); | ||
keep_going = false; | ||
auto f = std::move(*stopped); | ||
stopped.reset(); | ||
return f; | ||
} | ||
}; | ||
|
||
struct perf_shared_token_bucket { | ||
token_bucket_t tb; | ||
seastar::sharded<worker> w; | ||
seastar::sharded<hog> h; | ||
|
||
static constexpr uint64_t rate = 100000; | ||
static constexpr uint64_t limit = 110000; | ||
static constexpr uint64_t threshold = 100; | ||
|
||
perf_shared_token_bucket() : tb(rate, limit, threshold) | ||
{ | ||
w.start(std::ref(tb)).get(); | ||
h.start(std::chrono::milliseconds(2), std::chrono::microseconds(10)).get(); | ||
fmt::print("Created tb {}t/s\n", tb.rate()); | ||
} | ||
|
||
~perf_shared_token_bucket() { | ||
h.stop().get(); | ||
w.stop().get(); | ||
} | ||
}; | ||
|
||
struct statistics { | ||
uint64_t total = 0; | ||
uint64_t min = std::numeric_limits<uint64_t>::max(); | ||
uint64_t max = std::numeric_limits<uint64_t>::min(); | ||
}; | ||
|
||
statistics accumulate(statistics acc, uint64_t val) { | ||
return statistics { | ||
.total = acc.total + val, | ||
.min = std::min(acc.min, val), | ||
.max = std::max(acc.max, val), | ||
}; | ||
} | ||
|
||
PERF_TEST_F(perf_shared_token_bucket, sleeping_throughput) | ||
{ | ||
fmt::print("---8<---\n"); | ||
auto start = clock_type::now(); | ||
return w.map_reduce0(&worker::work_sleeping, statistics{}, accumulate).then([this, start] (statistics st) { | ||
auto delay = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() - start).count(); | ||
fmt::print("effective rate is {:.1f}t/s, [{} ... {}]\n", st.total / delay, st.min, st.max); | ||
}); | ||
} | ||
|
||
PERF_TEST_F(perf_shared_token_bucket, yielding_throughput) | ||
{ | ||
fmt::print("---8<---\n"); | ||
auto start = clock_type::now(); | ||
return w.map_reduce0(&worker::work_yielding, statistics{}, accumulate).then([this, start] (statistics st) { | ||
auto delay = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() - start).count(); | ||
fmt::print("effective rate is {:.1f}t/s, [{} ... {}]\n", st.total / delay, st.min, st.max); | ||
}); | ||
} | ||
|
||
PERF_TEST_F(perf_shared_token_bucket, sleeping_throughput_with_hog) | ||
{ | ||
fmt::print("---8<---\n"); | ||
return h.invoke_on_all(&hog::work).then([this] { | ||
auto start = clock_type::now(); | ||
return w.map_reduce0(&worker::work_sleeping, statistics{}, accumulate).then([this, start] (statistics st) { | ||
auto delay = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() - start).count(); | ||
fmt::print("effective rate is {:.1f}t/s, [{} ... {}]\n", st.total / delay, st.min, st.max); | ||
}).then([this] { | ||
return h.invoke_on_all(&hog::terminate); | ||
}); | ||
}); | ||
} |