Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Apr 14, 2024
1 parent 6bd2c95 commit a159193
Showing 1 changed file with 16 additions and 13 deletions.
29 changes: 16 additions & 13 deletions src/rdkafka_metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,9 @@ void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk) {
* For permanent errors (authorization failures), we keep
* the entry cached for metadata.max.age.ms.
*
* @param only_existing Update only existing metadata cache entries,
* either valid or hinted.
*
* @return 1 on metadata change, 0 when no change was applied
*
* @remark The cache expiry timer will not be updated/started,
Expand All @@ -476,23 +479,18 @@ int rd_kafka_metadata_cache_topic_update(
rd_ts_t now = rd_clock();
rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000);
int changed = 1;
if (likely(mdt->topic && only_existing)) {
rkmce = rd_kafka_metadata_cache_find(rk, mdt->topic, 0);
if (!rkmce)
return 0;
} else if (unlikely(!mdt->topic)) {
rkmce =
rd_kafka_metadata_cache_find_by_id(rk, mdit->topic_id, 1);
if (only_existing) {
if (likely(mdt->topic != NULL)) {
rkmce = rd_kafka_metadata_cache_find(rk, mdt->topic, 0);
} else {
rkmce = rd_kafka_metadata_cache_find_by_id(
rk, mdit->topic_id, 1);
}
if (!rkmce)
return 0;
}

if (unlikely(!mdt->topic)) {
/* Cache entry found but no topic name:
* delete it. */
changed = rd_kafka_metadata_cache_delete_by_topic_id(
rk, mdit->topic_id);
} else {
if (likely(mdt->topic != NULL)) {
/* Cache unknown topics for a short while (100ms) to allow the
* cgrp logic to find negative cache hits. */
if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
Expand All @@ -507,6 +505,11 @@ int rd_kafka_metadata_cache_topic_update(
else
changed = rd_kafka_metadata_cache_delete_by_name(
rk, mdt->topic);
} else {
/* Cache entry found but no topic name:
* delete it. */
changed = rd_kafka_metadata_cache_delete_by_topic_id(
rk, mdit->topic_id);
}

if (changed && propagate)
Expand Down

0 comments on commit a159193

Please sign in to comment.