diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index 053dc5a816b..508699789a3 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -23,6 +23,7 @@ #include #include #include +#include namespace CurrentMetrics { @@ -107,6 +108,7 @@ WriteLimiter::WriteLimiter(Int64 rate_limit_per_sec_, LimiterType type_, UInt64 , requests_to_wait{0} , type(type_) , alloc_bytes{0} + , log(Logger::get(std::string(magic_enum::enum_name(type)))) {} WriteLimiter::~WriteLimiter() @@ -131,7 +133,8 @@ void WriteLimiter::request(Int64 bytes) consumeBytes(bytes); return; } - + Stopwatch sw_pending; + Int64 wait_times = 0; auto pending_request = pendingRequestMetrics(type); // request cannot be satisfied at this moment, enqueue @@ -140,7 +143,7 @@ void WriteLimiter::request(Int64 bytes) while (!r.granted) { assert(!req_queue.empty()); - + wait_times++; bool timed_out = false; // if this request is in the front of req_queue, // then it is responsible to trigger the refill process. @@ -191,6 +194,7 @@ void WriteLimiter::request(Int64 bytes) } } } + LOG_FMT_TRACE(log, "pending_us {} wait_times {} pending_count {} rate_limit_per_sec {}", sw_pending.elapsed() / 1000, wait_times, req_queue.size(), refill_balance_per_period * 1000 / refill_period_ms); } size_t WriteLimiter::setStop() @@ -296,7 +300,7 @@ ReadLimiter::ReadLimiter( : WriteLimiter(rate_limit_per_sec_, type_, refill_period_ms_) , get_read_bytes(std::move(get_read_bytes_)) , last_stat_bytes(get_read_bytes()) - , log(Logger::get("ReadLimiter")) + , last_refill_time(std::chrono::system_clock::now()) {} Int64 ReadLimiter::getAvailableBalance() @@ -317,17 +321,21 @@ Int64 ReadLimiter::getAvailableBalance() else { Int64 real_alloc_bytes = bytes - last_stat_bytes; - metricAllocBytes(type, real_alloc_bytes); + // `alloc_bytes` is the number of byte that ReadLimiter has allocated. + if (available_balance > 0) + { + auto can_alloc_bytes = std::min(real_alloc_bytes, available_balance); + alloc_bytes += can_alloc_bytes; + metricAllocBytes(type, can_alloc_bytes); + } available_balance -= real_alloc_bytes; - alloc_bytes += real_alloc_bytes; } last_stat_bytes = bytes; return available_balance; } -void ReadLimiter::consumeBytes(Int64 bytes) +void ReadLimiter::consumeBytes([[maybe_unused]] Int64 bytes) { - metricRequestBytes(type, bytes); // Do nothing for read. } @@ -338,10 +346,26 @@ bool ReadLimiter::canGrant([[maybe_unused]] Int64 bytes) void ReadLimiter::refillAndAlloc() { - if (available_balance < refill_balance_per_period) + // `available_balance` of `ReadLimiter` may be overdrawn. + if (available_balance < 0) + { + // Limiter may not be called for a long time. + // During this time, limiter can be refilled at most `max_refill_times` times and covers some overdraft. + auto elapsed_duration = std::chrono::system_clock::now() - last_refill_time; + UInt64 elapsed_ms = std::chrono::duration_cast(elapsed_duration).count(); + // At least refill one time. + Int64 max_refill_times = std::max(elapsed_ms, refill_period_ms) / refill_period_ms; + Int64 max_refill_bytes = max_refill_times * refill_balance_per_period; + Int64 can_alloc_bytes = std::min(-available_balance, max_refill_bytes); + alloc_bytes += can_alloc_bytes; + metricAllocBytes(type, can_alloc_bytes); + available_balance = std::min(available_balance + max_refill_bytes, refill_balance_per_period); + } + else { - available_balance += refill_balance_per_period; + available_balance = refill_balance_per_period; } + last_refill_time = std::chrono::system_clock::now(); assert(!req_queue.empty()); auto * head_req = req_queue.front(); @@ -710,26 +734,13 @@ IOLimitTuner::TuneResult IOLimitTuner::tune() const } auto [max_read_bytes_per_sec, max_write_bytes_per_sec, rw_tuned] = tuneReadWrite(); - LOG_FMT_INFO( - log, - "tuneReadWrite: max_read {} max_write {} rw_tuned {}", - max_read_bytes_per_sec, - max_write_bytes_per_sec, - rw_tuned); auto [max_bg_read_bytes_per_sec, max_fg_read_bytes_per_sec, read_tuned] = tuneRead(max_read_bytes_per_sec); - LOG_FMT_INFO( - log, - "tuneRead: bg_read {} fg_read {} read_tuned {}", - max_bg_read_bytes_per_sec, - max_fg_read_bytes_per_sec, - read_tuned); auto [max_bg_write_bytes_per_sec, max_fg_write_bytes_per_sec, write_tuned] = tuneWrite(max_write_bytes_per_sec); - LOG_FMT_INFO( - log, - "tuneWrite: bg_write {} fg_write {} write_tuned {}", - max_bg_write_bytes_per_sec, - max_fg_write_bytes_per_sec, - write_tuned); + if (rw_tuned || read_tuned || write_tuned) + { + LOG_FMT_INFO(log, "tune_msg: bg_write {} => {} fg_write {} => {} bg_read {} => {} fg_read {} => {}", bg_write_stat != nullptr ? bg_write_stat->maxBytesPerSec() : 0, max_bg_write_bytes_per_sec, fg_write_stat != nullptr ? fg_write_stat->maxBytesPerSec() : 0, max_fg_write_bytes_per_sec, bg_read_stat != nullptr ? bg_read_stat->maxBytesPerSec() : 0, max_bg_read_bytes_per_sec, fg_read_stat != nullptr ? fg_read_stat->maxBytesPerSec() : 0, max_fg_read_bytes_per_sec); + } + return {.max_bg_read_bytes_per_sec = max_bg_read_bytes_per_sec, .max_fg_read_bytes_per_sec = max_fg_read_bytes_per_sec, .read_tuned = read_tuned || rw_tuned, diff --git a/dbms/src/Encryption/RateLimiter.h b/dbms/src/Encryption/RateLimiter.h index 4e7846b9a64..b0578433d58 100644 --- a/dbms/src/Encryption/RateLimiter.h +++ b/dbms/src/Encryption/RateLimiter.h @@ -93,7 +93,10 @@ class WriteLimiter void request(Int64 bytes); // just for test purpose - inline UInt64 getTotalBytesThrough() const { return alloc_bytes; } + inline UInt64 getTotalBytesThrough() const + { + return available_balance < 0 ? alloc_bytes - available_balance : alloc_bytes; + } LimiterStat getStat(); @@ -153,6 +156,7 @@ class WriteLimiter Stopwatch stat_stop_watch; UInt64 alloc_bytes; + LoggerPtr log; }; using WriteLimiterPtr = std::shared_ptr; @@ -194,7 +198,7 @@ class ReadLimiter : public WriteLimiter std::function get_read_bytes; Int64 last_stat_bytes; - LoggerPtr log; + std::chrono::time_point last_refill_time; }; using ReadLimiterPtr = std::shared_ptr; diff --git a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp index 854473bae44..70dc6b65198 100644 --- a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp +++ b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp @@ -22,8 +22,6 @@ #include #include -#include "common/types.h" - #ifdef __linux__ #include #endif @@ -333,23 +331,47 @@ TEST(ReadLimiterTest, LimiterStat) ASSERT_GT(stat.pct(), 100) << stat.toString(); } - static constexpr UInt64 alloc_bytes = 2047; + static constexpr UInt64 total_bytes = 2047; for (int i = 0; i < 11; i++) { request(read_limiter, 1 << i); } - std::this_thread::sleep_for(100ms); ASSERT_EQ(read_limiter.getAvailableBalance(), -947); stat = read_limiter.getStat(); - ASSERT_EQ(stat.alloc_bytes, alloc_bytes); - ASSERT_GE(stat.elapsed_ms, alloc_bytes / 100 + 1); + ASSERT_EQ(stat.alloc_bytes, total_bytes + read_limiter.getAvailableBalance()); + ASSERT_GE(stat.elapsed_ms, stat.alloc_bytes / 100 + 1); ASSERT_EQ(stat.refill_period_ms, 100ul); ASSERT_EQ(stat.refill_bytes_per_period, 100); ASSERT_EQ(stat.maxBytesPerSec(), 1000); - ASSERT_EQ(stat.avgBytesPerSec(), static_cast(alloc_bytes * 1000 / stat.elapsed_ms)) << stat.toString(); - ASSERT_EQ(stat.pct(), static_cast(alloc_bytes * 1000 / stat.elapsed_ms) * 100 / stat.maxBytesPerSec()) << stat.toString(); + ASSERT_EQ(stat.avgBytesPerSec(), static_cast(stat.alloc_bytes * 1000 / stat.elapsed_ms)) << stat.toString(); + ASSERT_EQ(stat.pct(), static_cast(stat.alloc_bytes * 1000 / stat.elapsed_ms) * 100 / stat.maxBytesPerSec()) << stat.toString(); +} + +TEST(ReadLimiterTest, ReadMany) +{ + Int64 real_read_bytes{0}; + auto get_read_bytes = [&]() { + return real_read_bytes; + }; + auto request = [&](ReadLimiter & limiter, Int64 bytes) { + limiter.request(bytes); + real_read_bytes += bytes; + }; + + constexpr Int64 bytes_per_sec = 1000; + constexpr UInt64 refill_period_ms = 100; + ReadLimiter read_limiter(get_read_bytes, bytes_per_sec, LimiterType::UNKNOW, refill_period_ms); + ASSERT_EQ(read_limiter.getAvailableBalance(), 100); + request(read_limiter, 1000); + ASSERT_EQ(read_limiter.getAvailableBalance(), -900); + ASSERT_EQ(read_limiter.alloc_bytes, 100); + + std::this_thread::sleep_for(1200ms); + Stopwatch sw; + request(read_limiter, 100); + ASSERT_LE(sw.elapsedMilliseconds(), 1); // Not blocked. } #ifdef __linux__ diff --git a/dbms/src/TestUtils/MockReadLimiter.h b/dbms/src/TestUtils/MockReadLimiter.h index 0bb69145a67..64967d3c7c1 100644 --- a/dbms/src/TestUtils/MockReadLimiter.h +++ b/dbms/src/TestUtils/MockReadLimiter.h @@ -31,8 +31,8 @@ class MockReadLimiter final : public ReadLimiter protected: void consumeBytes(Int64 bytes) override { - // Need soft limit here. - WriteLimiter::consumeBytes(bytes); // NOLINT(bugprone-parent-virtual-call) + alloc_bytes += std::min(available_balance, bytes); + available_balance -= bytes; } };