Skip to content

Commit

Permalink
Fix to metadata refresh interruption
Browse files Browse the repository at this point in the history
Metadata refreshes without partition
leader change could lead to a loop of
metadata calls at fixed intervals.
Solved by stopping metadata refresh
when all existing metadata is non-stale.
Happening since 2.3.0
  • Loading branch information
emasab committed Apr 15, 2024
1 parent 70d2099 commit bdccc8c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ librdkafka v2.3.1 is a maintenance release:
* Fix to metadata cache expiration on full metadata refresh (#4677).
* Fix for a wrong error returned on full metadata refresh before joining
a consumer group (#4678).
* Fix to metadata refresh interruption (#4679).


## Fixes
Expand All @@ -42,6 +43,9 @@ librdkafka v2.3.1 is a maintenance release:
could lead to an `UNKNOWN_TOPIC_OR_PART` error. Solved by updating
the consumer group following a metadata refresh only in safe states.
Happening since 2.1.0 (#4678).
* Metadata refreshes without partition leader change could lead to a loop of
metadata calls at fixed intervals. Solved by stopping metadata refresh when
all existing metadata is non-stale. Happening since 2.3.0 (#4679).

### Consumer fixes

Expand Down
23 changes: 16 additions & 7 deletions src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -1277,8 +1277,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
rd_kafka_broker_t **partbrokers;
int leader_cnt = 0;
int old_state;
rd_bool_t partition_exists_with_no_leader_epoch = rd_false;
rd_bool_t partition_exists_with_updated_leader_epoch = rd_false;
rd_bool_t partition_exists_with_no_leader_epoch = rd_false;
rd_bool_t partition_exists_with_stale_leader_epoch = rd_false;

if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR)
rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA",
Expand Down Expand Up @@ -1328,8 +1328,17 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
upd += rd_kafka_topic_partition_cnt_update(rkt,
mdt->partition_cnt);
if (rd_kafka_Uuid_cmp(mdit->topic_id, RD_KAFKA_UUID_ZERO))
if (rd_kafka_Uuid_cmp(mdit->topic_id, RD_KAFKA_UUID_ZERO)) {
/* FIXME: an offset reset must be triggered.
* when rkt_topic_id wasn't zero.
* There are no problems
* in test 0107_topic_recreate if offsets in new
* topic are lower than in previous one,
* causing an out of range and an offset reset,
* but the rarer case where they're higher needs
* to be checked. */
rkt->rkt_topic_id = mdit->topic_id;
}
/* If the metadata times out for a topic (because all brokers
* are down) the state will transition to S_UNKNOWN.
* When updated metadata is eventually received there might
Expand All @@ -1343,7 +1352,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,

/* Update leader for each partition */
for (j = 0; j < mdt->partition_cnt; j++) {
int r;
int r = 0;
rd_kafka_broker_t *leader;
int32_t leader_epoch = mdit->partitions[j].leader_epoch;
rd_kafka_toppar_t *rktp =
Expand All @@ -1362,8 +1371,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
* set to -1, we assume that metadata is not stale. */
if (leader_epoch == -1)
partition_exists_with_no_leader_epoch = rd_true;
else if (rktp->rktp_leader_epoch < leader_epoch)
partition_exists_with_updated_leader_epoch = rd_true;
else if (leader_epoch < rktp->rktp_leader_epoch)
partition_exists_with_stale_leader_epoch = rd_true;


/* Update leader for partition */
Expand All @@ -1386,7 +1395,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
* stale, we can turn off fast leader query. */
if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt &&
(partition_exists_with_no_leader_epoch ||
partition_exists_with_updated_leader_epoch))
!partition_exists_with_stale_leader_epoch))
rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;

if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) {
Expand Down

0 comments on commit bdccc8c

Please sign in to comment.