Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix alloc_bytes of ReadLimiter #5852

Merged
merged 11 commits into from
Sep 16, 2022
65 changes: 38 additions & 27 deletions dbms/src/Encryption/RateLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <boost/algorithm/string.hpp>
#include <cassert>
#include <fstream>
#include <magic_enum.hpp>

namespace CurrentMetrics
{
Expand Down Expand Up @@ -107,6 +108,7 @@ WriteLimiter::WriteLimiter(Int64 rate_limit_per_sec_, LimiterType type_, UInt64
, requests_to_wait{0}
, type(type_)
, alloc_bytes{0}
, log(Logger::get(std::string(magic_enum::enum_name(type))))
{}

WriteLimiter::~WriteLimiter()
Expand All @@ -131,7 +133,8 @@ void WriteLimiter::request(Int64 bytes)
consumeBytes(bytes);
return;
}

Stopwatch sw_pending;
Int64 wait_times = 0;
auto pending_request = pendingRequestMetrics(type);

// request cannot be satisfied at this moment, enqueue
Expand All @@ -140,7 +143,7 @@ void WriteLimiter::request(Int64 bytes)
while (!r.granted)
{
assert(!req_queue.empty());

wait_times++;
bool timed_out = false;
// if this request is in the front of req_queue,
// then it is responsible to trigger the refill process.
Expand Down Expand Up @@ -191,6 +194,7 @@ void WriteLimiter::request(Int64 bytes)
}
}
}
LOG_FMT_TRACE(log, "pending_us {} wait_times {} pending_count {} rate_limit_per_sec {}", sw_pending.elapsed() / 1000, wait_times, req_queue.size(), refill_balance_per_period * 1000 / refill_period_ms);
}

size_t WriteLimiter::setStop()
Expand Down Expand Up @@ -296,7 +300,7 @@ ReadLimiter::ReadLimiter(
: WriteLimiter(rate_limit_per_sec_, type_, refill_period_ms_)
, get_read_bytes(std::move(get_read_bytes_))
, last_stat_bytes(get_read_bytes())
, log(Logger::get("ReadLimiter"))
, last_refill_time(std::chrono::system_clock::now())
{}

Int64 ReadLimiter::getAvailableBalance()
Expand All @@ -317,17 +321,21 @@ Int64 ReadLimiter::getAvailableBalance()
else
{
Int64 real_alloc_bytes = bytes - last_stat_bytes;
metricAllocBytes(type, real_alloc_bytes);
// `alloc_bytes` is the number of byte that ReadLimiter has allocated.
if (available_balance > 0)
{
auto can_alloc_bytes = std::min(real_alloc_bytes, available_balance);
alloc_bytes += can_alloc_bytes;
metricAllocBytes(type, can_alloc_bytes);
}
available_balance -= real_alloc_bytes;
alloc_bytes += real_alloc_bytes;
}
last_stat_bytes = bytes;
return available_balance;
}

void ReadLimiter::consumeBytes(Int64 bytes)
void ReadLimiter::consumeBytes([[maybe_unused]] Int64 bytes)
{
metricRequestBytes(type, bytes);
// Do nothing for read.
}

Expand All @@ -338,10 +346,26 @@ bool ReadLimiter::canGrant([[maybe_unused]] Int64 bytes)

void ReadLimiter::refillAndAlloc()
{
if (available_balance < refill_balance_per_period)
// `available_balance` of `ReadLimiter` may be overdrawn.
if (available_balance < 0)
{
// Limiter may not be called for a long time.
// During this time, limiter can be refilled at most `max_refill_times` times and covers some overdraft.
auto elapsed_duration = std::chrono::system_clock::now() - last_refill_time;
UInt64 elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed_duration).count();
// At least refill one time.
Int64 max_refill_times = std::max(elapsed_ms, refill_period_ms) / refill_period_ms;
Int64 max_refill_bytes = max_refill_times * refill_balance_per_period;
Int64 can_alloc_bytes = std::min(-available_balance, max_refill_bytes);
alloc_bytes += can_alloc_bytes;
metricAllocBytes(type, can_alloc_bytes);
available_balance = std::min(available_balance + max_refill_bytes, refill_balance_per_period);
}
else
{
available_balance += refill_balance_per_period;
available_balance = refill_balance_per_period;
flowbehappy marked this conversation as resolved.
Show resolved Hide resolved
}
last_refill_time = std::chrono::system_clock::now();

assert(!req_queue.empty());
auto * head_req = req_queue.front();
Expand Down Expand Up @@ -710,26 +734,13 @@ IOLimitTuner::TuneResult IOLimitTuner::tune() const
}

auto [max_read_bytes_per_sec, max_write_bytes_per_sec, rw_tuned] = tuneReadWrite();
LOG_FMT_INFO(
log,
"tuneReadWrite: max_read {} max_write {} rw_tuned {}",
max_read_bytes_per_sec,
max_write_bytes_per_sec,
rw_tuned);
auto [max_bg_read_bytes_per_sec, max_fg_read_bytes_per_sec, read_tuned] = tuneRead(max_read_bytes_per_sec);
LOG_FMT_INFO(
log,
"tuneRead: bg_read {} fg_read {} read_tuned {}",
max_bg_read_bytes_per_sec,
max_fg_read_bytes_per_sec,
read_tuned);
auto [max_bg_write_bytes_per_sec, max_fg_write_bytes_per_sec, write_tuned] = tuneWrite(max_write_bytes_per_sec);
LOG_FMT_INFO(
log,
"tuneWrite: bg_write {} fg_write {} write_tuned {}",
max_bg_write_bytes_per_sec,
max_fg_write_bytes_per_sec,
write_tuned);
if (rw_tuned || read_tuned || write_tuned)
{
LOG_FMT_INFO(log, "tune_msg: bg_write {} => {} fg_write {} => {} bg_read {} => {} fg_read {} => {}", bg_write_stat != nullptr ? bg_write_stat->maxBytesPerSec() : 0, max_bg_write_bytes_per_sec, fg_write_stat != nullptr ? fg_write_stat->maxBytesPerSec() : 0, max_fg_write_bytes_per_sec, bg_read_stat != nullptr ? bg_read_stat->maxBytesPerSec() : 0, max_bg_read_bytes_per_sec, fg_read_stat != nullptr ? fg_read_stat->maxBytesPerSec() : 0, max_fg_read_bytes_per_sec);
}

