Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: calculate the io throughput in background in ReadLimiter #5415

Merged
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 53 additions & 66 deletions dbms/src/Encryption/RateLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,47 +289,31 @@ void WriteLimiter::updateMaxBytesPerSec(Int64 max_bytes_per_sec)
}

ReadLimiter::ReadLimiter(
std::function<Int64()> getIOStatistic_,
std::function<Int64()> get_read_bytes_,
Int64 rate_limit_per_sec_,
LimiterType type_,
Int64 get_io_stat_period_us,
UInt64 refill_period_ms_)
: WriteLimiter(rate_limit_per_sec_, type_, refill_period_ms_)
, getIOStatistic(std::move(getIOStatistic_))
, last_stat_bytes(getIOStatistic())
, last_stat_time(now())
, get_read_bytes(std::move(get_read_bytes_))
, last_stat_bytes(get_read_bytes())
, log(Logger::get("ReadLimiter"))
, get_io_statistic_period_us(get_io_stat_period_us)
{}

Int64 ReadLimiter::getAvailableBalance()
{
TimePoint us = now();
// Not call getIOStatisctics() every time for performance.
// If the clock back, elapsed_us could be negative.
Int64 elapsed_us = std::chrono::duration_cast<std::chrono::microseconds>(us - last_stat_time).count();
if (get_io_statistic_period_us != 0 && elapsed_us < get_io_statistic_period_us)
{
return available_balance;
}

return refreshAvailableBalance();
}

Int64 ReadLimiter::refreshAvailableBalance()
{
TimePoint us = now();
Int64 bytes = getIOStatistic();
if (bytes < last_stat_bytes)
Int64 bytes = get_read_bytes();
if (unlikely(bytes < last_stat_bytes))
{
LOG_FMT_WARNING(
log,
"last_stat {}:{} current_stat {}:{}",
last_stat_time.time_since_epoch().count(),
"last_stat: {} current_stat: {}",
last_stat_bytes,
us.time_since_epoch().count(),
bytes);
}
else if (likely(bytes == last_stat_bytes))
{
return available_balance;
}
else
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
{
Int64 real_alloc_bytes = bytes - last_stat_bytes;
Expand All @@ -338,7 +322,6 @@ Int64 ReadLimiter::refreshAvailableBalance()
alloc_bytes += real_alloc_bytes;
}
last_stat_bytes = bytes;
last_stat_time = us;
return available_balance;
}

Expand Down Expand Up @@ -381,17 +364,18 @@ void ReadLimiter::refillAndAlloc()
}
}

IORateLimiter::IORateLimiter()
IORateLimiter::IORateLimiter(UInt64 update_read_info_period_ms_)
: log(Logger::get("IORateLimiter"))
, stop(false)
, update_read_info_period_ms(update_read_info_period_ms_)
{}

IORateLimiter::~IORateLimiter()
{
stop.store(true, std::memory_order_relaxed);
if (auto_tune_thread.joinable())
if (auto_tune_and_get_read_info_thread.joinable())
{
auto_tune_thread.join();
auto_tune_and_get_read_info_thread.join();
}
}

Expand All @@ -409,13 +393,13 @@ extern thread_local bool is_background_thread;

WriteLimiterPtr IORateLimiter::getWriteLimiter()
{
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
return is_background_thread ? bg_write_limiter : fg_write_limiter;
}

ReadLimiterPtr IORateLimiter::getReadLimiter()
{
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
return is_background_thread ? bg_read_limiter : fg_read_limiter;
}

Expand All @@ -426,7 +410,7 @@ void IORateLimiter::updateConfig(Poco::Util::AbstractConfiguration & config_)
{
return;
}
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
updateReadLimiter(io_config.getBgReadMaxBytesPerSec(), io_config.getFgReadMaxBytesPerSec());
updateWriteLimiter(io_config.getBgWriteMaxBytesPerSec(), io_config.getFgWriteMaxBytesPerSec());
}
Expand Down Expand Up @@ -455,11 +439,10 @@ void IORateLimiter::updateReadLimiter(Int64 bg_bytes, Int64 fg_bytes)
{
LOG_FMT_INFO(log, "updateReadLimiter: bg_bytes {} fg_bytes {}", bg_bytes, fg_bytes);
auto get_bg_read_io_statistic = [&]() {
return getCurrentIOInfo().bg_read_bytes;
return read_info.bg_read_bytes.load(std::memory_order_relaxed);
};
auto get_fg_read_io_statistic = [&]() {
auto io_info = getCurrentIOInfo();
return std::max(0, io_info.total_read_bytes - io_info.bg_read_bytes);
return read_info.fg_read_bytes.load(std::memory_order_relaxed);
};

if (bg_bytes == 0)
Expand Down Expand Up @@ -526,7 +509,7 @@ void IORateLimiter::setBackgroundThreadIds(std::vector<pid_t> thread_ids)
LOG_FMT_INFO(log, "bg_thread_ids {} => {}", bg_thread_ids.size(), bg_thread_ids);
}

