Skip to content

Commit

Permalink
Enforce session.timeout.ms in the consumer itself (#2631)
Browse files Browse the repository at this point in the history
If no successful Heartbeat has been sent in session.timeout.ms
the consumer will trigger a local rebalance (rebalance callback with
error code set to REVOKE_PARTITIONS).
The consumer will rejoin the group when the rebalance has been handled.
  • Loading branch information
edenhill committed Feb 5, 2020
1 parent 4ce0a46 commit 99ecf07
Show file tree
Hide file tree
Showing 6 changed files with 417 additions and 125 deletions.
295 changes: 174 additions & 121 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,22 @@ void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg) {



/**
* @brief Update the absolute session timeout following a successfull
* response from the coordinator.
* This timeout is used to enforce the session timeout in the
* consumer itself.
*
* @param reset if true the timeout is updated even if the session has expired.
*/
static RD_INLINE void
rd_kafka_cgrp_update_session_timeout (rd_kafka_cgrp_t *rkcg, rd_bool_t reset) {
if (reset || rkcg->rkcg_ts_session_timeout != 0)
rkcg->rkcg_ts_session_timeout = rd_clock() +
(rkcg->rkcg_rk->rk_conf.group_session_timeout_ms*1000);
}



rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
const rd_kafkap_str_t *group_id,
Expand Down Expand Up @@ -525,8 +541,8 @@ void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg,
rd_rkb_dbg(rkb, CGRP, "CGRPQUERY",
"Group \"%.*s\": "
"unable to send coordinator query: %s",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_err2str(err));
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_err2str(err));
rd_kafka_broker_destroy(rkb);
return;
}
Expand All @@ -535,6 +551,9 @@ void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg,
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD);

rd_kafka_broker_destroy(rkb);

/* Back off the next intervalled query since we just sent one. */
rd_interval_reset_to_now(&rkcg->rkcg_coord_query_intvl, 0);
}

/**
Expand Down Expand Up @@ -1416,49 +1435,120 @@ void rd_kafka_cgrp_handle_Heartbeat (rd_kafka_t *rk,
rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
const int log_decode_errors = LOG_ERR;
int16_t ErrorCode = 0;
int actions;
int actions = 0;
const char *rebalance_reason = NULL;

if (err) {
if (err == RD_KAFKA_RESP_ERR__DESTROY)
return; /* Terminating */
ErrorCode = err;
rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT);
rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;

rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR;

if (err)
goto err;
}

if (request->rkbuf_reqhdr.ApiVersion >= 1)
rd_kafka_buf_read_throttle_time(rkbuf);

rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
if (ErrorCode) {
err = ErrorCode;
goto err;
}

err:
actions = rd_kafka_err_action(rkb, ErrorCode, request,
RD_KAFKA_ERR_ACTION_END);
rd_kafka_cgrp_update_session_timeout(
rkcg, rd_false/*dont update if session has expired*/);

rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT);
rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
return;

if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
/* Re-query for coordinator */
rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
RD_KAFKA_OP_COORD_QUERY, ErrorCode);
}
err_parse:
err = rkbuf->rkbuf_err;
err:
rkcg->rkcg_last_heartbeat_err = err;

rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Group \"%s\" heartbeat error response in "
"state %s (join state %s, %d partition(s) assigned): %s",
rkcg->rkcg_group_id->str,
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0,
rd_kafka_err2str(err));

if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Heartbeat response: discarding outdated "
"request (now in join-state %s)",
rd_kafka_cgrp_join_state_names[rkcg->
rkcg_join_state]);
return;
}

if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
if (rd_kafka_buf_retry(rkb, request)) {
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
switch (err)
{
case RD_KAFKA_RESP_ERR__DESTROY:
/* quick cleanup */
return;

case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR__TRANSPORT:
rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
"Heartbeat failed due to coordinator (%s) "
"no longer available: %s: "
"re-querying for coordinator",
rkcg->rkcg_curr_coord ?
rd_kafka_broker_name(rkcg->rkcg_curr_coord) :
"none",
rd_kafka_err2str(err));
/* Remain in joined state and keep querying for coordinator */
actions = RD_KAFKA_ERR_ACTION_REFRESH;
break;

case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS:
/* No further action if already rebalancing */
if (rkcg->rkcg_join_state ==
RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB)
return;
}
/* FALLTHRU */
rebalance_reason = "group is rebalancing";
break;

case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
rd_kafka_cgrp_set_member_id(rkcg, "");
rebalance_reason = "resetting member-id";
break;

case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
rebalance_reason = "group is rebalancing";
break;

case RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID:
rd_kafka_set_fatal_error(rkcg->rkcg_rk, err,
"Fatal consumer error: %s",
rd_kafka_err2str(err));
rebalance_reason = "consumer fenced by newer instance";
break;

default:
actions = rd_kafka_err_action(rkb, err, request,
RD_KAFKA_ERR_ACTION_END);
break;
}

if (ErrorCode != 0 && ErrorCode != RD_KAFKA_RESP_ERR__DESTROY)
rd_kafka_cgrp_handle_heartbeat_error(rkcg, ErrorCode);

return;
if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
/* Re-query for coordinator */
rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err));
}

err_parse:
ErrorCode = rkbuf->rkbuf_err;
goto err;
if (actions & RD_KAFKA_ERR_ACTION_RETRY &&
rd_kafka_buf_retry(rkb, request)) {
/* Retry */
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
return;
}

