diff --git a/CHANGELOG.md b/CHANGELOG.md index bc42c3f021..0780da8730 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ librdkafka v2.3.0 is a feature release: would cause an offset reset. This isn't what's expected or what the Java implementation does. Solved by retrying even in case of permanent errors (#4447). + * Fix to ensure max.poll.interval.ms is reset when rd_kafka_poll is called with consume_cb (#4431). diff --git a/tests/0089-max_poll_interval.c b/tests/0089-max_poll_interval.c index e47ce1510b..3c491e8431 100644 --- a/tests/0089-max_poll_interval.c +++ b/tests/0089-max_poll_interval.c @@ -453,25 +453,23 @@ static void consume_cb(rd_kafka_message_t *rkmessage, void *opaque) { TEST_SAY("Consume callback\n"); } -static void do_test_max_poll_reset_with_consumer_cb(void ) { +/** + * @brief Test that max.poll.interval.ms is reset when + * rd_kafka_poll is called with consume_cb + */ +static void do_test_max_poll_reset_with_consumer_cb(void) { const char *topic = test_mk_topic_name("0089_max_poll_interval", 1); rd_kafka_conf_t *conf; char groupid[64]; - rd_kafka_t *rk = NULL; - rd_kafka_queue_t *consumer_queue = NULL; - rd_kafka_queue_t *forwarder_queue = NULL; - rd_kafka_event_t *event = NULL; - rd_kafka_queue_t *polling_queue = NULL; + rd_kafka_t *rk = NULL; SUB_TEST(); test_create_topic(NULL, topic, 1, 1); uint64_t testid = test_id_generate(); - test_produce_msgs_easy(topic, testid, -1, 100); - test_str_id_generate(groupid, sizeof(groupid)); test_conf_init(&conf, NULL, 60); test_conf_set(conf, "session.timeout.ms", "10000"); @@ -486,10 +484,11 @@ static void do_test_max_poll_reset_with_consumer_cb(void ) { TEST_SAY("Subscribed to %s and sleeping for 5 s\n", topic); rd_sleep(5); rd_kafka_poll(rk, 10); - TEST_SAY("Polled and sleeping again for 6s. Max poll should be reset\n"); + TEST_SAY( + "Polled and sleeping again for 6s. Max poll should be reset\n"); rd_sleep(6); - // Poll should work + /* Poll should work */ rd_kafka_poll(rk, 10); rd_kafka_event_destroy(event); test_consumer_close(rk); @@ -497,11 +496,11 @@ static void do_test_max_poll_reset_with_consumer_cb(void ) { } int main_0089_max_poll_interval(int argc, char **argv) { - do_test(); - do_test_with_log_queue(); - do_test_rejoin_after_interval_expire(rd_false, rd_false); - do_test_rejoin_after_interval_expire(rd_true, rd_false); - do_test_rejoin_after_interval_expire(rd_false, rd_true); - do_test_max_poll_reset_with_consumer_cb(); + do_test(); + do_test_with_log_queue(); + do_test_rejoin_after_interval_expire(rd_false, rd_false); + do_test_rejoin_after_interval_expire(rd_true, rd_false); + do_test_rejoin_after_interval_expire(rd_false, rd_true); + do_test_max_poll_reset_with_consumer_cb(); return 0; }