diff --git a/CHANGELOG.md b/CHANGELOG.md
index d555639be8..1ccfad49cf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@ librdkafka v2.3.1 is a maintenance release:
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)
+ * Fix to metadata cache expiration on full metadata refresh (#4677).
## Fixes
@@ -31,6 +32,10 @@ librdkafka v2.3.1 is a maintenance release:
before the expiration of a timeout, it was serving with a zero timeout,
leading to increased CPU usage until the timeout was reached.
Happening since 1.x (#4671).
+ * Metadata cache was cleared on full metadata refresh, leading to unnecessary
+ refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating
+ cache for existing or hinted entries instead of clearing them.
+ Happening since 2.1.0 (#4677).
### Consumer fixes
diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c
index bc8e5bc5ee..7e9c90376d 100644
--- a/src/rdkafka_metadata.c
+++ b/src/rdkafka_metadata.c
@@ -490,7 +490,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
int broker_changes = 0;
int cache_changes = 0;
- rd_ts_t ts_start = rd_clock();
+
/* If client rack is present, the metadata cache (topic or full) needs
* to contain the partition to rack map. */
rd_bool_t has_client_rack = rk->rk_conf.client_rack &&
@@ -850,23 +850,24 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_list_remove_cmp(missing_topic_ids,
&mdi->topics[i].topic_id,
(void *)rd_kafka_Uuid_ptr_cmp));
- if (!all_topics) {
- /* Only update cache when not asking
- * for all topics. */
-
- rd_kafka_wrlock(rk);
- rd_kafka_metadata_cache_topic_update(
- rk, &md->topics[i], &mdi->topics[i],
- rd_false /*propagate later*/,
- /* use has_client_rack rather than
- compute_racks. We need cached rack ids
- only in case we need to rejoin the group
- if they change and client.rack is set
- (KIP-881). */
- has_client_rack, mdi->brokers, md->broker_cnt);
- cache_changes++;
- rd_kafka_wrunlock(rk);
- }
+ /* Only update cache when not asking
+ * for all topics or cache entry
+ * already exists. */
+ rd_kafka_wrlock(rk);
+ cache_changes +=
+ rd_kafka_metadata_cache_topic_update(
+ rk, &md->topics[i], &mdi->topics[i],
+ rd_false /*propagate later*/,
+ /* use has_client_rack rather than
+ compute_racks. We need cached rack ids
+ only in case we need to rejoin the group
+ if they change and client.rack is set
+ (KIP-881). */
+ has_client_rack, mdi->brokers,
+ md->broker_cnt,
+ all_topics /*cache entry needs to exist
+ *if all_topics*/);
+ rd_kafka_wrunlock(rk);
}
/* Requested topics not seen in metadata? Propogate to topic code. */
@@ -979,9 +980,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
}
if (all_topics) {
- /* Expire all cache entries that were not updated. */
- rd_kafka_metadata_cache_evict_by_age(rkb->rkb_rk, ts_start);
-
+ /* All hints have been replaced by the corresponding entry.
+ * Rest of hints can be removed as topics aren't present
+ * in full metadata. */
+ rd_kafka_metadata_cache_purge_all_hints(rkb->rkb_rk);
if (rkb->rkb_rk->rk_full_metadata)
rd_kafka_metadata_destroy(
&rkb->rkb_rk->rk_full_metadata->metadata);
@@ -1001,10 +1003,6 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
"Caching full metadata with "
"%d broker(s) and %d topic(s): %s",
md->broker_cnt, md->topic_cnt, reason);
- } else {
- if (cache_changes)
- rd_kafka_metadata_cache_propagate_changes(rk);
- rd_kafka_metadata_cache_expiry_start(rk);
}
/* Remove cache hints for the originally requested topics. */
if (requested_topics)
@@ -1013,6 +1011,11 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_metadata_cache_purge_hints_by_id(rk,
requested_topic_ids);
+ if (cache_changes) {
+ rd_kafka_metadata_cache_propagate_changes(rk);
+ rd_kafka_metadata_cache_expiry_start(rk);
+ }
+
rd_kafka_wrunlock(rkb->rkb_rk);
if (broker_changes) {
diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h
index 495ca6436e..b0926845ef 100644
--- a/src/rdkafka_metadata.h
+++ b/src/rdkafka_metadata.h
@@ -274,15 +274,16 @@ int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic);
int rd_kafka_metadata_cache_delete_by_topic_id(rd_kafka_t *rk,
const rd_kafka_Uuid_t topic_id);
void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk);
-int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts);
-void rd_kafka_metadata_cache_topic_update(
+int rd_kafka_metadata_cache_purge_all_hints(rd_kafka_t *rk);
+int rd_kafka_metadata_cache_topic_update(
rd_kafka_t *rk,
const rd_kafka_metadata_topic_t *mdt,
const rd_kafka_metadata_topic_internal_t *mdit,
rd_bool_t propagate,
rd_bool_t include_metadata,
rd_kafka_metadata_broker_internal_t *brokers,
- size_t broker_cnt);
+ size_t broker_cnt,
+ rd_bool_t only_existing);
void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk);
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid);
diff --git a/src/rdkafka_metadata_cache.c b/src/rdkafka_metadata_cache.c
index 75e39134f9..d4c93cd11c 100644
--- a/src/rdkafka_metadata_cache.c
+++ b/src/rdkafka_metadata_cache.c
@@ -182,45 +182,27 @@ static int rd_kafka_metadata_cache_evict(rd_kafka_t *rk) {
/**
- * @brief Evict timed out entries from cache based on their insert/update time
- * rather than expiry time. Any entries older than \p ts will be evicted.
+ * @brief Remove all cache hints.
+ * This is done when the Metadata response has been parsed and
+ * replaced hints with existing topic information, thus this will
+ * only remove unmatched topics from the cache.
*
- * @returns the number of entries evicted.
+ * @returns the number of purged hints.
*
* @locks_required rd_kafka_wrlock()
*/
-int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts) {
+int rd_kafka_metadata_cache_purge_all_hints(rd_kafka_t *rk) {
int cnt = 0;
struct rd_kafka_metadata_cache_entry *rkmce, *tmp;
TAILQ_FOREACH_SAFE(rkmce, &rk->rk_metadata_cache.rkmc_expiry,
rkmce_link, tmp) {
- if (rkmce->rkmce_ts_insert <= ts) {
+ if (!RD_KAFKA_METADATA_CACHE_VALID(rkmce)) {
rd_kafka_metadata_cache_delete(rk, rkmce, 1);
cnt++;
}
}
- /* Update expiry timer */
- rkmce = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry);
- if (rkmce)
- rd_kafka_timer_start(&rk->rk_timers,
- &rk->rk_metadata_cache.rkmc_expiry_tmr,
- rkmce->rkmce_ts_expires - rd_clock(),
- rd_kafka_metadata_cache_evict_tmr_cb, rk);
- else
- rd_kafka_timer_stop(&rk->rk_timers,
- &rk->rk_metadata_cache.rkmc_expiry_tmr, 1);
-
- rd_kafka_dbg(rk, METADATA, "METADATA",
- "Expired %d entries older than %dms from metadata cache "
- "(%d entries remain)",
- cnt, (int)((rd_clock() - ts) / 1000),
- rk->rk_metadata_cache.rkmc_cnt);
-
- if (cnt)
- rd_kafka_metadata_cache_propagate_changes(rk);
-
return cnt;
}
@@ -474,6 +456,9 @@ void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk) {
* For permanent errors (authorization failures), we keep
* the entry cached for metadata.max.age.ms.
*
+ * @param only_existing Update only existing metadata cache entries,
+ * either valid or hinted.
+ *
* @return 1 on metadata change, 0 when no change was applied
*
* @remark The cache expiry timer will not be updated/started,
@@ -481,31 +466,31 @@ void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk) {
*
* @locks rd_kafka_wrlock()
*/
-void rd_kafka_metadata_cache_topic_update(
+int rd_kafka_metadata_cache_topic_update(
rd_kafka_t *rk,
const rd_kafka_metadata_topic_t *mdt,
const rd_kafka_metadata_topic_internal_t *mdit,
rd_bool_t propagate,
rd_bool_t include_racks,
rd_kafka_metadata_broker_internal_t *brokers,
- size_t broker_cnt) {
+ size_t broker_cnt,
+ rd_bool_t only_existing) {
struct rd_kafka_metadata_cache_entry *rkmce = NULL;
rd_ts_t now = rd_clock();
rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000);
int changed = 1;
- if (unlikely(!mdt->topic)) {
- rkmce =
- rd_kafka_metadata_cache_find_by_id(rk, mdit->topic_id, 1);
+ if (only_existing) {
+ if (likely(mdt->topic != NULL)) {
+ rkmce = rd_kafka_metadata_cache_find(rk, mdt->topic, 0);
+ } else {
+ rkmce = rd_kafka_metadata_cache_find_by_id(
+ rk, mdit->topic_id, 1);
+ }
if (!rkmce)
- return;
+ return 0;
}
- if (unlikely(!mdt->topic)) {
- /* Cache entry found but no topic name:
- * delete it. */
- changed = rd_kafka_metadata_cache_delete_by_topic_id(
- rk, mdit->topic_id);
- } else {
+ if (likely(mdt->topic != NULL)) {
/* Cache unknown topics for a short while (100ms) to allow the
* cgrp logic to find negative cache hits. */
if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
@@ -520,10 +505,17 @@ void rd_kafka_metadata_cache_topic_update(
else
changed = rd_kafka_metadata_cache_delete_by_name(
rk, mdt->topic);
+ } else {
+ /* Cache entry found but no topic name:
+ * delete it. */
+ changed = rd_kafka_metadata_cache_delete_by_topic_id(
+ rk, mdit->topic_id);
}
if (changed && propagate)
rd_kafka_metadata_cache_propagate_changes(rk);
+
+ return changed;
}
diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c
index ccaf535a92..edd471b03b 100644
--- a/src/rdkafka_topic.c
+++ b/src/rdkafka_topic.c
@@ -2046,7 +2046,7 @@ void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt,
rd_kafka_wrlock(rkt->rkt_rk);
rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, &mdit, rd_true,
- rd_false, NULL, 0);
+ rd_false, NULL, 0, rd_false);
rd_kafka_topic_metadata_update(rkt, &mdt, &mdit, rd_clock());
rd_kafka_wrunlock(rkt->rkt_rk);
rd_free(partitions);
diff --git a/tests/0146-metadata_mock.c b/tests/0146-metadata_mock.c
new file mode 100644
index 0000000000..56f5b81f8c
--- /dev/null
+++ b/tests/0146-metadata_mock.c
@@ -0,0 +1,98 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2024, Confluent Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+
+/**
+ * @brief Metadata should persist in cache after
+ * a full metadata refresh.
+ *
+ * @param assignor Assignor to use
+ */
+static void do_test_metadata_persists_in_cache(const char *assignor) {
+ rd_kafka_t *rk;
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_t *rkt;
+ const rd_kafka_metadata_t *md;
+ rd_kafka_topic_partition_list_t *subscription;
+
+ SUB_TEST_QUICK("%s", assignor);
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+ rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
+
+ test_conf_init(&conf, NULL, 10);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "partition.assignment.strategy", assignor);
+ test_conf_set(conf, "group.id", topic);
+
+ rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
+
+ subscription = rd_kafka_topic_partition_list_new(1);
+ rd_kafka_topic_partition_list_add(subscription, topic, 0);
+
+ rkt = test_create_consumer_topic(rk, topic);
+
+ /* Metadata for topic is available */
+ TEST_CALL_ERR__(rd_kafka_metadata(rk, 0, rkt, &md, 1000));
+ rd_kafka_metadata_destroy(md);
+ md = NULL;
+
+ /* Subscribe to same topic */
+ TEST_CALL_ERR__(rd_kafka_subscribe(rk, subscription));
+
+ /* Request full metadata */
+ TEST_CALL_ERR__(rd_kafka_metadata(rk, 1, NULL, &md, 1000));
+ rd_kafka_metadata_destroy(md);
+ md = NULL;
+
+ /* Subscribing shouldn't give UNKNOWN_TOPIC_OR_PART err.
+ * Verify no error was returned. */
+ test_consumer_poll_no_msgs("no error", rk, 0, 100);
+
+ rd_kafka_topic_partition_list_destroy(subscription);
+ rd_kafka_topic_destroy(rkt);
+ rd_kafka_destroy(rk);
+ test_mock_cluster_destroy(mcluster);
+
+ SUB_TEST_PASS();
+}
+
+int main_0146_metadata_mock(int argc, char **argv) {
+ TEST_SKIP_MOCK_CLUSTER(0);
+
+ do_test_metadata_persists_in_cache("range");
+
+ do_test_metadata_persists_in_cache("cooperative-sticky");
+
+ return 0;
+}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 30a1363b27..62ce0deb02 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -136,6 +136,7 @@ set(
0143-exponential_backoff_mock.c
0144-idempotence_mock.c
0145-pause_resume_mock.c
+ 0146-metadata_mock.c
8000-idle.cpp
8001-fetch_from_follower_mock_manual.c
test.c
diff --git a/tests/cluster_testing.py b/tests/cluster_testing.py
index 86d3d91248..d3189f1cdb 100755
--- a/tests/cluster_testing.py
+++ b/tests/cluster_testing.py
@@ -9,7 +9,7 @@
from trivup.trivup import Cluster
from trivup.apps.ZookeeperApp import ZookeeperApp
-from trivup.apps.KafkaBrokerApp import KafkaBrokerApp as KafkaBrokerAppOrig
+from trivup.apps.KafkaBrokerApp import KafkaBrokerApp
from trivup.apps.KerberosKdcApp import KerberosKdcApp
from trivup.apps.SslApp import SslApp
from trivup.apps.OauthbearerOIDCApp import OauthbearerOIDCApp
@@ -35,15 +35,6 @@ def read_scenario_conf(scenario):
return parser.load(f)
-# FIXME: merge in trivup
-class KafkaBrokerApp(KafkaBrokerAppOrig):
- def _add_simple_authorizer(self, conf_blob):
- conf_blob.append(
- 'authorizer.class.name=' +
- 'org.apache.kafka.metadata.authorizer.StandardAuthorizer')
- conf_blob.append('super.users=User:ANONYMOUS')
-
-
class LibrdkafkaTestCluster(Cluster):
def __init__(self, version, conf={}, num_brokers=3, debug=False,
scenario="default", kraft=False):
diff --git a/tests/test.c b/tests/test.c
index f65e70625f..64ab0b247f 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -259,6 +259,7 @@ _TEST_DECL(0142_reauthentication);
_TEST_DECL(0143_exponential_backoff_mock);
_TEST_DECL(0144_idempotence_mock);
_TEST_DECL(0145_pause_resume_mock);
+_TEST_DECL(0146_metadata_mock);
/* Manual tests */
_TEST_DECL(8000_idle);
@@ -514,6 +515,7 @@ struct test tests[] = {
_TEST(0143_exponential_backoff_mock, TEST_F_LOCAL),
_TEST(0144_idempotence_mock, TEST_F_LOCAL, TEST_BRKVER(0, 11, 0, 0)),
_TEST(0145_pause_resume_mock, TEST_F_LOCAL),
+ _TEST(0146_metadata_mock, TEST_F_LOCAL),
/* Manual tests */
diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj
index f9ffa00d0a..a354f278f8 100644
--- a/win32/tests/tests.vcxproj
+++ b/win32/tests/tests.vcxproj
@@ -226,6 +226,7 @@
+