From 704c5969f60defc5f4d1230d45a7c6b430589de9 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 4 Dec 2023 15:32:52 +0100 Subject: [PATCH] Fixes #4527 --- CHANGELOG.md | 19 +++++++++ src/rdkafka_cgrp.c | 12 ++++++ tests/0113-cooperative_rebalance.cpp | 58 +++++++++++++++++----------- 3 files changed, 66 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea7206ceac..d4a46f35cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,22 @@ +# librdkafka v2.3.1 (can change) + +librdkafka v2.3.1 is a maintenance release: + + * Fix hang in cooperative consumer mode if an assignment is processed + while closing the consumer (#). + + +## Fixes + +### Consumer fixes + + * While using the cooperative assignor, given an assignment is received while closing the consumer + it's possible that it gets stuck in state WAIT_ASSIGN_CALL, while the method is converted to + a full unassign. Solved by changing state from WAIT_ASSIGN_CALL to WAIT_UNASSIGN_CALL + while doing this conversion (#). + + + # librdkafka v2.3.0 librdkafka v2.3.0 is a feature release: diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index eb953bb56b..e41f7b3eb5 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -4831,8 +4831,20 @@ static void rd_kafka_cgrp_handle_assign_op(rd_kafka_cgrp_t *rkcg, rko->rko_u.assign.partitions); rko->rko_u.assign.partitions = NULL; } + if (rkcg->rkcg_rebalance_incr_assignment) { + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_rebalance_incr_assignment); + rkcg->rkcg_rebalance_incr_assignment = NULL; + } + rko->rko_u.assign.method = RD_KAFKA_ASSIGN_METHOD_ASSIGN; + if (rkcg->rkcg_join_state == + RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL) { + rd_kafka_cgrp_set_join_state( + rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL); + } + } else if (rd_kafka_cgrp_rebalance_protocol(rkcg) == RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE && !(rko->rko_u.assign.method == diff --git a/tests/0113-cooperative_rebalance.cpp b/tests/0113-cooperative_rebalance.cpp index d87063f9d8..833c7dac9f 100644 --- a/tests/0113-cooperative_rebalance.cpp +++ b/tests/0113-cooperative_rebalance.cpp @@ -2200,8 +2200,14 @@ static void t_max_poll_interval_exceeded(int variation) { Test::subscribe(c1, topic_name_1); Test::subscribe(c2, topic_name_1); - bool done = false; - bool both_have_been_assigned = false; + bool done = false; + bool both_have_been_assigned = false; + int expected_cb1_assign_call_cnt = 1; + int expected_cb2_assign_call_cnt = 2; + int expected_cb1_revoke_call_cnt = 1; + int expected_cb2_revoke_call_cnt = 1; + int expected_cb1_lost_call_cnt = 1; + while (!done) { if (!both_have_been_assigned) Test::poll_once(c1, 500); @@ -2233,37 +2239,43 @@ static void t_max_poll_interval_exceeded(int variation) { 500); /* Eat the max poll interval exceeded error message */ Test::poll_once(c1, 500); /* Trigger the rebalance_cb with lost partitions */ - if (rebalance_cb1.lost_call_cnt != 1) - Test::Fail( - tostr() << "Expected consumer 1 lost revoke count to be 1, not: " - << rebalance_cb1.lost_call_cnt); + if (rebalance_cb1.lost_call_cnt != expected_cb1_lost_call_cnt) + Test::Fail(tostr() << "Expected consumer 1 lost revoke count to be " + << expected_cb1_lost_call_cnt + << ", not: " << rebalance_cb1.lost_call_cnt); } if (variation == 3) { /* Last poll will cause a rejoin, wait that the rejoin happens. */ rd_sleep(5); + expected_cb2_revoke_call_cnt++; } c1->close(); c2->close(); - if (rebalance_cb1.lost_call_cnt != 1) - Test::Fail(tostr() << "Expected consumer 1 lost revoke count to be 1, not: " - << rebalance_cb1.lost_call_cnt); - - if (rebalance_cb1.assign_call_cnt != 1) - Test::Fail(tostr() << "Expected consumer 1 assign count to be 1, not: " - << rebalance_cb1.assign_call_cnt); - if (rebalance_cb2.assign_call_cnt != 2) - Test::Fail(tostr() << "Expected consumer 1 assign count to be 2, not: " - << rebalance_cb1.assign_call_cnt); - - if (rebalance_cb1.revoke_call_cnt != 1) - Test::Fail(tostr() << "Expected consumer 1 revoke count to be 1, not: " - << rebalance_cb1.revoke_call_cnt); - if (rebalance_cb2.revoke_call_cnt != 1) - Test::Fail(tostr() << "Expected consumer 2 revoke count to be 1, not: " - << rebalance_cb1.revoke_call_cnt); + if (rebalance_cb1.lost_call_cnt != expected_cb1_lost_call_cnt) + Test::Fail(tostr() << "Expected consumer 1 lost revoke count to be " + << expected_cb1_lost_call_cnt + << ", not: " << rebalance_cb1.lost_call_cnt); + + if (rebalance_cb1.nonempty_assign_call_cnt != expected_cb1_assign_call_cnt) + Test::Fail(tostr() << "Expected consumer 1 non-empty assign count to be " + << expected_cb1_assign_call_cnt + << ", not: " << rebalance_cb1.nonempty_assign_call_cnt); + if (rebalance_cb2.nonempty_assign_call_cnt != expected_cb2_assign_call_cnt) + Test::Fail(tostr() << "Expected consumer 2 non-empty assign count to be " + << expected_cb2_assign_call_cnt + << ", not: " << rebalance_cb2.nonempty_assign_call_cnt); + + if (rebalance_cb1.revoke_call_cnt != expected_cb1_revoke_call_cnt) + Test::Fail(tostr() << "Expected consumer 1 revoke count to be " + << expected_cb1_revoke_call_cnt + << ", not: " << rebalance_cb1.revoke_call_cnt); + if (rebalance_cb2.revoke_call_cnt != expected_cb2_revoke_call_cnt) + Test::Fail(tostr() << "Expected consumer 2 revoke count to be " + << expected_cb2_revoke_call_cnt + << ", not: " << rebalance_cb2.revoke_call_cnt); delete c1; delete c2;