std::pair<Int64, Int64> IORateLimiter::getReadWriteBytes(const std::string & fname [[maybe_unused]])
Int64 IORateLimiter::getReadBytes(const std::string & fname [[maybe_unused]])
{
#if __linux__
std::ifstream ifs(fname);
Expand All @@ -538,7 +521,6 @@ std::pair<Int64, Int64> IORateLimiter::getReadWriteBytes(const std::string & fna
}
std::string s;
Int64 read_bytes = -1;
Int64 write_bytes = -1;
while (std::getline(ifs, s))
{
if (s.empty())
Expand All @@ -557,49 +539,43 @@ std::pair<Int64, Int64> IORateLimiter::getReadWriteBytes(const std::string & fna
boost::algorithm::trim(values[1]);
read_bytes = std::stoll(values[1]);
}
else if (values[0] == "write_bytes")
{
boost::algorithm::trim(values[1]);
write_bytes = std::stoll(values[1]);
}
}
if (read_bytes == -1 || write_bytes == -1)
if (read_bytes == -1)
{
auto msg = fmt::format("read_bytes: {} write_bytes: {} Invalid result.", read_bytes, write_bytes);
auto msg = fmt::format("read_bytes: {}. Invalid result.", read_bytes);
LOG_ERROR(log, msg);
throw Exception(msg, ErrorCodes::UNKNOWN_EXCEPTION);
}
return {read_bytes, write_bytes};
return read_bytes;
#else
return {0, 0};
return 0;
#endif
}

IORateLimiter::IOInfo IORateLimiter::getCurrentIOInfo()
void IORateLimiter::getCurrentIOInfo()
{
static const pid_t pid = getpid();
IOInfo io_info;

// Read I/O info of each background threads.
// Read read info of each background threads.
Int64 bg_read_bytes_tmp{0};
for (pid_t tid : bg_thread_ids)
{
const std::string thread_io_fname = fmt::format("/proc/{}/task/{}/io", pid, tid);
Int64 read_bytes, write_bytes;
std::tie(read_bytes, write_bytes) = getReadWriteBytes(thread_io_fname);
io_info.bg_read_bytes += read_bytes;
io_info.bg_write_bytes += write_bytes;
Int64 read_bytes;
read_bytes = getReadBytes(thread_io_fname);
bg_read_bytes_tmp += read_bytes;
}
read_info.bg_read_bytes.store(bg_read_bytes_tmp, std::memory_order_relaxed);

// Read I/O info of this process.
// Read read info of this process.
static const std::string proc_io_fname = fmt::format("/proc/{}/io", pid);
std::tie(io_info.total_read_bytes, io_info.total_write_bytes) = getReadWriteBytes(proc_io_fname);
io_info.update_time = std::chrono::system_clock::now();
return io_info;
Int64 fg_read_bytes_tmp{getReadBytes(proc_io_fname) - bg_read_bytes_tmp};
read_info.fg_read_bytes.store(std::max(0, fg_read_bytes_tmp), std::memory_order_relaxed);
}

void IORateLimiter::setStop()
{
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
if (bg_write_limiter != nullptr)
{
auto sz = bg_write_limiter->setStop();
Expand All @@ -624,17 +600,28 @@ void IORateLimiter::setStop()

void IORateLimiter::runAutoTune()
{
auto auto_tune_worker = [&]() {
auto auto_tune_and_get_read_info_worker = [&]() {
using time_point = std::chrono::time_point<std::chrono::system_clock>;
using clock = std::chrono::system_clock;
time_point auto_tune_time = clock::now();
time_point update_read_info_time = auto_tune_time;
while (!stop.load(std::memory_order_relaxed))
{
::sleep(io_config.auto_tune_sec > 0 ? io_config.auto_tune_sec : 1);
if (io_config.auto_tune_sec > 0)
std::this_thread::sleep_for(std::chrono::milliseconds(update_read_info_period_ms));
auto now_time_point = clock::now();
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
if ((io_config.auto_tune_sec > 0) && (now_time_point - auto_tune_time >= std::chrono::seconds(io_config.auto_tune_sec)))
{
autoTune();
auto_tune_time = now_time_point;
}
if ((bg_read_limiter || fg_read_limiter) && likely(now_time_point - update_read_info_time >= std::chrono::milliseconds(update_read_info_period_ms)))
{
getCurrentIOInfo();
update_read_info_time = now_time_point;
}
}
};
auto_tune_thread = std::thread(auto_tune_worker);
auto_tune_and_get_read_info_thread = std::thread(auto_tune_and_get_read_info_worker);
}

std::unique_ptr<IOLimitTuner> IORateLimiter::createIOLimitTuner()
Expand All @@ -643,7 +630,7 @@ std::unique_ptr<IOLimitTuner> IORateLimiter::createIOLimitTuner()
ReadLimiterPtr bg_read, fg_read;
StorageIORateLimitConfig t_io_config;
{
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
bg_write = bg_write_limiter;
fg_write = fg_write_limiter;
bg_read = bg_read_limiter;
Expand All @@ -666,12 +653,12 @@ void IORateLimiter::autoTune()
auto tune_result = tuner->tune();
if (tune_result.read_tuned)
{
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
updateReadLimiter(tune_result.max_bg_read_bytes_per_sec, tune_result.max_fg_read_bytes_per_sec);
}
if (tune_result.write_tuned)
{
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
updateWriteLimiter(tune_result.max_bg_write_bytes_per_sec, tune_result.max_fg_write_bytes_per_sec);
}
}
Expand Down
Loading