From 6a8f93117b29af4dedd117c1b3583cd7875ae96d Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 3 Apr 2024 15:16:42 +0200 Subject: [PATCH] Fix to main loop timeout calculation leading to a tight loop for a max period of 1 ms When the main thread loop was awakened less than 1 ms before the expiration of a timeout, it was serving with a zero timeout, leading to increased CPU usage until the timeout was reached. Happening since 1.x --- CHANGELOG.md | 6 ++++++ src/rdkafka.c | 5 ++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f2fa163544..5542eb7c9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ librdkafka v2.3.1 is a maintenance release: * Integration tests can be started in KRaft mode and run against any GitHub Kafka branch other than the released versions. * Fix pipeline inclusion of static binaries (#4666) + * Fix to main loop timeout calculation leading to a tight loop for a + max period of 1 ms (#4671). ## Fixes @@ -20,6 +22,10 @@ librdkafka v2.3.1 is a maintenance release: Solved by correctly excluding the binary configured with that library, when targeting a static build. Happening since v2.0.2, with specified platforms, when using static binaries (#4666). + * When the main thread loop was awakened less than 1 ms + before the expiration of a timeout, it was serving with a zero timeout, + leading to increased CPU usage until the timeout was reached. + Happening since 1.x (#4671). diff --git a/src/rdkafka.c b/src/rdkafka.c index 566d2e065d..c8d5078a73 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -2120,7 +2120,10 @@ static int rd_kafka_thread_main(void *arg) { RD_KAFKA_CGRP_STATE_TERM)))) { rd_ts_t sleeptime = rd_kafka_timers_next( &rk->rk_timers, 1000 * 1000 /*1s*/, 1 /*lock*/); - rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0, + /* Use ceiling division to avoid calling serve with a 0 ms + * timeout in a tight loop until 1 ms has passed. */ + int timeout_ms = (sleeptime + 999) / 1000; + rd_kafka_q_serve(rk->rk_ops, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK, NULL, NULL); if (rk->rk_cgrp) /* FIXME: move to timer-triggered */ rd_kafka_cgrp_serve(rk->rk_cgrp);