if (rebalance_reason)
rd_kafka_cgrp_rebalance(rkcg, rebalance_reason);
}


Expand Down Expand Up @@ -2537,94 +2627,6 @@ rd_kafka_cgrp_handle_assignment (rd_kafka_cgrp_t *rkcg,
}


/**
* Handle HeartbeatResponse errors.
*
* If an IllegalGeneration error code is returned in the
* HeartbeatResponse, it indicates that the co-ordinator has
* initiated a rebalance. The consumer then stops fetching data,
* commits offsets and sends a JoinGroupRequest to it's co-ordinator
* broker */
void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg,
rd_kafka_resp_err_t err) {
const char *reason = NULL;

rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Group \"%s\" heartbeat error response in "
"state %s (join state %s, %d partition(s) assigned): %s",
rkcg->rkcg_group_id->str,
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0,
rd_kafka_err2str(err));

if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Heartbeat response: discarding outdated "
"request (now in join-state %s)",
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
return;
}

switch (err)
{
case RD_KAFKA_RESP_ERR__DESTROY:
/* quick cleanup */
return;

case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR__TRANSPORT:
rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
"Heartbeat failed due to coordinator (%s) "
"no longer available: %s: "
"re-querying for coordinator",
rkcg->rkcg_curr_coord ?
rd_kafka_broker_name(rkcg->rkcg_curr_coord) :
"none",
rd_kafka_err2str(err));
/* Remain in joined state and keep querying for coordinator */
rd_interval_expedite(&rkcg->rkcg_coord_query_intvl, 0);
return;

case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS:
/* No further action if already rebalancing */
if (rkcg->rkcg_join_state ==
RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB)
return;
reason = "group is rebalancing";
break;

case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
rd_kafka_cgrp_set_member_id(rkcg, "");
reason = "resetting member-id";
break;

case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
reason = "group is rebalancing";
break;

case RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID:
rd_kafka_set_fatal_error(rkcg->rkcg_rk, err,
"Fatal consumer error: %s",
rd_kafka_err2str(err));
reason = "consumer fenced by newer instance";
break;

default:
reason = rd_kafka_err2str(err);
break;
}

rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
"Heartbeat failed: %s: %s",
rd_kafka_err2name(err), reason);

rd_kafka_cgrp_rebalance(rkcg, reason);
}



/**
* Clean up any group-leader related resources.
*
Expand Down Expand Up @@ -3185,10 +3187,56 @@ rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq,
}


/**
* @returns true if the session timeout has expired (due to no successful
* Heartbeats in session.timeout.ms) and triggers a rebalance.
*/
static rd_bool_t
rd_kafka_cgrp_session_timeout_check (rd_kafka_cgrp_t *rkcg, rd_ts_t now) {
rd_ts_t delta;
char buf[256];

if (unlikely(!rkcg->rkcg_ts_session_timeout))
return rd_true; /* Session has expired */

delta = now - rkcg->rkcg_ts_session_timeout;
if (likely(delta < 0))
return rd_false;

delta += rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000;

rd_snprintf(buf, sizeof(buf),
"Consumer group session timed out (in join-state %s) after "
"%"PRId64" ms without a successful response from the "
"group coordinator (broker %"PRId32", last error was %s)",
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
delta/1000, rkcg->rkcg_coord_id,
rd_kafka_err2str(rkcg->rkcg_last_heartbeat_err));

rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR;

rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "SESSTMOUT",
"%s: revoking assignment and rejoining group", buf);

/* Prevent further rebalances */
rkcg->rkcg_ts_session_timeout = 0;

/* Timing out invalidates the member id, reset it
* now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */
rd_kafka_cgrp_set_member_id(rkcg, "");

/* Revoke and rebalance */
rd_kafka_cgrp_rebalance(rkcg, buf);

return rd_true;
}


/**
* Client group's join state handling
*/
static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg) {
rd_ts_t now = rd_clock();

if (unlikely(rd_kafka_fatal_error_code(rkcg->rkcg_rk)))
return;
Expand All @@ -3201,7 +3249,7 @@ static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg) {
break;

if (rd_interval_immediate(&rkcg->rkcg_join_intvl,
1000*1000, 0) > 0)
1000*1000, now) > 0)
rd_kafka_cgrp_join(rkcg);
break;

Expand All @@ -3211,14 +3259,17 @@ static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg) {
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN:
break;

case RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB:
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB:
case RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED:
case RD_KAFKA_CGRP_JOIN_STATE_STARTED:
if (rd_kafka_cgrp_session_timeout_check(rkcg, now))
return;
/* FALLTHRU */
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB:
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB:
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION &&
rd_interval(&rkcg->rkcg_heartbeat_intvl,
rkcg->rkcg_rk->rk_conf.
group_heartbeat_intvl_ms * 1000, 0) > 0)
group_heartbeat_intvl_ms * 1000, now) > 0)
rd_kafka_cgrp_heartbeat(rkcg);
break;
}
Expand Down Expand Up @@ -3514,6 +3565,8 @@ void rd_kafka_cgrp_handle_SyncGroup (rd_kafka_cgrp_t *rkcg,
rd_kafka_buf_read_bytes(rkbuf, &UserData);

done:
rd_kafka_cgrp_update_session_timeout(rkcg, rd_true/*reset timeout*/);

/* Set the new assignment */
rd_kafka_cgrp_handle_assignment(rkcg, assignment);

Expand Down
Loading

0 comments on commit 99ecf07

Please sign in to comment.