return {.max_bg_read_bytes_per_sec = max_bg_read_bytes_per_sec,
.max_fg_read_bytes_per_sec = max_fg_read_bytes_per_sec,
.read_tuned = read_tuned || rw_tuned,
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Encryption/RateLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ class WriteLimiter
void request(Int64 bytes);

// just for test purpose
inline UInt64 getTotalBytesThrough() const { return alloc_bytes; }
inline UInt64 getTotalBytesThrough() const
{
return available_balance < 0 ? alloc_bytes - available_balance : alloc_bytes;
}

LimiterStat getStat();

Expand Down Expand Up @@ -153,6 +156,7 @@ class WriteLimiter

Stopwatch stat_stop_watch;
UInt64 alloc_bytes;
LoggerPtr log;
};

using WriteLimiterPtr = std::shared_ptr<WriteLimiter>;
Expand Down Expand Up @@ -194,7 +198,7 @@ class ReadLimiter : public WriteLimiter

std::function<Int64()> get_read_bytes;
Int64 last_stat_bytes;
LoggerPtr log;
std::chrono::time_point<std::chrono::system_clock> last_refill_time;
};

using ReadLimiterPtr = std::shared_ptr<ReadLimiter>;
Expand Down
38 changes: 30 additions & 8 deletions dbms/src/Encryption/tests/gtest_rate_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
#include <random>
#include <thread>

#include "common/types.h"

#ifdef __linux__
#include <sys/syscall.h>
#endif
Expand Down Expand Up @@ -333,23 +331,47 @@ TEST(ReadLimiterTest, LimiterStat)
ASSERT_GT(stat.pct(), 100) << stat.toString();
}

static constexpr UInt64 alloc_bytes = 2047;
static constexpr UInt64 total_bytes = 2047;
for (int i = 0; i < 11; i++)
{
request(read_limiter, 1 << i);
}

std::this_thread::sleep_for(100ms);
ASSERT_EQ(read_limiter.getAvailableBalance(), -947);

stat = read_limiter.getStat();
ASSERT_EQ(stat.alloc_bytes, alloc_bytes);
ASSERT_GE(stat.elapsed_ms, alloc_bytes / 100 + 1);
ASSERT_EQ(stat.alloc_bytes, total_bytes + read_limiter.getAvailableBalance());
ASSERT_GE(stat.elapsed_ms, stat.alloc_bytes / 100 + 1);
ASSERT_EQ(stat.refill_period_ms, 100ul);
ASSERT_EQ(stat.refill_bytes_per_period, 100);
ASSERT_EQ(stat.maxBytesPerSec(), 1000);
ASSERT_EQ(stat.avgBytesPerSec(), static_cast<Int64>(alloc_bytes * 1000 / stat.elapsed_ms)) << stat.toString();
ASSERT_EQ(stat.pct(), static_cast<Int64>(alloc_bytes * 1000 / stat.elapsed_ms) * 100 / stat.maxBytesPerSec()) << stat.toString();
ASSERT_EQ(stat.avgBytesPerSec(), static_cast<Int64>(stat.alloc_bytes * 1000 / stat.elapsed_ms)) << stat.toString();
ASSERT_EQ(stat.pct(), static_cast<Int64>(stat.alloc_bytes * 1000 / stat.elapsed_ms) * 100 / stat.maxBytesPerSec()) << stat.toString();
}

TEST(ReadLimiterTest, ReadMany)
{
Int64 real_read_bytes{0};
auto get_read_bytes = [&]() {
return real_read_bytes;
};
auto request = [&](ReadLimiter & limiter, Int64 bytes) {
limiter.request(bytes);
real_read_bytes += bytes;
};

constexpr Int64 bytes_per_sec = 1000;
constexpr UInt64 refill_period_ms = 100;
ReadLimiter read_limiter(get_read_bytes, bytes_per_sec, LimiterType::UNKNOW, refill_period_ms);
ASSERT_EQ(read_limiter.getAvailableBalance(), 100);
request(read_limiter, 1000);
ASSERT_EQ(read_limiter.getAvailableBalance(), -900);
ASSERT_EQ(read_limiter.alloc_bytes, 100);

std::this_thread::sleep_for(1200ms);
Stopwatch sw;
request(read_limiter, 100);
ASSERT_LE(sw.elapsedMilliseconds(), 1); // Not blocked.
}

#ifdef __linux__
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/TestUtils/MockReadLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class MockReadLimiter final : public ReadLimiter
protected:
void consumeBytes(Int64 bytes) override
{
// Need soft limit here.
WriteLimiter::consumeBytes(bytes); // NOLINT(bugprone-parent-virtual-call)
alloc_bytes += std::min(available_balance, bytes);
available_balance -= bytes;
}
};

Expand Down