Skip to content

Commit

Permalink
[KIP-848] Logging improvements (#4692)
Browse files (browse the repository at this point in the history)
  • Loading branch information
emasab authored and anchitj committed Jun 10, 2024
1 parent 4c7c6ad commit 3536f56
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 26 deletions.
7 changes: 5 additions & 2 deletions src/rdkafka_assignment.c
Original file line number | Diff line number | Diff line change
Expand Up @@ -303,10 +303,13 @@ static void rd_kafka_assignment_handle_OffsetFetch(rd_kafka_t *rk,
case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH:
rk->rk_cgrp->rkcg_consumer_flags |=
RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING;
/* Fallback */
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rk->rk_cgrp,
"OffsetFetch error: Stale member epoch");
break;
case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rk->rk_cgrp);
rk->rk_cgrp, "OffsetFetch error: Unknown member");
break;
default:
rd_kafka_dbg(
Expand Down
93 changes: 71 additions & 22 deletions src/rdkafka_cgrp.c
Original file line number | Diff line number | Diff line change
Expand Up @@ -1369,7 +1369,7 @@ static void rd_kafka_cgrp_rejoin(rd_kafka_cgrp_t *rkcg, const char *fmt, ...) {

rd_kafka_cgrp_consumer_reset(rkcg);
rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg, "rejoining");
}


Expand Down Expand Up @@ -2641,8 +2641,14 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment(
rd_kafka_topic_partition_list_t *new_target_assignment,
rd_bool_t clear_next_assignment) {
rd_bool_t is_assignment_different = rd_false;
if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK)
rd_bool_t has_next_target_assignment_to_clear =
rkcg->rkcg_next_target_assignment && clear_next_assignment;
if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Reconciliation in progress, "
"postponing next one");
return RD_KAFKA_OP_RES_HANDLED;
}

is_assignment_different =
rd_kafka_cgrp_consumer_is_new_assignment_different(
Expand All @@ -2652,12 +2658,20 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment(
* INIT or state STEADY, keeps it as next target assignment
* otherwise. */
if (!is_assignment_different) {
if (rkcg->rkcg_next_target_assignment &&
clear_next_assignment) {
if (has_next_target_assignment_to_clear) {
rd_kafka_topic_partition_list_destroy(
rkcg->rkcg_next_target_assignment);
rkcg->rkcg_next_target_assignment = NULL;
}

rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Not reconciling new assignment: "
"Assignment is the same. "
"Next assignment %s",
(has_next_target_assignment_to_clear
? "cleared"
: "not cleared"));

} else if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT ||
rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
rkcg->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK;
Expand All @@ -2668,8 +2682,7 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment(
rkcg->rkcg_target_assignment =
rd_kafka_topic_partition_list_copy(new_target_assignment);

if (rkcg->rkcg_next_target_assignment &&
clear_next_assignment) {
if (has_next_target_assignment_to_clear) {
rd_kafka_topic_partition_list_destroy(
rkcg->rkcg_next_target_assignment);
rkcg->rkcg_next_target_assignment = NULL;
Expand All @@ -2685,8 +2698,12 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment(

rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Reconciliation starts with new target "
"assignment \"%s\"",
rkcg_target_assignment_str);
"assignment \"%s\". "
"Next assignment %s",
rkcg_target_assignment_str,
(has_next_target_assignment_to_clear
? "cleared"
: "not cleared"));
}
rd_kafka_cgrp_handle_assignment(rkcg,
rkcg->rkcg_target_assignment);
Expand Down Expand Up @@ -2813,8 +2830,8 @@ void rd_kafka_cgrp_consumer_next_target_assignment_request_metadata(
rd_list_t *missing_topic_ids = NULL;

if (!rkcg->rkcg_next_target_assignment->cnt) {
/* No metadata to request, continue with handle_next_assignment.
*/
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"No metadata to request, continuing");
rd_kafka_topic_partition_list_t *new_target_assignment =
rd_kafka_topic_partition_list_new(0);
rd_kafka_cgrp_consumer_handle_next_assignment(
Expand Down Expand Up @@ -2974,7 +2991,8 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
/* We've finished reconciliation but we weren't
* sending an ack, need to send a new HB with the ack.
*/
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rkcg, "not subscribed anymore");
}
}

Expand Down Expand Up @@ -3102,7 +3120,8 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
rkcg->rkcg_consumer_flags |=
RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST;
rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err));
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rkcg, "coordinator query");
}

