From 81d32d583987fced13cdbc66ab2c263d24b3e618 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Tue, 20 Nov 2018 09:54:06 +0100 Subject: [PATCH] Send Heartbeats when waiting for rebalance-revoke This is required for slow-processing applications where max.poll.interval.ms > session.timeout.ms. --- src/rdkafka_cgrp.c | 42 ++++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index c524a020a7..8ef7ab746a 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -2496,7 +2496,8 @@ void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg, { case RD_KAFKA_RESP_ERR__DESTROY: /* quick cleanup */ - break; + return; + case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: case RD_KAFKA_RESP_ERR__TRANSPORT: @@ -2509,26 +2510,35 @@ void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg, rd_kafka_err2str(err)); /* Remain in joined state and keep querying for coordinator */ rd_interval_expedite(&rkcg->rkcg_coord_query_intvl, 0); - break; + return; + + case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS: + /* No further action if already rebalancing */ + if (rkcg->rkcg_join_state == + RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB) + return; + reason = "group is rebalancing"; + break; - case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: - rd_kafka_cgrp_set_member_id(rkcg, ""); + case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: + rd_kafka_cgrp_set_member_id(rkcg, ""); reason = "resetting member-id"; - case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS: - case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION: - if (!reason) - reason = "group is rebalancing"; - default: - if (!reason) - reason = rd_kafka_err2str(err); + break; - rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", - "Heartbeat failed: %s: %s", - rd_kafka_err2name(err), reason); + case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION: + reason = "group is rebalancing"; + break; - rd_kafka_cgrp_rebalance(rkcg, reason); + default: + reason = rd_kafka_err2str(err); break; } + + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "Heartbeat failed: %s: %s", + rd_kafka_err2name(err), reason); + + rd_kafka_cgrp_rebalance(rkcg, reason); } @@ -3109,9 +3119,9 @@ static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg, case RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA: case RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC: case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN: - case RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB: break; + case RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB: case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB: case RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED: case RD_KAFKA_CGRP_JOIN_STATE_STARTED: