Skip to content

Commit

Permalink
[KIP-848] Added new error code handling to OffsetCommit and OffsetFet…
Browse files Browse the repository at this point in the history
…ch (#4681)

- Added new errors to manual commit.
- improvements to OffsetCommit and OffsetFetch error code handling.
  • Loading branch information
pranavrth authored Apr 17, 2024
1 parent d31ed86 commit 88b6839
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 31 deletions.
15 changes: 15 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -4426,6 +4426,21 @@ RD_EXPORT int rd_kafka_assignment_lost(rd_kafka_t *rk);
* or successfully scheduled if asynchronous, or failed.
* RD_KAFKA_RESP_ERR__FATAL is returned if the consumer has raised
* a fatal error.
*
* FIXME: Update below documentation.
*
* RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH is returned, when
* using `group.protocol=consumer`, if the commit failed because the
* member has switched to a new member epoch.
* This error code can be retried.
* Partition level error is also set in the \p offsets.
*
* RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID is returned, when
* using `group.protocol=consumer`, if the member has been
* removed from the consumer group.
* This error code is permanent, uncommitted messages will be
* reprocessed by this or a different member and committed there.
* Partition level error is also set in the \p offsets.
*/
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_commit(rd_kafka_t *rk,
Expand Down
63 changes: 49 additions & 14 deletions src/rdkafka_assignment.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,30 @@ rd_kafka_assignment_apply_offsets(rd_kafka_t *rk,
continue;
}

if (err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT ||
rktpar->err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) {
if (err == RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH ||
rktpar->err == RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH) {
rd_kafka_topic_partition_t *rktpar_copy;

rd_kafka_dbg(rk, CGRP, "OFFSETFETCH",
"Adding %s [%" PRId32
"] back to pending "
"list because of stale member epoch",
rktpar->topic, rktpar->partition);

rktpar_copy = rd_kafka_topic_partition_list_add_copy(
rk->rk_consumer.assignment.pending, rktpar);
/* Need to reset offset to STORED to query for
* the committed offset again. If the offset is
* kept INVALID then auto.offset.reset will be
* triggered.
*
* Not necessary if err is UNSTABLE_OFFSET_COMMIT
* because the buffer is retried there. */
rktpar_copy->offset = RD_KAFKA_OFFSET_STORED;

} else if (err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT ||
rktpar->err ==
RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) {
/* Ongoing transactions are blocking offset retrieval.
* This is typically retried from the OffsetFetch
* handler but we can come here if the assignment
Expand Down Expand Up @@ -210,7 +232,9 @@ rd_kafka_assignment_apply_offsets(rd_kafka_t *rk,
/* Do nothing for request-level errors (err is set). */
}

if (offsets->cnt > 0)
/* In case of stale member epoch we retry to serve the
* assignment only after a successful ConsumerGroupHeartbeat. */
if (offsets->cnt > 0 && err != RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH)
rd_kafka_assignment_serve(rk);
}

Expand Down Expand Up @@ -274,18 +298,29 @@ static void rd_kafka_assignment_handle_OffsetFetch(rd_kafka_t *rk,
return;
}



if (err) {
rd_kafka_dbg(rk, CGRP, "OFFSET",
"Offset fetch error for %d partition(s): %s",
offsets->cnt, rd_kafka_err2str(err));
rd_kafka_consumer_err(
rk->rk_consumer.q, rd_kafka_broker_id(rkb), err, 0, NULL,
NULL, RD_KAFKA_OFFSET_INVALID,
"Failed to fetch committed offsets for "
"%d partition(s) in group \"%s\": %s",
offsets->cnt, rk->rk_group_id->str, rd_kafka_err2str(err));
switch (err) {
case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH:
rk->rk_cgrp->rkcg_consumer_flags |=
RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING;
/* FALLTHRU */
case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rk->rk_cgrp);
break;
default:
rd_kafka_dbg(
rk, CGRP, "OFFSET",
"Offset fetch error for %d partition(s): %s",
offsets->cnt, rd_kafka_err2str(err));
rd_kafka_consumer_err(
rk->rk_consumer.q, rd_kafka_broker_id(rkb), err, 0,
NULL, NULL, RD_KAFKA_OFFSET_INVALID,
"Failed to fetch committed offsets for "
"%d partition(s) in group \"%s\": %s",
offsets->cnt, rk->rk_group_id->str,
rd_kafka_err2str(err));
}
}

/* Apply the fetched offsets to the assignment */
Expand Down
99 changes: 89 additions & 10 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2979,6 +2979,16 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
}
}

