Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix segfault if assignor state is NULL, #4381

Merged
merged 9 commits into from
Sep 26, 2023
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ librdkafka v2.2.1 is a maintenance release:
* Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0.
* Add missing destroy that leads to leaking partition structure memory when there
are partition leader changes and a stale leader epoch is received (#4429).
* Fix a segmentation fault when closing a consumer using the
cooperative-sticky assignor before the first assignment (#4381).



Expand Down
10 changes: 7 additions & 3 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg) {
rd_list_destroy(&rkcg->rkcg_toppars);
rd_list_destroy(rkcg->rkcg_subscribed_topics);
rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics);
if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb)
if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb &&
rkcg->rkcg_assignor_state)
rkcg->rkcg_assignor->rkas_destroy_state_cb(
rkcg->rkcg_assignor_state);
rd_free(rkcg);
Expand Down Expand Up @@ -1914,7 +1915,9 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk,
"Unsupported assignment strategy \"%s\"",
protocol_name);
if (rkcg->rkcg_assignor) {
if (rkcg->rkcg_assignor->rkas_destroy_state_cb)
if (rkcg->rkcg_assignor
->rkas_destroy_state_cb &&
rkcg->rkcg_assignor_state)
rkcg->rkcg_assignor
->rkas_destroy_state_cb(
rkcg->rkcg_assignor_state);
Expand Down Expand Up @@ -1952,7 +1955,8 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk,
}

if (rkcg->rkcg_assignor && rkcg->rkcg_assignor != rkas) {
if (rkcg->rkcg_assignor->rkas_destroy_state_cb)
if (rkcg->rkcg_assignor->rkas_destroy_state_cb &&
rkcg->rkcg_assignor_state)
rkcg->rkcg_assignor->rkas_destroy_state_cb(
rkcg->rkcg_assignor_state);
rkcg->rkcg_assignor_state = NULL;
Expand Down
52 changes: 52 additions & 0 deletions tests/0113-cooperative_rebalance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2914,6 +2914,57 @@ static void r_lost_partitions_commit_illegal_generation_test_local() {
test_mock_cluster_destroy(mcluster);
}

/**
* @brief Test that the consumer is destroyed without segfault if
* it happens before first rebalance and there is no assignor
* state. See #4312
*/
static void s_no_segfault_before_first_rebalance(void) {
rd_kafka_t *c;
rd_kafka_conf_t *conf;
rd_kafka_mock_cluster_t *mcluster;
const char *topic;
const char *bootstraps;

SUB_TEST_QUICK();

TEST_SAY("Creating mock cluster\n");
mcluster = test_mock_cluster_new(1, &bootstraps);

topic = test_mk_topic_name("0113_s", 1);

test_conf_init(&conf, NULL, 60);
test_conf_set(conf, "bootstrap.servers", bootstraps);
test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");

TEST_SAY("Creating topic %s\n", topic);
TEST_CALL_ERR__(rd_kafka_mock_topic_create(
mcluster, topic, 2 /* partition_cnt */, 1 /* replication_factor */));

c = test_create_consumer(topic, NULL, conf, NULL);

/* Add a 1s delay to the SyncGroup response so next condition can happen. */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, 1 /*Broker 1*/, RD_KAFKAP_SyncGroup /*FetchRequest*/, 1,
RD_KAFKA_RESP_ERR_NOT_COORDINATOR, 1000);

test_consumer_subscribe(c, topic);

/* Wait for initial rebalance 3000 ms (default) + 500 ms for processing
* the JoinGroup response. Consumer close must come between the JoinGroup
* response and the SyncGroup response, so that rkcg_assignor is set,
* but rkcg_assignor_state isn't. */
TEST_ASSERT(!test_consumer_poll_once(c, NULL, 3500), "poll should timeout");

rd_kafka_consumer_close(c);

rd_kafka_destroy(c);

TEST_SAY("Destroying mock cluster\n");
test_mock_cluster_destroy(mcluster);

SUB_TEST_PASS();
}

/**
* @brief Rebalance callback for the v_.. test below.
Expand Down Expand Up @@ -3117,6 +3168,7 @@ int main_0113_cooperative_rebalance_local(int argc, char **argv) {
q_lost_partitions_illegal_generation_test(rd_false /*joingroup*/);
q_lost_partitions_illegal_generation_test(rd_true /*syncgroup*/);
r_lost_partitions_commit_illegal_generation_test_local();
s_no_segfault_before_first_rebalance();
return 0;
}

Expand Down