Skip to content

Commit

Permalink
Fix style
Browse files Browse the repository at this point in the history
  • Loading branch information
anchitj committed Sep 29, 2023
1 parent ab8c71e commit 4796e42
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ librdkafka v2.3.0 is a feature release:
would cause an offset reset.
This isn't what's expected or what the Java implementation does.
Solved by retrying even in case of permanent errors (#4447).
* Fix to ensure max.poll.interval.ms is reset when rd_kafka_poll is called with consume_cb (#4431).



Expand Down
31 changes: 15 additions & 16 deletions tests/0089-max_poll_interval.c
Original file line number Diff line number Diff line change
Expand Up @@ -453,25 +453,23 @@ static void consume_cb(rd_kafka_message_t *rkmessage, void *opaque) {
TEST_SAY("Consume callback\n");
}

static void do_test_max_poll_reset_with_consumer_cb(void ) {
/**
* @brief Test that max.poll.interval.ms is reset when
* rd_kafka_poll is called with consume_cb
*/
static void do_test_max_poll_reset_with_consumer_cb(void) {
const char *topic = test_mk_topic_name("0089_max_poll_interval", 1);
rd_kafka_conf_t *conf;
char groupid[64];
rd_kafka_t *rk = NULL;
rd_kafka_queue_t *consumer_queue = NULL;
rd_kafka_queue_t *forwarder_queue = NULL;
rd_kafka_event_t *event = NULL;
rd_kafka_queue_t *polling_queue = NULL;
rd_kafka_t *rk = NULL;

SUB_TEST();

test_create_topic(NULL, topic, 1, 1);
uint64_t testid = test_id_generate();


test_produce_msgs_easy(topic, testid, -1, 100);


test_str_id_generate(groupid, sizeof(groupid));
test_conf_init(&conf, NULL, 60);
test_conf_set(conf, "session.timeout.ms", "10000");
Expand All @@ -486,22 +484,23 @@ static void do_test_max_poll_reset_with_consumer_cb(void ) {
TEST_SAY("Subscribed to %s and sleeping for 5 s\n", topic);
rd_sleep(5);
rd_kafka_poll(rk, 10);
TEST_SAY("Polled and sleeping again for 6s. Max poll should be reset\n");
TEST_SAY(
"Polled and sleeping again for 6s. Max poll should be reset\n");
rd_sleep(6);

// Poll should work
/* Poll should work */
rd_kafka_poll(rk, 10);
rd_kafka_event_destroy(event);
test_consumer_close(rk);
rd_kafka_destroy(rk);
}

int main_0089_max_poll_interval(int argc, char **argv) {
do_test();
do_test_with_log_queue();
do_test_rejoin_after_interval_expire(rd_false, rd_false);
do_test_rejoin_after_interval_expire(rd_true, rd_false);
do_test_rejoin_after_interval_expire(rd_false, rd_true);
do_test_max_poll_reset_with_consumer_cb();
do_test();
do_test_with_log_queue();
do_test_rejoin_after_interval_expire(rd_false, rd_false);
do_test_rejoin_after_interval_expire(rd_true, rd_false);
do_test_rejoin_after_interval_expire(rd_false, rd_true);
do_test_max_poll_reset_with_consumer_cb();
return 0;
}

0 comments on commit 4796e42

Please sign in to comment.