Skip to content

Commit

Permalink
Send Heartbeats when waiting for rebalance-revoke
Browse files Browse the repository at this point in the history
This is required for slow-processing applications where
max.poll.interval.ms > session.timeout.ms.
  • Loading branch information
edenhill committed Nov 20, 2018
1 parent b931045 commit 81d32d5
Showing 1 changed file with 26 additions and 16 deletions.
42 changes: 26 additions & 16 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2496,7 +2496,8 @@ void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg,
{
case RD_KAFKA_RESP_ERR__DESTROY:
/* quick cleanup */
break;
return;

case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR__TRANSPORT:
Expand All @@ -2509,26 +2510,35 @@ void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg,
rd_kafka_err2str(err));
/* Remain in joined state and keep querying for coordinator */
rd_interval_expedite(&rkcg->rkcg_coord_query_intvl, 0);
break;
return;

case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS:
/* No further action if already rebalancing */
if (rkcg->rkcg_join_state ==
RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB)
return;
reason = "group is rebalancing";
break;

case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
rd_kafka_cgrp_set_member_id(rkcg, "");
case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
rd_kafka_cgrp_set_member_id(rkcg, "");
reason = "resetting member-id";
case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS:
case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
if (!reason)
reason = "group is rebalancing";
default:
if (!reason)
reason = rd_kafka_err2str(err);
break;

rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
"Heartbeat failed: %s: %s",
rd_kafka_err2name(err), reason);
case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
reason = "group is rebalancing";
break;

rd_kafka_cgrp_rebalance(rkcg, reason);
default:
reason = rd_kafka_err2str(err);
break;
}

rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
"Heartbeat failed: %s: %s",
rd_kafka_err2name(err), reason);

rd_kafka_cgrp_rebalance(rkcg, reason);
}


Expand Down Expand Up @@ -3109,9 +3119,9 @@ static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg,
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA:
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC:
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN:
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB:
break;

case RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB:
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB:
case RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED:
case RD_KAFKA_CGRP_JOIN_STATE_STARTED:
Expand Down

0 comments on commit 81d32d5

Please sign in to comment.