diff --git a/dbms/src/Encryption/RateLimiter.cpp b/dbms/src/Encryption/RateLimiter.cpp index 38fd8468341..f299b585150 100644 --- a/dbms/src/Encryption/RateLimiter.cpp +++ b/dbms/src/Encryption/RateLimiter.cpp @@ -99,6 +99,23 @@ inline CurrentMetrics::Increment pendingRequestMetrics(LimiterType type) } } +String getEnumName(LimiterType type) +{ + switch (type) + { + case LimiterType::FG_READ: + return "FG_READ"; + case LimiterType::BG_READ: + return "BG_READ"; + case LimiterType::FG_WRITE: + return "FG_WRITE"; + case LimiterType::BG_WRITE: + return "BG_WRITE"; + default: + return "UNKNOWN"; + } +} + WriteLimiter::WriteLimiter(Int64 rate_limit_per_sec_, LimiterType type_, UInt64 refill_period_ms_) : refill_period_ms{refill_period_ms_} , refill_balance_per_period{calculateRefillBalancePerPeriod(rate_limit_per_sec_)} @@ -107,6 +124,7 @@ WriteLimiter::WriteLimiter(Int64 rate_limit_per_sec_, LimiterType type_, UInt64 , requests_to_wait{0} , type(type_) , alloc_bytes{0} + , log(Logger::get(getEnumName(type_))) {} WriteLimiter::~WriteLimiter() @@ -131,7 +149,8 @@ void WriteLimiter::request(Int64 bytes) consumeBytes(bytes); return; } - + Stopwatch sw_pending; + Int64 wait_times = 0; auto pending_request = pendingRequestMetrics(type); // request cannot be satisfied at this moment, enqueue @@ -140,7 +159,7 @@ void WriteLimiter::request(Int64 bytes) while (!r.granted) { assert(!req_queue.empty()); - + wait_times++; bool timed_out = false; // if this request is in the front of req_queue, // then it is responsible to trigger the refill process. @@ -191,6 +210,7 @@ void WriteLimiter::request(Int64 bytes) } } } + LOG_FMT_TRACE(log, "pending_us {} wait_times {} pending_count {} rate_limit_per_sec {}", sw_pending.elapsed() / 1000, wait_times, req_queue.size(), refill_balance_per_period * 1000 / refill_period_ms); } size_t WriteLimiter::setStop() @@ -298,8 +318,8 @@ ReadLimiter::ReadLimiter( , getIOStatistic(std::move(getIOStatistic_)) , last_stat_bytes(getIOStatistic()) , last_stat_time(now()) - , log(&Poco::Logger::get("ReadLimiter")) , get_io_statistic_period_us(get_io_stat_period_us) + , last_refill_time(std::chrono::system_clock::now()) {} Int64 ReadLimiter::getAvailableBalance() @@ -333,18 +353,22 @@ Int64 ReadLimiter::refreshAvailableBalance() else { Int64 real_alloc_bytes = bytes - last_stat_bytes; - metricAllocBytes(type, real_alloc_bytes); + // `alloc_bytes` is the number of byte that ReadLimiter has allocated. + if (available_balance > 0) + { + auto can_alloc_bytes = std::min(real_alloc_bytes, available_balance); + alloc_bytes += can_alloc_bytes; + metricAllocBytes(type, can_alloc_bytes); + } available_balance -= real_alloc_bytes; - alloc_bytes += real_alloc_bytes; } last_stat_bytes = bytes; last_stat_time = us; return available_balance; } -void ReadLimiter::consumeBytes(Int64 bytes) +void ReadLimiter::consumeBytes([[maybe_unused]] Int64 bytes) { - metricRequestBytes(type, bytes); // Do nothing for read. } @@ -355,10 +379,26 @@ bool ReadLimiter::canGrant([[maybe_unused]] Int64 bytes) void ReadLimiter::refillAndAlloc() { - if (available_balance < refill_balance_per_period) + // `available_balance` of `ReadLimiter` may be overdrawn. + if (available_balance < 0) + { + // Limiter may not be called for a long time. + // During this time, limiter can be refilled at most `max_refill_times` times and covers some overdraft. + auto elapsed_duration = std::chrono::system_clock::now() - last_refill_time; + UInt64 elapsed_ms = std::chrono::duration_cast(elapsed_duration).count(); + // At least refill one time. + Int64 max_refill_times = std::max(elapsed_ms, refill_period_ms) / refill_period_ms; + Int64 max_refill_bytes = max_refill_times * refill_balance_per_period; + Int64 can_alloc_bytes = std::min(-available_balance, max_refill_bytes); + alloc_bytes += can_alloc_bytes; + metricAllocBytes(type, can_alloc_bytes); + available_balance = std::min(available_balance + max_refill_bytes, refill_balance_per_period); + } + else { - available_balance += refill_balance_per_period; + available_balance = refill_balance_per_period; } + last_refill_time = std::chrono::system_clock::now(); assert(!req_queue.empty()); auto * head_req = req_queue.front(); @@ -723,26 +763,13 @@ IOLimitTuner::TuneResult IOLimitTuner::tune() const } auto [max_read_bytes_per_sec, max_write_bytes_per_sec, rw_tuned] = tuneReadWrite(); - LOG_FMT_INFO( - log, - "tuneReadWrite: max_read {} max_write {} rw_tuned {}", - max_read_bytes_per_sec, - max_write_bytes_per_sec, - rw_tuned); auto [max_bg_read_bytes_per_sec, max_fg_read_bytes_per_sec, read_tuned] = tuneRead(max_read_bytes_per_sec); - LOG_FMT_INFO( - log, - "tuneRead: bg_read {} fg_read {} read_tuned {}", - max_bg_read_bytes_per_sec, - max_fg_read_bytes_per_sec, - read_tuned); auto [max_bg_write_bytes_per_sec, max_fg_write_bytes_per_sec, write_tuned] = tuneWrite(max_write_bytes_per_sec); - LOG_FMT_INFO( - log, - "tuneWrite: bg_write {} fg_write {} write_tuned {}", - max_bg_write_bytes_per_sec, - max_fg_write_bytes_per_sec, - write_tuned); + if (rw_tuned || read_tuned || write_tuned) + { + LOG_FMT_INFO(log, "tune_msg: bg_write {} => {} fg_write {} => {} bg_read {} => {} fg_read {} => {}", bg_write_stat != nullptr ? bg_write_stat->maxBytesPerSec() : 0, max_bg_write_bytes_per_sec, fg_write_stat != nullptr ? fg_write_stat->maxBytesPerSec() : 0, max_fg_write_bytes_per_sec, bg_read_stat != nullptr ? bg_read_stat->maxBytesPerSec() : 0, max_bg_read_bytes_per_sec, fg_read_stat != nullptr ? fg_read_stat->maxBytesPerSec() : 0, max_fg_read_bytes_per_sec); + } + return {.max_bg_read_bytes_per_sec = max_bg_read_bytes_per_sec, .max_fg_read_bytes_per_sec = max_fg_read_bytes_per_sec, .read_tuned = read_tuned || rw_tuned, diff --git a/dbms/src/Encryption/RateLimiter.h b/dbms/src/Encryption/RateLimiter.h index f44beeb8ed7..e70626b4192 100644 --- a/dbms/src/Encryption/RateLimiter.h +++ b/dbms/src/Encryption/RateLimiter.h @@ -13,7 +13,7 @@ // limitations under the License. #pragma once - +#include #include #include #include @@ -74,7 +74,10 @@ class WriteLimiter void request(Int64 bytes); // just for test purpose - inline UInt64 getTotalBytesThrough() const { return alloc_bytes; } + inline UInt64 getTotalBytesThrough() const + { + return available_balance < 0 ? alloc_bytes - available_balance : alloc_bytes; + } LimiterStat getStat(); @@ -134,6 +137,7 @@ class WriteLimiter Stopwatch stat_stop_watch; UInt64 alloc_bytes; + LoggerPtr log; }; using WriteLimiterPtr = std::shared_ptr; @@ -185,9 +189,9 @@ class ReadLimiter : public WriteLimiter return std::chrono::time_point_cast(std::chrono::system_clock::now()); } TimePoint last_stat_time; - Poco::Logger * log; Int64 get_io_statistic_period_us; + std::chrono::time_point last_refill_time; }; using ReadLimiterPtr = std::shared_ptr; diff --git a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp index 51984b86460..59f112edb0a 100644 --- a/dbms/src/Encryption/tests/gtest_rate_limiter.cpp +++ b/dbms/src/Encryption/tests/gtest_rate_limiter.cpp @@ -337,23 +337,48 @@ TEST(ReadLimiterTest, LimiterStat) ASSERT_GT(stat.pct(), 100) << stat.toString(); } - static constexpr UInt64 alloc_bytes = 2047; + static constexpr UInt64 total_bytes = 2047; for (int i = 0; i < 11; i++) { request(read_limiter, 1 << i); } - std::this_thread::sleep_for(100ms); read_limiter.refreshAvailableBalance(); stat = read_limiter.getStat(); - ASSERT_EQ(stat.alloc_bytes, alloc_bytes); - ASSERT_GE(stat.elapsed_ms, alloc_bytes / 100 + 1); + ASSERT_EQ(stat.alloc_bytes, total_bytes + read_limiter.getAvailableBalance()); + ASSERT_GE(stat.elapsed_ms, stat.alloc_bytes / 100 + 1); ASSERT_EQ(stat.refill_period_ms, 100ul); ASSERT_EQ(stat.refill_bytes_per_period, 100); ASSERT_EQ(stat.maxBytesPerSec(), 1000); - ASSERT_EQ(stat.avgBytesPerSec(), static_cast(alloc_bytes * 1000 / stat.elapsed_ms)) << stat.toString(); - ASSERT_EQ(stat.pct(), static_cast(alloc_bytes * 1000 / stat.elapsed_ms) * 100 / stat.maxBytesPerSec()) << stat.toString(); + ASSERT_EQ(stat.avgBytesPerSec(), static_cast(stat.alloc_bytes * 1000 / stat.elapsed_ms)) << stat.toString(); + ASSERT_EQ(stat.pct(), static_cast(stat.alloc_bytes * 1000 / stat.elapsed_ms) * 100 / stat.maxBytesPerSec()) << stat.toString(); +} + +TEST(ReadLimiterTest, ReadMany) +{ + Int64 real_read_bytes{0}; + auto get_read_bytes = [&]() { + return real_read_bytes; + }; + auto request = [&](ReadLimiter & limiter, Int64 bytes) { + limiter.request(bytes); + real_read_bytes += bytes; + }; + + constexpr Int64 bytes_per_sec = 1000; + constexpr UInt64 refill_period_ms = 100; + ReadLimiter read_limiter(get_read_bytes, bytes_per_sec, LimiterType::UNKNOW, refill_period_ms); + ASSERT_EQ(read_limiter.getAvailableBalance(), 100); + request(read_limiter, 1000); + std::this_thread::sleep_for(2ms); + ASSERT_EQ(read_limiter.getAvailableBalance(), -900); + ASSERT_EQ(read_limiter.alloc_bytes, 100); + + std::this_thread::sleep_for(1200ms); + Stopwatch sw; + request(read_limiter, 100); + ASSERT_LE(sw.elapsedMilliseconds(), 1); // Not blocked. } #ifdef __linux__ diff --git a/dbms/src/TestUtils/MockReadLimiter.h b/dbms/src/TestUtils/MockReadLimiter.h index 8acc96371e3..2d3c5cc1a75 100644 --- a/dbms/src/TestUtils/MockReadLimiter.h +++ b/dbms/src/TestUtils/MockReadLimiter.h @@ -32,8 +32,8 @@ class MockReadLimiter final : public ReadLimiter protected: void consumeBytes(Int64 bytes) override { - // Need soft limit here. - WriteLimiter::consumeBytes(bytes); + alloc_bytes += std::min(available_balance, bytes); + available_balance -= bytes; } };