diff --git a/CHANGELOG.md b/CHANGELOG.md index 8bd003094d..5fc5de5268 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ librdkafka v2.3.1 is a maintenance release: * Fix to metadata cache expiration on full metadata refresh (#4677). * Fix for a wrong error returned on full metadata refresh before joining a consumer group (#4678). + * Fix to metadata refresh interruption (#4679). ## Fixes @@ -47,6 +48,9 @@ librdkafka v2.3.1 is a maintenance release: could lead to an `UNKNOWN_TOPIC_OR_PART` error. Solved by updating the consumer group following a metadata refresh only in safe states. Happening since 2.1.0 (#4678). + * Metadata refreshes without partition leader change could lead to a loop of + metadata calls at fixed intervals. Solved by stopping metadata refresh when + all existing metadata is non-stale. Happening since 2.3.0 (#4679). ### Consumer fixes diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index edd471b03b..f9082fd626 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -1277,8 +1277,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, rd_kafka_broker_t **partbrokers; int leader_cnt = 0; int old_state; - rd_bool_t partition_exists_with_no_leader_epoch = rd_false; - rd_bool_t partition_exists_with_updated_leader_epoch = rd_false; + rd_bool_t partition_exists_with_no_leader_epoch = rd_false; + rd_bool_t partition_exists_with_stale_leader_epoch = rd_false; if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR) rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA", @@ -1328,8 +1328,17 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) { upd += rd_kafka_topic_partition_cnt_update(rkt, mdt->partition_cnt); - if (rd_kafka_Uuid_cmp(mdit->topic_id, RD_KAFKA_UUID_ZERO)) + if (rd_kafka_Uuid_cmp(mdit->topic_id, RD_KAFKA_UUID_ZERO)) { + /* FIXME: an offset reset must be triggered * when rkt_topic_id wasn't zero. 
+ * There are no problems + * in test 0107_topic_recreate if offsets in new + * topic are lower than in previous one, + * causing an out of range and an offset reset, + * but the rarer case where they're higher needs + * to be checked. */ rkt->rkt_topic_id = mdit->topic_id; + } /* If the metadata times out for a topic (because all brokers * are down) the state will transition to S_UNKNOWN. * When updated metadata is eventually received there might @@ -1343,7 +1352,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, /* Update leader for each partition */ for (j = 0; j < mdt->partition_cnt; j++) { - int r; + int r = 0; rd_kafka_broker_t *leader; int32_t leader_epoch = mdit->partitions[j].leader_epoch; rd_kafka_toppar_t *rktp = @@ -1362,8 +1371,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, * set to -1, we assume that metadata is not stale. */ if (leader_epoch == -1) partition_exists_with_no_leader_epoch = rd_true; - else if (rktp->rktp_leader_epoch < leader_epoch) - partition_exists_with_updated_leader_epoch = rd_true; + else if (leader_epoch < rktp->rktp_leader_epoch) + partition_exists_with_stale_leader_epoch = rd_true; /* Update leader for partition */ @@ -1386,7 +1395,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, * stale, we can turn off fast leader query. */ if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt && (partition_exists_with_no_leader_epoch || - partition_exists_with_updated_leader_epoch)) + !partition_exists_with_stale_leader_epoch)) rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL; if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) {