Skip to content

Commit

Permalink
Fixes #4527
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Dec 4, 2023
1 parent ae5ee7b commit 704c596
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 23 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
# librdkafka v2.3.1 (can change)

librdkafka v2.3.1 is a maintenance release:

* Fix hang in cooperative consumer mode if an assignment is processed
while closing the consumer (#4527).


## Fixes

### Consumer fixes

* While using the cooperative assignor, if an assignment is received while the consumer is closing,
the consumer could get stuck in state WAIT_ASSIGN_CALL, because the assign method is converted to
a full unassign without the join state being updated. Solved by changing the state from
WAIT_ASSIGN_CALL to WAIT_UNASSIGN_CALL while doing this conversion (#4527).



# librdkafka v2.3.0

librdkafka v2.3.0 is a feature release:
Expand Down
12 changes: 12 additions & 0 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -4831,8 +4831,20 @@ static void rd_kafka_cgrp_handle_assign_op(rd_kafka_cgrp_t *rkcg,
rko->rko_u.assign.partitions);
rko->rko_u.assign.partitions = NULL;
}
if (rkcg->rkcg_rebalance_incr_assignment) {
rd_kafka_topic_partition_list_destroy(
rkcg->rkcg_rebalance_incr_assignment);
rkcg->rkcg_rebalance_incr_assignment = NULL;
}

rko->rko_u.assign.method = RD_KAFKA_ASSIGN_METHOD_ASSIGN;

if (rkcg->rkcg_join_state ==
RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL) {
rd_kafka_cgrp_set_join_state(
rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL);
}

} else if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE &&
!(rko->rko_u.assign.method ==
Expand Down
58 changes: 35 additions & 23 deletions tests/0113-cooperative_rebalance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2200,8 +2200,14 @@ static void t_max_poll_interval_exceeded(int variation) {
Test::subscribe(c1, topic_name_1);
Test::subscribe(c2, topic_name_1);

bool done = false;
bool both_have_been_assigned = false;
bool done = false;
bool both_have_been_assigned = false;
int expected_cb1_assign_call_cnt = 1;
int expected_cb2_assign_call_cnt = 2;
int expected_cb1_revoke_call_cnt = 1;
int expected_cb2_revoke_call_cnt = 1;
int expected_cb1_lost_call_cnt = 1;

while (!done) {
if (!both_have_been_assigned)
Test::poll_once(c1, 500);
Expand Down Expand Up @@ -2233,37 +2239,43 @@ static void t_max_poll_interval_exceeded(int variation) {
500); /* Eat the max poll interval exceeded error message */
Test::poll_once(c1,
500); /* Trigger the rebalance_cb with lost partitions */
if (rebalance_cb1.lost_call_cnt != 1)
Test::Fail(
tostr() << "Expected consumer 1 lost revoke count to be 1, not: "
<< rebalance_cb1.lost_call_cnt);
if (rebalance_cb1.lost_call_cnt != expected_cb1_lost_call_cnt)
Test::Fail(tostr() << "Expected consumer 1 lost revoke count to be "
<< expected_cb1_lost_call_cnt
<< ", not: " << rebalance_cb1.lost_call_cnt);
}

if (variation == 3) {
/* Last poll will cause a rejoin, wait for the rejoin to happen. */
rd_sleep(5);
expected_cb2_revoke_call_cnt++;
}

c1->close();
c2->close();

if (rebalance_cb1.lost_call_cnt != 1)
Test::Fail(tostr() << "Expected consumer 1 lost revoke count to be 1, not: "
<< rebalance_cb1.lost_call_cnt);

if (rebalance_cb1.assign_call_cnt != 1)
Test::Fail(tostr() << "Expected consumer 1 assign count to be 1, not: "
<< rebalance_cb1.assign_call_cnt);
if (rebalance_cb2.assign_call_cnt != 2)
Test::Fail(tostr() << "Expected consumer 1 assign count to be 2, not: "
<< rebalance_cb1.assign_call_cnt);

if (rebalance_cb1.revoke_call_cnt != 1)
Test::Fail(tostr() << "Expected consumer 1 revoke count to be 1, not: "
<< rebalance_cb1.revoke_call_cnt);
if (rebalance_cb2.revoke_call_cnt != 1)
Test::Fail(tostr() << "Expected consumer 2 revoke count to be 1, not: "
<< rebalance_cb1.revoke_call_cnt);
if (rebalance_cb1.lost_call_cnt != expected_cb1_lost_call_cnt)
Test::Fail(tostr() << "Expected consumer 1 lost revoke count to be "
<< expected_cb1_lost_call_cnt
<< ", not: " << rebalance_cb1.lost_call_cnt);

if (rebalance_cb1.nonempty_assign_call_cnt != expected_cb1_assign_call_cnt)
Test::Fail(tostr() << "Expected consumer 1 non-empty assign count to be "
<< expected_cb1_assign_call_cnt
<< ", not: " << rebalance_cb1.nonempty_assign_call_cnt);
if (rebalance_cb2.nonempty_assign_call_cnt != expected_cb2_assign_call_cnt)
Test::Fail(tostr() << "Expected consumer 2 non-empty assign count to be "
<< expected_cb2_assign_call_cnt
<< ", not: " << rebalance_cb2.nonempty_assign_call_cnt);

if (rebalance_cb1.revoke_call_cnt != expected_cb1_revoke_call_cnt)
Test::Fail(tostr() << "Expected consumer 1 revoke count to be "
<< expected_cb1_revoke_call_cnt
<< ", not: " << rebalance_cb1.revoke_call_cnt);
if (rebalance_cb2.revoke_call_cnt != expected_cb2_revoke_call_cnt)
Test::Fail(tostr() << "Expected consumer 2 revoke count to be "
<< expected_cb2_revoke_call_cnt
<< ", not: " << rebalance_cb2.revoke_call_cnt);

delete c1;
delete c2;
Expand Down

0 comments on commit 704c596

Please sign in to comment.