if (rkcg->rkcg_consumer_flags &
RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING &&
rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
/* TODO: Check if this should be done only for the steady state?
*/
rd_kafka_assignment_serve(rk);
rkcg->rkcg_consumer_flags &=
~RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING;
}

if (rkcg->rkcg_next_target_assignment) {
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION) {
rd_kafka_cgrp_consumer_next_target_assignment_request_metadata(
Expand Down Expand Up @@ -3092,8 +3102,8 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
/* Re-query for coordinator */
rkcg->rkcg_consumer_flags |=
RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST;
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err));
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
}

if (actions & RD_KAFKA_ERR_ACTION_RETRY &&
Expand Down Expand Up @@ -3334,7 +3344,11 @@ static RD_INLINE int rd_kafka_cgrp_try_terminate(rd_kafka_cgrp_t *rkcg) {
if (likely(!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)))
return 0;

/* Check if wait-coord queue has timed out. */
/* Check if wait-coord queue has timed out.
FIXME: Remove usage of `group_session_timeout_ms` for the new
consumer group protocol implementation defined in KIP-848.
*/
if (rd_kafka_q_len(rkcg->rkcg_wait_coord_q) > 0 &&
rkcg->rkcg_ts_terminate +
(rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000) <
Expand Down Expand Up @@ -3505,7 +3519,6 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg,
static int rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
rd_kafka_op_t *rko,
const char *reason) {

/* wait_coord_q is disabled session.timeout.ms after
* group close() has been initiated. */
if (rko->rko_u.offset_commit.ts_timeout != 0 ||
Expand All @@ -3524,6 +3537,11 @@ static int rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
: "none");

rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;

/* FIXME: Remove `group_session_timeout_ms` for the new protocol
* defined in KIP-848 as this property is deprecated from client
* side in the new protocol.
*/
rko->rko_u.offset_commit.ts_timeout =
rd_clock() +
(rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000);
Expand All @@ -3532,6 +3550,45 @@ static int rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
return 1;
}

/**
 * @brief Defer offset commit (rko) until coordinator is available (KIP-848).
 *
 * Unlike a one-shot deferral, the same \p rko may be deferred repeatedly:
 * the deadline is set only on the first deferral and later calls keep
 * re-enqueuing the op until that deadline has passed.
 *
 * @param rkcg   Consumer group handle whose wait-coord queue receives the op.
 * @param rko    Offset commit op to defer. On success it is flagged for
 *               reprocessing, its partition-level errors are cleared and
 *               it is enqueued on the wait-coord queue.
 * @param reason Human readable reason, only used for the debug log.
 *
 * @returns 1 if the rko was deferred, or 0 if the rko's deferral deadline
 *          has already passed or the wait-coord queue is not ready.
 */
static int rd_kafka_cgrp_consumer_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
                                                      rd_kafka_op_t *rko,
                                                      const char *reason) {
        /* wait_coord_q is disabled session.timeout.ms after
         * group close() has been initiated. */
        if ((rko->rko_u.offset_commit.ts_timeout != 0 &&
             rd_clock() >= rko->rko_u.offset_commit.ts_timeout) ||
            !rd_kafka_q_ready(rkcg->rkcg_wait_coord_q))
                return 0;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
                     "Group \"%s\": "
                     "unable to OffsetCommit in state %s: %s: "
                     "retrying later",
                     rkcg->rkcg_group_id->str,
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state], reason);

        rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;

        /* Only the first deferral sets the deadline, so that repeated
         * deferrals of the same op share one overall time limit. */
        if (!rko->rko_u.offset_commit.ts_timeout) {
                /* Multiply by a long long constant so the scaling of the
                 * millisecond config value is done in 64-bit arithmetic:
                 * a plain `* 1000` on a 32-bit int overflows for session
                 * timeouts above ~35.8 minutes.
                 * NOTE(review): the clock unit is presumably microseconds
                 * given the ms * 1000 scaling - confirm against rd_clock().
                 */
                rko->rko_u.offset_commit.ts_timeout =
                    rd_clock() +
                    (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms *
                     1000LL);
        }

        /* Reset partition level error before retrying */
        rd_kafka_topic_partition_list_set_err(
            rko->rko_u.offset_commit.partitions, RD_KAFKA_RESP_ERR_NO_ERROR);

        rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko);

        return 1;
}