if (actions & RD_KAFKA_ERR_ACTION_RETRY &&
Expand Down Expand Up @@ -3794,7 +3813,7 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk,
if (rkcg->rkcg_group_protocol ==
RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rk->rk_cgrp);
rk->rk_cgrp, "OffsetCommit error: Unknown member");
} else {
/* Revoke assignment and rebalance on unknown member */
rd_kafka_cgrp_set_member_id(rk->rk_cgrp, "");
Expand All @@ -3819,7 +3838,8 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk,

case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH:
/* FIXME: Add logs.*/
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rk->rk_cgrp);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rk->rk_cgrp, "OffsetCommit error: Stale member epoch");
if (!rd_strcmp(rko_orig->rko_u.offset_commit.reason, "manual"))
/* Don't retry manual commits giving this error.
* TODO: do this in a faster and cleaner way
Expand Down Expand Up @@ -4978,7 +4998,10 @@ rd_kafka_cgrp_max_poll_interval_check_tmr_cb(rd_kafka_timers_t *rkts,
rd_kafka_cgrp_consumer_leave(rkcg);
rkcg->rkcg_consumer_flags |=
RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN;
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rkcg,
"max poll interval "
"exceeded");
} else {
/* Leave the group before calling rebalance since the standard
* leave will be triggered first after the rebalance callback
Expand Down Expand Up @@ -5843,6 +5866,21 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg,
~RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION) |
RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION;
rkcg_subscription = rkcg->rkcg_subscription;

if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) {
char rkcg_new_subscription_str[512] = "NULL";

if (rkcg_subscription) {
rd_kafka_topic_partition_list_str(
rkcg_subscription,
rkcg_new_subscription_str,
sizeof(rkcg_new_subscription_str), 0);
}

rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Sending new subscription \"%s\"",
rkcg_new_subscription_str);
}
}

rkcg->rkcg_expedite_heartbeat_retries++;
Expand Down Expand Up @@ -5890,6 +5928,11 @@ void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) {
rkcg->rkcg_consumer_flags |=
RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE;

rd_kafka_dbg(
rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Revoking assignment as lost an rejoining in join state %s",
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_true, rd_true,
"member fenced - rejoining");
}
Expand Down Expand Up @@ -5990,7 +6033,8 @@ rd_kafka_cgrp_consumer_subscribe(rd_kafka_cgrp_t *rkcg,
RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION;

rd_kafka_cgrp_subscription_set(rkcg, rktparlist);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rkcg, "subscription changed");
} else {
rd_kafka_cgrp_unsubscribe(rkcg, rd_true /*leave group*/);
}
Expand Down Expand Up @@ -6092,7 +6136,8 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) {
break;

case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rkcg, "back to steady state");

if (rkcg->rkcg_rebalance_rejoin) {
rkcg->rkcg_rebalance_rejoin = rd_false;
Expand Down Expand Up @@ -6123,18 +6168,17 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) {
still_in_group &= !rd_kafka_cgrp_try_terminate(rkcg);

if (still_in_group)
rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rkcg, "back to init state");
break;
}
default:
break;
}
}

/**
* FIXME: Add reason and logging.
*/
void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg) {
void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg,
const char *reason) {
if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_CONSUMER)
return;

Expand Down Expand Up @@ -6162,6 +6206,11 @@ void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg) {
/* Set the exponential backoff. */
rd_interval_backoff(&rkcg->rkcg_heartbeat_intvl, backoff);

rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Expediting next heartbeat"
", with backoff %" PRId64 ": %s",
backoff, reason);

/* Scheduling the timer awakes main loop too. */
rd_kafka_timer_start_oneshot(&rkcg->rkcg_rk->rk_timers,
&rkcg->rkcg_serve_timer, rd_true, backoff,
Expand Down
3 changes: 2 additions & 1 deletion src/rdkafka_cgrp.h
Original file line number | Diff line number | Diff line change
Expand Up @@ -434,6 +434,7 @@ rd_kafka_rebalance_protocol2str(rd_kafka_rebalance_protocol_t protocol) {
}
}

void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg);
void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg,
const char *reason);

#endif /* _RDKAFKA_CGRP_H_ */
3 changes: 2 additions & 1 deletion src/rdkafka_int.h
Original file line number | Diff line number | Diff line change
Expand Up @@ -1059,7 +1059,8 @@ static RD_INLINE RD_UNUSED void rd_kafka_app_polled(rd_kafka_t *rk) {
rk->rk_cgrp->rkcg_flags &
RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED)) {
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rk->rk_cgrp);
rk->rk_cgrp,
"app polled after poll interval exceeded");
}
}
}
Expand Down

0 comments on commit 3536f56

Please sign in to comment.