From f4a50cd688264e3006af7638d95b7651f445eda5 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Wed, 13 Sep 2023 16:26:24 +0530 Subject: [PATCH 1/5] Move can_q_contain_fetched_msgs inside q_server --- src/rdkafka.c | 26 -------------------------- src/rdkafka_queue.c | 30 +++++++++++++++++++++++++++++- tests/0089-max_poll_interval.c | 31 ++++++++++++++++++++++--------- 3 files changed, 51 insertions(+), 36 deletions(-) diff --git a/src/rdkafka.c b/src/rdkafka.c index f460334cd6..6401e3520e 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -4005,36 +4005,19 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk, int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) { int r; - const rd_bool_t can_q_contain_fetched_msgs = - rd_kafka_q_can_contain_fetched_msgs(rk->rk_rep, RD_DO_LOCK); - - if (timeout_ms && can_q_contain_fetched_msgs) - rd_kafka_app_poll_blocking(rk); r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL); - - if (can_q_contain_fetched_msgs) - rd_kafka_app_polled(rk); - return r; } rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) { rd_kafka_op_t *rko; - const rd_bool_t can_q_contain_fetched_msgs = - rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK); - - - if (timeout_ms && can_q_contain_fetched_msgs) - rd_kafka_app_poll_blocking(rkqu->rkqu_rk); rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0, RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL); - if (can_q_contain_fetched_msgs) - rd_kafka_app_polled(rkqu->rkqu_rk); if (!rko) return NULL; @@ -4044,18 +4027,9 @@ rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) { int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms) { int r; - const rd_bool_t can_q_contain_fetched_msgs = - rd_kafka_q_can_contain_fetched_msgs(rkqu->rkqu_q, RD_DO_LOCK); - - if (timeout_ms && can_q_contain_fetched_msgs) - rd_kafka_app_poll_blocking(rkqu->rkqu_rk); r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL); - - if (can_q_contain_fetched_msgs) - rd_kafka_app_polled(rkqu->rkqu_rk); - return r; } diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c index b8749123f4..cb426daec8 100644 --- a/src/rdkafka_queue.c +++ b/src/rdkafka_queue.c @@ -399,10 +399,16 @@ rd_kafka_op_t *rd_kafka_q_pop_serve(rd_kafka_q_t *rkq, rd_kafka_yield_thread = 0; if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) { + const rd_bool_t can_q_contain_fetched_msgs = + rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DONT_LOCK); + struct timespec timeout_tspec; rd_timeout_init_timespec_us(&timeout_tspec, timeout_us); + if (timeout_us && can_q_contain_fetched_msgs) + rd_kafka_app_poll_blocking(rkq->rkq_rk); + while (1) { rd_kafka_op_res_t res; /* Keep track of current lock status to avoid @@ -440,15 +446,24 @@ rd_kafka_op_t *rd_kafka_q_pop_serve(rd_kafka_q_t *rkq, goto retry; /* Next op */ } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD)) { + if (can_q_contain_fetched_msgs) + rd_kafka_app_polled( + rkq->rkq_rk); /* Callback yielded, unroll */ return NULL; - } else + } else { + if (can_q_contain_fetched_msgs) + rd_kafka_app_polled( + rkq->rkq_rk); break; /* Proper op, handle below. */ + } } if (unlikely(rd_kafka_q_check_yield(rkq))) { if (is_locked) mtx_unlock(&rkq->rkq_lock); + if (can_q_contain_fetched_msgs) + rd_kafka_app_polled(rkq->rkq_rk); return NULL; } @@ -458,6 +473,8 @@ rd_kafka_op_t *rd_kafka_q_pop_serve(rd_kafka_q_t *rkq, if (cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock, &timeout_tspec) != thrd_success) { mtx_unlock(&rkq->rkq_lock); + if (can_q_contain_fetched_msgs) + rd_kafka_app_polled(rkq->rkq_rk); return NULL; } } @@ -520,8 +537,14 @@ int rd_kafka_q_serve(rd_kafka_q_t *rkq, return ret; } + const rd_bool_t can_q_contain_fetched_msgs = + rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DONT_LOCK); + rd_timeout_init_timespec(&timeout_tspec, timeout_ms); + if (timeout_ms && can_q_contain_fetched_msgs) + rd_kafka_app_poll_blocking(rk); + /* Wait for op */ while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) && !rd_kafka_q_check_yield(rkq) && @@ -533,6 +556,8 @@ int rd_kafka_q_serve(rd_kafka_q_t *rkq, if (!rko) { mtx_unlock(&rkq->rkq_lock); + if (can_q_contain_fetched_msgs) + rd_kafka_app_polled(rk); return 0; } @@ -567,6 +592,9 @@ int rd_kafka_q_serve(rd_kafka_q_t *rkq, } } + if (can_q_contain_fetched_msgs) + rd_kafka_app_polled(rk); + rd_kafka_q_destroy_owner(&localq); return cnt; diff --git a/tests/0089-max_poll_interval.c b/tests/0089-max_poll_interval.c index 660e7ce62c..1f22a6ec1b 100644 --- a/tests/0089-max_poll_interval.c +++ b/tests/0089-max_poll_interval.c @@ -360,19 +360,25 @@ static void do_test_with_log_queue(void) { * should suffice. * We test with the result of rd_kafka_queue_get_consumer, and an arbitrary * queue that is forwarded to by the result of rd_kafka_queue_get_consumer. + * We also test with an arbitrary queue that is forwarded to the the result of + * rd_kafka_queue_get_consumer. */ static void -do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q) { +do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q, + rd_bool_t forward_to_consumer_q) { const char *topic = test_mk_topic_name("0089_max_poll_interval", 1); rd_kafka_conf_t *conf; char groupid[64]; - rd_kafka_t *rk = NULL; - rd_kafka_queue_t *consumer_queue = NULL; - rd_kafka_event_t *event = NULL; - rd_kafka_queue_t *polling_queue = NULL; + rd_kafka_t *rk = NULL; + rd_kafka_queue_t *consumer_queue = NULL; + rd_kafka_queue_t *forwarder_queue = NULL; + rd_kafka_event_t *event = NULL; + rd_kafka_queue_t *polling_queue = NULL; - SUB_TEST("Testing with forward_to_another_q = %d", - forward_to_another_q); + SUB_TEST( + "Testing with forward_to_another_q = %d, forward_to_consumer_q = " + "%d", + forward_to_another_q, forward_to_consumer_q); test_create_topic(NULL, topic, 1, 1); @@ -393,6 +399,10 @@ do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q) { if (forward_to_another_q) { polling_queue = rd_kafka_queue_new(rk); rd_kafka_queue_forward(consumer_queue, polling_queue); + } else if (forward_to_consumer_q) { + forwarder_queue = rd_kafka_queue_new(rk); + rd_kafka_queue_forward(forwarder_queue, consumer_queue); + polling_queue = forwarder_queue; } else polling_queue = consumer_queue; @@ -430,6 +440,8 @@ do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q) { if (forward_to_another_q) rd_kafka_queue_destroy(polling_queue); + if (forward_to_consumer_q) + rd_kafka_queue_destroy(forwarder_queue); rd_kafka_queue_destroy(consumer_queue); test_consumer_close(rk); rd_kafka_destroy(rk); @@ -440,7 +452,8 @@ do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q) { int main_0089_max_poll_interval(int argc, char **argv) { do_test(); do_test_with_log_queue(); - do_test_rejoin_after_interval_expire(rd_false); - do_test_rejoin_after_interval_expire(rd_true); + do_test_rejoin_after_interval_expire(rd_false, rd_false); + do_test_rejoin_after_interval_expire(rd_true, rd_false); + do_test_rejoin_after_interval_expire(rd_false, rd_true); return 0; } From 92aeab4d919cce27728da2c7b5f80c751dc59784 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Fri, 29 Sep 2023 17:37:20 +0530 Subject: [PATCH 2/5] Add do_test_max_poll_reset_with_consumer_cb --- tests/0089-max_poll_interval.c | 58 +++++++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 5 deletions(-) diff --git a/tests/0089-max_poll_interval.c b/tests/0089-max_poll_interval.c index 1f22a6ec1b..e47ce1510b 100644 --- a/tests/0089-max_poll_interval.c +++ b/tests/0089-max_poll_interval.c @@ -449,11 +449,59 @@ do_test_rejoin_after_interval_expire(rd_bool_t forward_to_another_q, SUB_TEST_PASS(); } +static void consume_cb(rd_kafka_message_t *rkmessage, void *opaque) { + TEST_SAY("Consume callback\n"); +} + +static void do_test_max_poll_reset_with_consumer_cb(void ) { + const char *topic = test_mk_topic_name("0089_max_poll_interval", 1); + rd_kafka_conf_t *conf; + char groupid[64]; + rd_kafka_t *rk = NULL; + rd_kafka_queue_t *consumer_queue = NULL; + rd_kafka_queue_t *forwarder_queue = NULL; + rd_kafka_event_t *event = NULL; + rd_kafka_queue_t *polling_queue = NULL; + + SUB_TEST(); + + test_create_topic(NULL, topic, 1, 1); + uint64_t testid = test_id_generate(); + + + test_produce_msgs_easy(topic, testid, -1, 100); + + + test_str_id_generate(groupid, sizeof(groupid)); + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "session.timeout.ms", "10000"); + test_conf_set(conf, "max.poll.interval.ms", "10000" /*10s*/); + test_conf_set(conf, "partition.assignment.strategy", "range"); + rd_kafka_conf_set_consume_cb(conf, consume_cb); + + rk = test_create_consumer(groupid, NULL, conf, NULL); + rd_kafka_poll_set_consumer(rk); + + test_consumer_subscribe(rk, topic); + TEST_SAY("Subscribed to %s and sleeping for 5 s\n", topic); + rd_sleep(5); + rd_kafka_poll(rk, 10); + TEST_SAY("Polled and sleeping again for 6s. Max poll should be reset\n"); + rd_sleep(6); + + // Poll should work + rd_kafka_poll(rk, 10); + rd_kafka_event_destroy(event); + test_consumer_close(rk); + rd_kafka_destroy(rk); +} + int main_0089_max_poll_interval(int argc, char **argv) { - do_test(); - do_test_with_log_queue(); - do_test_rejoin_after_interval_expire(rd_false, rd_false); - do_test_rejoin_after_interval_expire(rd_true, rd_false); - do_test_rejoin_after_interval_expire(rd_false, rd_true); + do_test(); + do_test_with_log_queue(); + do_test_rejoin_after_interval_expire(rd_false, rd_false); + do_test_rejoin_after_interval_expire(rd_true, rd_false); + do_test_rejoin_after_interval_expire(rd_false, rd_true); + do_test_max_poll_reset_with_consumer_cb(); return 0; } From ab8c71e3f54a1756411c8be54fc46fa38f3c6ad8 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Fri, 29 Sep 2023 17:43:58 +0530 Subject: [PATCH 3/5] Add CHANGELOG --- CHANGELOG.md | 1 + src/rdkafka_queue.c | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9084312fad..bc42c3f021 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ librdkafka v2.3.0 is a feature release: * Fix for stored offsets not being committed if they lacked the leader epoch (#4442). * Fix to ensure permanent errors during offset validation continue being retried and don't cause an offset reset (#4447). + * Fix to ensure max.poll.interval.ms is reset when rd_kafka_poll is called with consume_cb (#4431). ## Fixes diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c index cb426daec8..3e30379558 100644 --- a/src/rdkafka_queue.c +++ b/src/rdkafka_queue.c @@ -520,6 +520,8 @@ int rd_kafka_q_serve(rd_kafka_q_t *rkq, rd_kafka_q_t *fwdq; int cnt = 0; struct timespec timeout_tspec; + const rd_bool_t can_q_contain_fetched_msgs = + rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DONT_LOCK); rd_dassert(cb_type); @@ -537,8 +539,6 @@ int rd_kafka_q_serve(rd_kafka_q_t *rkq, return ret; } - const rd_bool_t can_q_contain_fetched_msgs = - rd_kafka_q_can_contain_fetched_msgs(rkq, RD_DONT_LOCK); rd_timeout_init_timespec(&timeout_tspec, timeout_ms); From 69e0942af5ae71a4e8476e17c3125daea10da58d Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Fri, 29 Sep 2023 12:16:58 +0000 Subject: [PATCH 4/5] Fix style --- CHANGELOG.md | 1 + tests/0089-max_poll_interval.c | 32 ++++++++++++++++---------------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc42c3f021..0780da8730 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ librdkafka v2.3.0 is a feature release: would cause an offset reset. This isn't what's expected or what the Java implementation does. Solved by retrying even in case of permanent errors (#4447). + * Fix to ensure max.poll.interval.ms is reset when rd_kafka_poll is called with consume_cb (#4431). diff --git a/tests/0089-max_poll_interval.c b/tests/0089-max_poll_interval.c index e47ce1510b..e5b0d1fb2a 100644 --- a/tests/0089-max_poll_interval.c +++ b/tests/0089-max_poll_interval.c @@ -453,25 +453,24 @@ static void consume_cb(rd_kafka_message_t *rkmessage, void *opaque) { TEST_SAY("Consume callback\n"); } -static void do_test_max_poll_reset_with_consumer_cb(void ) { +/** + * @brief Test that max.poll.interval.ms is reset when + * rd_kafka_poll is called with consume_cb. + * Issue reported in https://github.com/confluentinc/librdkafka/issues/4421 + */ +static void do_test_max_poll_reset_with_consumer_cb(void) { const char *topic = test_mk_topic_name("0089_max_poll_interval", 1); rd_kafka_conf_t *conf; char groupid[64]; - rd_kafka_t *rk = NULL; - rd_kafka_queue_t *consumer_queue = NULL; - rd_kafka_queue_t *forwarder_queue = NULL; - rd_kafka_event_t *event = NULL; - rd_kafka_queue_t *polling_queue = NULL; + rd_kafka_t *rk = NULL; SUB_TEST(); test_create_topic(NULL, topic, 1, 1); uint64_t testid = test_id_generate(); - test_produce_msgs_easy(topic, testid, -1, 100); - test_str_id_generate(groupid, sizeof(groupid)); test_conf_init(&conf, NULL, 60); test_conf_set(conf, "session.timeout.ms", "10000"); @@ -486,10 +485,11 @@ static void do_test_max_poll_reset_with_consumer_cb(void ) { TEST_SAY("Subscribed to %s and sleeping for 5 s\n", topic); rd_sleep(5); rd_kafka_poll(rk, 10); - TEST_SAY("Polled and sleeping again for 6s. Max poll should be reset\n"); + TEST_SAY( + "Polled and sleeping again for 6s. Max poll should be reset\n"); rd_sleep(6); - // Poll should work + /* Poll should work */ rd_kafka_poll(rk, 10); rd_kafka_event_destroy(event); test_consumer_close(rk); @@ -497,11 +497,11 @@ static void do_test_max_poll_reset_with_consumer_cb(void ) { } int main_0089_max_poll_interval(int argc, char **argv) { - do_test(); - do_test_with_log_queue(); - do_test_rejoin_after_interval_expire(rd_false, rd_false); - do_test_rejoin_after_interval_expire(rd_true, rd_false); - do_test_rejoin_after_interval_expire(rd_false, rd_true); - do_test_max_poll_reset_with_consumer_cb(); + do_test(); + do_test_with_log_queue(); + do_test_rejoin_after_interval_expire(rd_false, rd_false); + do_test_rejoin_after_interval_expire(rd_true, rd_false); + do_test_rejoin_after_interval_expire(rd_false, rd_true); + do_test_max_poll_reset_with_consumer_cb(); return 0; } From e69087d8e07ec536be0e5d5a691edc3817b7fb94 Mon Sep 17 00:00:00 2001 From: Milind L Date: Fri, 29 Sep 2023 19:25:22 +0530 Subject: [PATCH 5/5] Fix compilation issue --- CHANGELOG.md | 10 ++++++++-- tests/0089-max_poll_interval.c | 3 +-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0780da8730..ca3446f504 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,8 @@ librdkafka v2.3.0 is a feature release: * Fix for stored offsets not being committed if they lacked the leader epoch (#4442). * Fix to ensure permanent errors during offset validation continue being retried and don't cause an offset reset (#4447). - * Fix to ensure max.poll.interval.ms is reset when rd_kafka_poll is called with consume_cb (#4431). + * Fix to ensure max.poll.interval.ms is reset when rd_kafka_poll is called with + consume_cb (#4431). ## Fixes @@ -52,7 +53,12 @@ librdkafka v2.3.0 is a feature release: would cause an offset reset. This isn't what's expected or what the Java implementation does. Solved by retrying even in case of permanent errors (#4447). - * Fix to ensure max.poll.interval.ms is reset when rd_kafka_poll is called with consume_cb (#4431). + * If using `rd_kafka_poll_set_consumer`, along with a consume callback, and then + calling `rd_kafka_poll` to service the callbacks, would not reset + `max.poll.interval.ms.` This was because we were only checking `rk_rep` for + consumer messages, while the method to service the queue internally also + services the queue forwarded to from `rk_rep`, which is `rkcg_q`. + Solved by moving the `max.poll.interval.ms` check into `rd_kafka_q_serve` (#4431). diff --git a/tests/0089-max_poll_interval.c b/tests/0089-max_poll_interval.c index e5b0d1fb2a..2089af9907 100644 --- a/tests/0089-max_poll_interval.c +++ b/tests/0089-max_poll_interval.c @@ -456,7 +456,7 @@ static void consume_cb(rd_kafka_message_t *rkmessage, void *opaque) { /** * @brief Test that max.poll.interval.ms is reset when * rd_kafka_poll is called with consume_cb. - * Issue reported in https://github.com/confluentinc/librdkafka/issues/4421 + * See issue #4421. */ static void do_test_max_poll_reset_with_consumer_cb(void) { const char *topic = test_mk_topic_name("0089_max_poll_interval", 1); @@ -491,7 +491,6 @@ static void do_test_max_poll_reset_with_consumer_cb(void) { /* Poll should work */ rd_kafka_poll(rk, 10); - rd_kafka_event_destroy(event); test_consumer_close(rk); rd_kafka_destroy(rk); }