/**
* @brief Update the committed offsets for the partitions in \p offsets,
Expand Down Expand Up @@ -3730,18 +3787,23 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk,
rd_kafka_err2str(err));
}


/*
* Error handling
*/
switch (err) {
case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
/* Revoke assignment and rebalance on unknown member */
rd_kafka_cgrp_set_member_id(rk->rk_cgrp, "");
rd_kafka_cgrp_revoke_all_rejoin_maybe(
rkcg, rd_true /*assignment is lost*/,
rd_true /*this consumer is initiating*/,
"OffsetCommit error: Unknown member");
if (rkcg->rkcg_group_protocol ==
RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rk->rk_cgrp);
} else {
/* Revoke assignment and rebalance on unknown member */
rd_kafka_cgrp_set_member_id(rk->rk_cgrp, "");
rd_kafka_cgrp_revoke_all_rejoin_maybe(
rkcg, rd_true /*assignment is lost*/,
rd_true /*this consumer is initiating*/,
"OffsetCommit error: Unknown member");
}
break;

case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
Expand All @@ -3756,6 +3818,20 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk,
case RD_KAFKA_RESP_ERR__IN_PROGRESS:
return; /* Retrying */

case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH:
/* FIXME: Add logs.*/
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rk->rk_cgrp);
if (!rd_strcmp(rko_orig->rko_u.offset_commit.reason, "manual"))
/* Don't retry manual commits giving this error.
* TODO: do this in a faster and cleaner way
* with a bool. */
break;

if (rd_kafka_cgrp_consumer_defer_offset_commit(
rkcg, rko_orig, rd_kafka_err2str(err)))
return;
break;

case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR__TRANSPORT:
Expand Down Expand Up @@ -6056,6 +6132,9 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) {
}
}

/**
* FIXME: Add reason and logging.
*/
void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg) {
if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_CONSUMER)
return;
Expand Down
3 changes: 2 additions & 1 deletion src/rdkafka_cgrp.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ typedef struct rd_kafka_cgrp_s {
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN 0x40
/** Member is fenced, rejoining */
#define RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE 0x80

/** Serve pending assignments after heartbeat */
#define RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING 0x100

/** Rejoin the group following a currently in-progress
* incremental unassign. */
Expand Down
2 changes: 0 additions & 2 deletions src/rdkafka_offset.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,6 @@ rd_kafka_commit0(rd_kafka_t *rk,
return RD_KAFKA_RESP_ERR_NO_ERROR;
}



/**
* NOTE: 'offsets' may be NULL, see official documentation.
*/
Expand Down
7 changes: 6 additions & 1 deletion src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -2978,8 +2978,12 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_upsert(

/**
* @brief Creates a copy of \p rktpar and adds it to \p rktparlist
*
* @return Copy of passed partition that was added to the list
*
* @remark Ownership of the returned partition remains with the list.
*/
void rd_kafka_topic_partition_list_add_copy(
rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add_copy(
rd_kafka_topic_partition_list_t *rktparlist,
const rd_kafka_topic_partition_t *rktpar) {
rd_kafka_topic_partition_t *dst;
Expand All @@ -2988,6 +2992,7 @@ void rd_kafka_topic_partition_list_add_copy(
__FUNCTION__, __LINE__, rktparlist, rktpar->topic,
rktpar->partition, NULL, rktpar->_private);
rd_kafka_topic_partition_update(dst, rktpar);
return dst;
}


Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_upsert(
const char *topic,
int32_t partition);

void rd_kafka_topic_partition_list_add_copy(
rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add_copy(
rd_kafka_topic_partition_list_t *rktparlist,
const rd_kafka_topic_partition_t *rktpar);

Expand Down
16 changes: 14 additions & 2 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -2301,8 +2301,20 @@ void rd_kafka_ConsumerGroupHeartbeatRequest(

rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

rd_kafka_buf_set_abs_timeout(
rkbuf, rkb->rkb_rk->rk_conf.group_session_timeout_ms, 0);
/* FIXME:
* 1) Improve this timeout to something less than
* `rkcg_heartbeat_intvl_ms` so that the next heartbeat
* is not skipped.
* 2) Remove usage of `group_session_timeout_ms` altogether
* from the new protocol defined in KIP-848.
*/
if (rkb->rkb_rk->rk_cgrp->rkcg_heartbeat_intvl_ms > 0) {
rd_kafka_buf_set_abs_timeout(
rkbuf, rkb->rkb_rk->rk_cgrp->rkcg_heartbeat_intvl_ms, 0);
} else {
rd_kafka_buf_set_abs_timeout(
rkbuf, rkb->rkb_rk->rk_conf.group_session_timeout_ms, 0);
}

rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}
Expand Down

0 comments on commit 88b6839

Please sign in to comment.