From 3536f5625c11761e50fa3411325b9dd18f4a7cd7 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Thu, 18 Apr 2024 10:48:42 +0200 Subject: [PATCH] [KIP-848] Logging improvements (#4692) --- src/rdkafka_assignment.c | 7 ++- src/rdkafka_cgrp.c | 93 ++++++++++++++++++++++++++++++---------- src/rdkafka_cgrp.h | 3 +- src/rdkafka_int.h | 3 +- 4 files changed, 80 insertions(+), 26 deletions(-) diff --git a/src/rdkafka_assignment.c b/src/rdkafka_assignment.c index 94ddbd0969..6d1f01913f 100644 --- a/src/rdkafka_assignment.c +++ b/src/rdkafka_assignment.c @@ -303,10 +303,13 @@ static void rd_kafka_assignment_handle_OffsetFetch(rd_kafka_t *rk, case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH: rk->rk_cgrp->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING; - /* Fallback */ + rd_kafka_cgrp_consumer_expedite_next_heartbeat( + rk->rk_cgrp, + "OffsetFetch error: Stale member epoch"); + break; case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: rd_kafka_cgrp_consumer_expedite_next_heartbeat( - rk->rk_cgrp); + rk->rk_cgrp, "OffsetFetch error: Unknown member"); break; default: rd_kafka_dbg( diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 108ee0b187..f10be22f29 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -1369,7 +1369,7 @@ static void rd_kafka_cgrp_rejoin(rd_kafka_cgrp_t *rkcg, const char *fmt, ...) 
{ rd_kafka_cgrp_consumer_reset(rkcg); rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT); - rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg); + rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg, "rejoining"); } @@ -2641,8 +2641,14 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( rd_kafka_topic_partition_list_t *new_target_assignment, rd_bool_t clear_next_assignment) { rd_bool_t is_assignment_different = rd_false; - if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK) + rd_bool_t has_next_target_assignment_to_clear = + rkcg->rkcg_next_target_assignment && clear_next_assignment; + if (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Reconciliation in progress, " + "postponing next one"); return RD_KAFKA_OP_RES_HANDLED; + } is_assignment_different = rd_kafka_cgrp_consumer_is_new_assignment_different( @@ -2652,12 +2658,20 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( * INIT or state STEADY, keeps it as next target assignment * otherwise. */ if (!is_assignment_different) { - if (rkcg->rkcg_next_target_assignment && - clear_next_assignment) { + if (has_next_target_assignment_to_clear) { rd_kafka_topic_partition_list_destroy( rkcg->rkcg_next_target_assignment); rkcg->rkcg_next_target_assignment = NULL; } + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Not reconciling new assignment: " + "Assignment is the same. " + "Next assignment %s", + (has_next_target_assignment_to_clear + ? 
"cleared" + : "not cleared")); + } else if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT || rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) { rkcg->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK; @@ -2668,8 +2682,7 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( rkcg->rkcg_target_assignment = rd_kafka_topic_partition_list_copy(new_target_assignment); - if (rkcg->rkcg_next_target_assignment && - clear_next_assignment) { + if (has_next_target_assignment_to_clear) { rd_kafka_topic_partition_list_destroy( rkcg->rkcg_next_target_assignment); rkcg->rkcg_next_target_assignment = NULL; @@ -2685,8 +2698,12 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment( rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", "Reconciliation starts with new target " - "assignment \"%s\"", - rkcg_target_assignment_str); + "assignment \"%s\". " + "Next assignment %s", + rkcg_target_assignment_str, + (has_next_target_assignment_to_clear + ? "cleared" + : "not cleared")); } rd_kafka_cgrp_handle_assignment(rkcg, rkcg->rkcg_target_assignment); @@ -2813,8 +2830,8 @@ void rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( rd_list_t *missing_topic_ids = NULL; if (!rkcg->rkcg_next_target_assignment->cnt) { - /* No metadata to request, continue with handle_next_assignment. - */ + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "No metadata to request, continuing"); rd_kafka_topic_partition_list_t *new_target_assignment = rd_kafka_topic_partition_list_new(0); rd_kafka_cgrp_consumer_handle_next_assignment( @@ -2974,7 +2991,8 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, /* We've finished reconciliation but we weren't * sending an ack, need to send a new HB with the ack. 
*/ - rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg); + rd_kafka_cgrp_consumer_expedite_next_heartbeat( + rkcg, "not subscribed anymore"); } } @@ -3102,7 +3120,8 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, rkcg->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST; rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err)); - rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg); + rd_kafka_cgrp_consumer_expedite_next_heartbeat( + rkcg, "coordinator query"); } if (actions & RD_KAFKA_ERR_ACTION_RETRY && @@ -3794,7 +3813,7 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk, if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER) { rd_kafka_cgrp_consumer_expedite_next_heartbeat( - rk->rk_cgrp); + rk->rk_cgrp, "OffsetCommit error: Unknown member"); } else { /* Revoke assignment and rebalance on unknown member */ rd_kafka_cgrp_set_member_id(rk->rk_cgrp, ""); @@ -3819,7 +3838,8 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk, case RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH: /* FIXME: Add logs.*/ - rd_kafka_cgrp_consumer_expedite_next_heartbeat(rk->rk_cgrp); + rd_kafka_cgrp_consumer_expedite_next_heartbeat( + rk->rk_cgrp, "OffsetCommit error: Stale member epoch"); if (!rd_strcmp(rko_orig->rko_u.offset_commit.reason, "manual")) /* Don't retry manual commits giving this error. 
* TODO: do this in a faster and cleaner way @@ -4978,7 +4998,10 @@ rd_kafka_cgrp_max_poll_interval_check_tmr_cb(rd_kafka_timers_t *rkts, rd_kafka_cgrp_consumer_leave(rkcg); rkcg->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN; - rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg); + rd_kafka_cgrp_consumer_expedite_next_heartbeat( + rkcg, + "max poll interval " + "exceeded"); } else { /* Leave the group before calling rebalance since the standard * leave will be triggered first after the rebalance callback @@ -5843,6 +5866,21 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg, ~RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION) | RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION; rkcg_subscription = rkcg->rkcg_subscription; + + if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { + char rkcg_new_subscription_str[512] = "NULL"; + + if (rkcg_subscription) { + rd_kafka_topic_partition_list_str( + rkcg_subscription, + rkcg_new_subscription_str, + sizeof(rkcg_new_subscription_str), 0); + } + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Sending new subscription \"%s\"", + rkcg_new_subscription_str); + } } rkcg->rkcg_expedite_heartbeat_retries++; @@ -5890,6 +5928,11 @@ void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) { rkcg->rkcg_consumer_flags |= RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE; + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Revoking assignment as lost and rejoining in join state %s", + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_true, rd_true, "member fenced - rejoining"); } @@ -5990,7 +6033,8 @@ rd_kafka_cgrp_consumer_subscribe(rd_kafka_cgrp_t *rkcg, RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION; rd_kafka_cgrp_subscription_set(rkcg, rktparlist); - rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg); + rd_kafka_cgrp_consumer_expedite_next_heartbeat( + rkcg, "subscription changed"); } else { rd_kafka_cgrp_unsubscribe(rkcg, rd_true /*leave group*/); } @@ 
-6092,7 +6136,8 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) { break; case RD_KAFKA_CGRP_JOIN_STATE_STEADY: - rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg); + rd_kafka_cgrp_consumer_expedite_next_heartbeat( + rkcg, "back to steady state"); if (rkcg->rkcg_rebalance_rejoin) { rkcg->rkcg_rebalance_rejoin = rd_false; @@ -6123,7 +6168,8 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) { still_in_group &= !rd_kafka_cgrp_try_terminate(rkcg); if (still_in_group) - rd_kafka_cgrp_consumer_expedite_next_heartbeat(rkcg); + rd_kafka_cgrp_consumer_expedite_next_heartbeat( + rkcg, "back to init state"); break; } default: @@ -6131,10 +6177,8 @@ static void rd_kafka_cgrp_consumer_assignment_done(rd_kafka_cgrp_t *rkcg) { } } -/** - * FIXME: Add reason and logging. - */ -void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg) { +void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg, + const char *reason) { if (rkcg->rkcg_group_protocol != RD_KAFKA_GROUP_PROTOCOL_CONSUMER) return; @@ -6162,6 +6206,11 @@ void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg) { /* Set the exponential backoff. */ rd_interval_backoff(&rkcg->rkcg_heartbeat_intvl, backoff); + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Expediting next heartbeat" + ", with backoff %" PRId64 ": %s", + backoff, reason); + /* Scheduling the timer awakes main loop too. 
*/ rd_kafka_timer_start_oneshot(&rkcg->rkcg_rk->rk_timers, &rkcg->rkcg_serve_timer, rd_true, backoff, diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h index 23f0467f98..afb671f02a 100644 --- a/src/rdkafka_cgrp.h +++ b/src/rdkafka_cgrp.h @@ -434,6 +434,7 @@ rd_kafka_rebalance_protocol2str(rd_kafka_rebalance_protocol_t protocol) { } } -void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg); +void rd_kafka_cgrp_consumer_expedite_next_heartbeat(rd_kafka_cgrp_t *rkcg, + const char *reason); #endif /* _RDKAFKA_CGRP_H_ */ diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index 82b7837d80..fde85ab136 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -1059,7 +1059,8 @@ static RD_INLINE RD_UNUSED void rd_kafka_app_polled(rd_kafka_t *rk) { rk->rk_cgrp->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED)) { rd_kafka_cgrp_consumer_expedite_next_heartbeat( - rk->rk_cgrp); + rk->rk_cgrp, + "app polled after poll interval exceeded"); } } }