From 3cc78f95f7f692a6f4a05aabb1573fb6ce723455 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Wed, 22 Jan 2020 16:57:36 +0100 Subject: [PATCH] Enforce session.timeout.ms in the consumer itself (#2631) If no successful Heartbeat has been sent in session.timeout.ms the consumer will trigger a local rebalance (rebalance callback with error code set to REVOKE_PARTITIONS). The consumer will rejoin the group when the rebalance has been handled. --- src/rdkafka_cgrp.c | 295 +++++++++++++++++++-------------- src/rdkafka_cgrp.h | 14 +- tests/0106-cgrp_sess_timeout.c | 229 +++++++++++++++++++++++++ tests/CMakeLists.txt | 1 + tests/test.c | 2 + win32/tests/tests.vcxproj | 1 + 6 files changed, 417 insertions(+), 125 deletions(-) create mode 100644 tests/0106-cgrp_sess_timeout.c diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index cbe29667da..5cfc8cb289 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -190,6 +190,22 @@ void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg) { +/** + * @brief Update the absolute session timeout following a successful + * response from the coordinator. + * This timeout is used to enforce the session timeout in the + * consumer itself. + * + * @param reset if true the timeout is updated even if the session has expired. 
+ */ +static RD_INLINE void +rd_kafka_cgrp_update_session_timeout (rd_kafka_cgrp_t *rkcg, rd_bool_t reset) { + if (reset || rkcg->rkcg_ts_session_timeout != 0) + rkcg->rkcg_ts_session_timeout = rd_clock() + + (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms*1000); +} + + rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk, const rd_kafkap_str_t *group_id, @@ -525,8 +541,8 @@ void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg, rd_rkb_dbg(rkb, CGRP, "CGRPQUERY", "Group \"%.*s\": " "unable to send coordinator query: %s", - RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), - rd_kafka_err2str(err)); + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_err2str(err)); rd_kafka_broker_destroy(rkb); return; } @@ -535,6 +551,9 @@ void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg, rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD); rd_kafka_broker_destroy(rkb); + + /* Back off the next intervalled query since we just sent one. */ + rd_interval_reset_to_now(&rkcg->rkcg_coord_query_intvl, 0); } /** @@ -1416,49 +1435,120 @@ void rd_kafka_cgrp_handle_Heartbeat (rd_kafka_t *rk, rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; const int log_decode_errors = LOG_ERR; int16_t ErrorCode = 0; - int actions; + int actions = 0; + const char *rebalance_reason = NULL; - if (err) { - if (err == RD_KAFKA_RESP_ERR__DESTROY) - return; /* Terminating */ - ErrorCode = err; + rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT); + rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; + + rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR; + + if (err) goto err; - } if (request->rkbuf_reqhdr.ApiVersion >= 1) rd_kafka_buf_read_throttle_time(rkbuf); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + if (ErrorCode) { + err = ErrorCode; + goto err; + } -err: - actions = rd_kafka_err_action(rkb, ErrorCode, request, - RD_KAFKA_ERR_ACTION_END); + rd_kafka_cgrp_update_session_timeout( + rkcg, rd_false/*dont update if session has expired*/); - rd_dassert(rkcg->rkcg_flags & 
RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT); - rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; + return; - if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { - /* Re-query for coordinator */ - rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, - RD_KAFKA_OP_COORD_QUERY, ErrorCode); - } + err_parse: + err = rkbuf->rkbuf_err; + err: + rkcg->rkcg_last_heartbeat_err = err; + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Group \"%s\" heartbeat error response in " + "state %s (join state %s, %d partition(s) assigned): %s", + rkcg->rkcg_group_id->str, + rd_kafka_cgrp_state_names[rkcg->rkcg_state], + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], + rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0, + rd_kafka_err2str(err)); + + if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Heartbeat response: discarding outdated " + "request (now in join-state %s)", + rd_kafka_cgrp_join_state_names[rkcg-> + rkcg_join_state]); + return; + } - if (actions & RD_KAFKA_ERR_ACTION_RETRY) { - if (rd_kafka_buf_retry(rkb, request)) { - rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; + switch (err) + { + case RD_KAFKA_RESP_ERR__DESTROY: + /* quick cleanup */ + return; + + case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: + case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR__TRANSPORT: + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "Heartbeat failed due to coordinator (%s) " + "no longer available: %s: " + "re-querying for coordinator", + rkcg->rkcg_curr_coord ? 
+ rd_kafka_broker_name(rkcg->rkcg_curr_coord) : + "none", + rd_kafka_err2str(err)); + /* Remain in joined state and keep querying for coordinator */ + actions = RD_KAFKA_ERR_ACTION_REFRESH; + break; + + case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS: + /* No further action if already rebalancing */ + if (rkcg->rkcg_join_state == + RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB) return; - } - /* FALLTHRU */ + rebalance_reason = "group is rebalancing"; + break; + + case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: + rd_kafka_cgrp_set_member_id(rkcg, ""); + rebalance_reason = "resetting member-id"; + break; + + case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION: + rebalance_reason = "group is rebalancing"; + break; + + case RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID: + rd_kafka_set_fatal_error(rkcg->rkcg_rk, err, + "Fatal consumer error: %s", + rd_kafka_err2str(err)); + rebalance_reason = "consumer fenced by newer instance"; + break; + + default: + actions = rd_kafka_err_action(rkb, err, request, + RD_KAFKA_ERR_ACTION_END); + break; } - if (ErrorCode != 0 && ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) - rd_kafka_cgrp_handle_heartbeat_error(rkcg, ErrorCode); - return; + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { + /* Re-query for coordinator */ + rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err)); + } - err_parse: - ErrorCode = rkbuf->rkbuf_err; - goto err; + if (actions & RD_KAFKA_ERR_ACTION_RETRY && + rd_kafka_buf_retry(rkb, request)) { + /* Retry */ + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; + return; + } + + if (rebalance_reason) + rd_kafka_cgrp_rebalance(rkcg, rebalance_reason); } @@ -2537,94 +2627,6 @@ rd_kafka_cgrp_handle_assignment (rd_kafka_cgrp_t *rkcg, } -/** - * Handle HeartbeatResponse errors. - * - * If an IllegalGeneration error code is returned in the - * HeartbeatResponse, it indicates that the co-ordinator has - * initiated a rebalance. 
The consumer then stops fetching data, - * commits offsets and sends a JoinGroupRequest to it's co-ordinator - * broker */ -void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg, - rd_kafka_resp_err_t err) { - const char *reason = NULL; - - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", - "Group \"%s\" heartbeat error response in " - "state %s (join state %s, %d partition(s) assigned): %s", - rkcg->rkcg_group_id->str, - rd_kafka_cgrp_state_names[rkcg->rkcg_state], - rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], - rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0, - rd_kafka_err2str(err)); - - if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) { - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", - "Heartbeat response: discarding outdated " - "request (now in join-state %s)", - rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); - return; - } - - switch (err) - { - case RD_KAFKA_RESP_ERR__DESTROY: - /* quick cleanup */ - return; - - case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: - case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: - case RD_KAFKA_RESP_ERR__TRANSPORT: - rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", - "Heartbeat failed due to coordinator (%s) " - "no longer available: %s: " - "re-querying for coordinator", - rkcg->rkcg_curr_coord ? 
- rd_kafka_broker_name(rkcg->rkcg_curr_coord) : - "none", - rd_kafka_err2str(err)); - /* Remain in joined state and keep querying for coordinator */ - rd_interval_expedite(&rkcg->rkcg_coord_query_intvl, 0); - return; - - case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS: - /* No further action if already rebalancing */ - if (rkcg->rkcg_join_state == - RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB) - return; - reason = "group is rebalancing"; - break; - - case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: - rd_kafka_cgrp_set_member_id(rkcg, ""); - reason = "resetting member-id"; - break; - - case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION: - reason = "group is rebalancing"; - break; - - case RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID: - rd_kafka_set_fatal_error(rkcg->rkcg_rk, err, - "Fatal consumer error: %s", - rd_kafka_err2str(err)); - reason = "consumer fenced by newer instance"; - break; - - default: - reason = rd_kafka_err2str(err); - break; - } - - rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", - "Heartbeat failed: %s: %s", - rd_kafka_err2name(err), reason); - - rd_kafka_cgrp_rebalance(rkcg, reason); -} - - - /** * Clean up any group-leader related resources. * @@ -3185,10 +3187,56 @@ rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq, } +/** + * @returns true if the session timeout has expired (due to no successful + * Heartbeats in session.timeout.ms) and triggers a rebalance. 
+ */ +static rd_bool_t +rd_kafka_cgrp_session_timeout_check (rd_kafka_cgrp_t *rkcg, rd_ts_t now) { + rd_ts_t delta; + char buf[256]; + + if (unlikely(!rkcg->rkcg_ts_session_timeout)) + return rd_true; /* Session has expired */ + + delta = now - rkcg->rkcg_ts_session_timeout; + if (likely(delta < 0)) + return rd_false; + + delta += rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000; + + rd_snprintf(buf, sizeof(buf), + "Consumer group session timed out (in join-state %s) after " + "%"PRId64" ms without a successful response from the " + "group coordinator (broker %"PRId32", last error was %s)", + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], + delta/1000, rkcg->rkcg_coord_id, + rd_kafka_err2str(rkcg->rkcg_last_heartbeat_err)); + + rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR; + + rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "SESSTMOUT", + "%s: revoking assignment and rejoining group", buf); + + /* Prevent further rebalances */ + rkcg->rkcg_ts_session_timeout = 0; + + /* Timing out invalidates the member id, reset it + * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. 
*/ + rd_kafka_cgrp_set_member_id(rkcg, ""); + + /* Revoke and rebalance */ + rd_kafka_cgrp_rebalance(rkcg, buf); + + return rd_true; +} + + /** * Client group's join state handling */ static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg) { + rd_ts_t now = rd_clock(); if (unlikely(rd_kafka_fatal_error_code(rkcg->rkcg_rk))) return; @@ -3201,7 +3249,7 @@ static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg) { break; if (rd_interval_immediate(&rkcg->rkcg_join_intvl, - 1000*1000, 0) > 0) + 1000*1000, now) > 0) rd_kafka_cgrp_join(rkcg); break; @@ -3211,14 +3259,17 @@ static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg) { case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN: break; - case RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB: - case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB: case RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED: case RD_KAFKA_CGRP_JOIN_STATE_STARTED: + if (rd_kafka_cgrp_session_timeout_check(rkcg, now)) + return; + /* FALLTHRU */ + case RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB: + case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB: if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION && rd_interval(&rkcg->rkcg_heartbeat_intvl, rkcg->rkcg_rk->rk_conf. 
- group_heartbeat_intvl_ms * 1000, 0) > 0) + group_heartbeat_intvl_ms * 1000, now) > 0) rd_kafka_cgrp_heartbeat(rkcg); break; } @@ -3514,6 +3565,8 @@ void rd_kafka_cgrp_handle_SyncGroup (rd_kafka_cgrp_t *rkcg, rd_kafka_buf_read_bytes(rkbuf, &UserData); done: + rd_kafka_cgrp_update_session_timeout(rkcg, rd_true/*reset timeout*/); + /* Set the new assignment */ rd_kafka_cgrp_handle_assignment(rkcg, assignment); diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h index 785ae50021..579d341c3c 100644 --- a/src/rdkafka_cgrp.h +++ b/src/rdkafka_cgrp.h @@ -166,6 +166,15 @@ typedef struct rd_kafka_cgrp_s { rd_interval_t rkcg_join_intvl; /* JoinGroup interval */ rd_interval_t rkcg_timeout_scan_intvl; /* Timeout scanner */ + rd_ts_t rkcg_ts_session_timeout; /**< Absolute session + * timeout enforced by + * the consumer, this + * value is updated on + * Heartbeat success, + * etc. */ + rd_kafka_resp_err_t rkcg_last_heartbeat_err; /**< Last Heartbeat error, + * used for logging. */ + TAILQ_HEAD(, rd_kafka_topic_s) rkcg_topics;/* Topics subscribed to */ rd_list_t rkcg_toppars; /* Toppars subscribed to*/ @@ -234,7 +243,7 @@ typedef struct rd_kafka_cgrp_s { * last rebalance */ int rebalance_cnt; /* Number of rebalances */ - char rebalance_reason[128]; /**< Last rebalance + char rebalance_reason[256]; /**< Last rebalance * reason */ int assignment_size; /* Partition count * of last rebalance @@ -285,9 +294,6 @@ int rd_kafka_cgrp_topic_check (rd_kafka_cgrp_t *rkcg, const char *topic); void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id); -void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg, - rd_kafka_resp_err_t err); - void rd_kafka_cgrp_handle_SyncGroup (rd_kafka_cgrp_t *rkcg, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, diff --git a/tests/0106-cgrp_sess_timeout.c b/tests/0106-cgrp_sess_timeout.c new file mode 100644 index 0000000000..d6aa8784f8 --- /dev/null +++ b/tests/0106-cgrp_sess_timeout.c @@ -0,0 +1,229 @@ +/* + * librdkafka - 
Apache Kafka C library + * + * Copyright (c) 2020, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + +#include "../src/rdkafka_proto.h" + + +/** + * @name Verify that the high-level consumer times out itself if + * heartbeats are not successful (issue #2631). 
+ */ + +static const char *commit_type; +static int rebalance_cnt; +static rd_kafka_resp_err_t rebalance_exp_event; +static rd_kafka_resp_err_t commit_exp_err; + +static void rebalance_cb (rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *parts, + void *opaque) { + + rebalance_cnt++; + TEST_SAY("Rebalance #%d: %s: %d partition(s)\n", + rebalance_cnt, rd_kafka_err2name(err), parts->cnt); + + TEST_ASSERT(err == rebalance_exp_event, + "Expected rebalance event %s, not %s", + rd_kafka_err2name(rebalance_exp_event), + rd_kafka_err2name(err)); + + if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) { + test_consumer_assign("assign", rk, parts); + } else { + rd_kafka_resp_err_t commit_err; + + if (strcmp(commit_type, "auto")) { + rd_kafka_resp_err_t perr; + + TEST_SAY("Performing %s commit\n", commit_type); + + perr = rd_kafka_position(rk, parts); + TEST_ASSERT(!perr, "Failed to acquire position: %s", + rd_kafka_err2str(perr)); + + /* Sleep a short while so the broker times out the + * member too. */ + rd_sleep(1); + + commit_err = rd_kafka_commit( + rk, parts, !strcmp(commit_type, "async")); + + if (!strcmp(commit_type, "async")) + TEST_ASSERT(!commit_err, + "Async commit should not fail, " + "but it returned %s", + rd_kafka_err2name(commit_err)); + else + TEST_ASSERT(commit_err == commit_exp_err || + (!commit_exp_err && + commit_err == + RD_KAFKA_RESP_ERR__NO_OFFSET), + "Expected %s commit to return %s, " + "not %s", + commit_type, + rd_kafka_err2name(commit_exp_err), + rd_kafka_err2name(commit_err)); + } + + test_consumer_unassign("unassign", rk); + } +} + + +/** + * @brief Wait for an expected rebalance event, or fail. 
+ */ +static void expect_rebalance (const char *what, rd_kafka_t *c, + rd_kafka_resp_err_t exp_event, + int timeout_s) { + int64_t tmout = test_clock() + (timeout_s * 1000000); + int start_cnt = rebalance_cnt; + + TEST_SAY("Waiting for %s (%s) for %ds\n", + what, rd_kafka_err2name(exp_event), timeout_s); + + rebalance_exp_event = exp_event; + + while (tmout > test_clock() && rebalance_cnt == start_cnt) { + if (test_consumer_poll_once(c, NULL, 1000)) + rd_sleep(1); + } + + if (rebalance_cnt == start_cnt + 1) { + rebalance_exp_event = RD_KAFKA_RESP_ERR_NO_ERROR; + return; + } + + TEST_FAIL("Timed out waiting for %s (%s)\n", + what, rd_kafka_err2name(exp_event)); +} + + +/** + * @brief Verify that session timeouts are handled by the consumer itself. + * + * @param use_commit_type "auto", "sync" (manual), "async" (manual) + */ +static void do_test_session_timeout (const char *use_commit_type) { + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_conf_t *conf; + rd_kafka_t *c; + const char *groupid = "mygroup"; + const char *topic = "test"; + + rebalance_cnt = 0; + commit_type = use_commit_type; + + TEST_SAY(_C_MAG "[ Test session timeout with %s commit ]\n", + commit_type); + + mcluster = test_mock_cluster_new(3, &bootstraps); + + rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10, + "bootstrap.servers", bootstraps, + "batch.num.messages", "10", + NULL); + + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "security.protocol", "PLAINTEXT"); + test_conf_set(conf, "group.id", groupid); + test_conf_set(conf, "session.timeout.ms", "5000"); + test_conf_set(conf, "heartbeat.interval.ms", "1000"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.commit", + !strcmp(commit_type, "auto") ? 
"true" : "false"); + + c = test_create_consumer(groupid, rebalance_cb, conf, NULL); + + test_consumer_subscribe(c, topic); + + /* Let Heartbeats fail after a couple of successful ones */ + rd_kafka_mock_push_request_errors( + mcluster, RD_KAFKAP_Heartbeat, + 9, + RD_KAFKA_RESP_ERR_NO_ERROR, + RD_KAFKA_RESP_ERR_NO_ERROR, + RD_KAFKA_RESP_ERR_NOT_COORDINATOR, + RD_KAFKA_RESP_ERR_NOT_COORDINATOR, + RD_KAFKA_RESP_ERR_NOT_COORDINATOR, + RD_KAFKA_RESP_ERR_NOT_COORDINATOR, + RD_KAFKA_RESP_ERR_NOT_COORDINATOR, + RD_KAFKA_RESP_ERR_NOT_COORDINATOR, + RD_KAFKA_RESP_ERR_NOT_COORDINATOR); + + expect_rebalance("initial assignment", c, + RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, 5+2); + + /* Consume a couple of messages so that we have something to commit */ + test_consumer_poll("consume", c, 0, -1, 0, 10, NULL); + + /* The commit in the rebalance callback should fail when the + * member has timed out from the group. */ + commit_exp_err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID; + + expect_rebalance("session timeout revoke", c, + RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, 2+5+2); + + expect_rebalance("second assignment", c, + RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, 5+2); + + /* Final rebalance in close(). + * It's commit will work. 
*/ + rebalance_exp_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + commit_exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; + + test_consumer_close(c); + + rd_kafka_destroy(c); + + test_mock_cluster_destroy(mcluster); + + TEST_SAY(_C_GRN "[ Test session timeout with %s commit PASSED ]\n", + commit_type); +} + + +int main_0106_cgrp_sess_timeout (int argc, char **argv) { + + do_test_session_timeout("sync"); + + if (!test_quick) { + do_test_session_timeout("async"); + do_test_session_timeout("auto"); + } + + return 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4db1b790f1..5c2dc1e9f6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -97,6 +97,7 @@ set( 0103-transactions.c 0104-fetch_from_follower_mock.c 0105-transactions_mock.c + 0106-cgrp_sess_timeout.c 8000-idle.cpp test.c testcpp.cpp diff --git a/tests/test.c b/tests/test.c index 9a670e58de..bbcc1494bf 100644 --- a/tests/test.c +++ b/tests/test.c @@ -209,6 +209,7 @@ _TEST_DECL(0103_transactions_local); _TEST_DECL(0103_transactions); _TEST_DECL(0104_fetch_from_follower_mock); _TEST_DECL(0105_transactions_mock); +_TEST_DECL(0106_cgrp_sess_timeout); /* Manual tests */ _TEST_DECL(8000_idle); @@ -386,6 +387,7 @@ struct test tests[] = { _TEST(0104_fetch_from_follower_mock, TEST_F_LOCAL, TEST_BRKVER(2,4,0,0)), _TEST(0105_transactions_mock, TEST_F_LOCAL), + _TEST(0106_cgrp_sess_timeout, TEST_F_LOCAL), /* Manual tests */ _TEST(8000_idle, TEST_F_MANUAL), diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj index 953ab28b68..b1563747b0 100644 --- a/win32/tests/tests.vcxproj +++ b/win32/tests/tests.vcxproj @@ -187,6 +187,7 @@ +