Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to metadata cache expiration on full metadata refresh #4677

Merged
merged 4 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ librdkafka v2.3.1 is a maintenance release:
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)
* Fix to metadata cache expiration on full metadata refresh (#4677).


## Fixes
Expand All @@ -31,6 +32,10 @@ librdkafka v2.3.1 is a maintenance release:
before the expiration of a timeout, it was serving with a zero timeout,
leading to increased CPU usage until the timeout was reached.
Happening since 1.x (#4671).
* Metadata cache was cleared on full metadata refresh, leading to unnecessary
refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating
cache for existing or hinted entries instead of clearing them.
Happening since 2.1.0 (#4677).

### Consumer fixes

Expand Down
53 changes: 28 additions & 25 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
int broker_changes = 0;
int cache_changes = 0;
rd_ts_t ts_start = rd_clock();

/* If client rack is present, the metadata cache (topic or full) needs
* to contain the partition to rack map. */
rd_bool_t has_client_rack = rk->rk_conf.client_rack &&
Expand Down Expand Up @@ -850,23 +850,24 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_list_remove_cmp(missing_topic_ids,
&mdi->topics[i].topic_id,
(void *)rd_kafka_Uuid_ptr_cmp));
if (!all_topics) {
/* Only update cache when not asking
* for all topics. */

rd_kafka_wrlock(rk);
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers, md->broker_cnt);
cache_changes++;
rd_kafka_wrunlock(rk);
}
/* Only update cache when not asking
* for all topics or cache entry
* already exists. */
rd_kafka_wrlock(rk);
cache_changes +=
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers,
md->broker_cnt,
all_topics /*cache entry needs to exist
*if all_topics*/);
rd_kafka_wrunlock(rk);
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
}

/* Requested topics not seen in metadata? Propagate to topic code. */
Expand Down Expand Up @@ -979,9 +980,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
}

if (all_topics) {
/* Expire all cache entries that were not updated. */
rd_kafka_metadata_cache_evict_by_age(rkb->rkb_rk, ts_start);

/* All hints have been replaced by the corresponding entry.
* Rest of hints can be removed as topics aren't present
* in full metadata. */
rd_kafka_metadata_cache_purge_all_hints(rkb->rkb_rk);
if (rkb->rkb_rk->rk_full_metadata)
rd_kafka_metadata_destroy(
&rkb->rkb_rk->rk_full_metadata->metadata);
Expand All @@ -1001,10 +1003,6 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
"Caching full metadata with "
"%d broker(s) and %d topic(s): %s",
md->broker_cnt, md->topic_cnt, reason);
} else {
if (cache_changes)
rd_kafka_metadata_cache_propagate_changes(rk);
rd_kafka_metadata_cache_expiry_start(rk);
}
/* Remove cache hints for the originally requested topics. */
if (requested_topics)
Expand All @@ -1013,6 +1011,11 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_metadata_cache_purge_hints_by_id(rk,
requested_topic_ids);

if (cache_changes) {
rd_kafka_metadata_cache_propagate_changes(rk);
rd_kafka_metadata_cache_expiry_start(rk);
}

rd_kafka_wrunlock(rkb->rkb_rk);

if (broker_changes) {
Expand Down
7 changes: 4 additions & 3 deletions src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,16 @@ int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic);
int rd_kafka_metadata_cache_delete_by_topic_id(rd_kafka_t *rk,
const rd_kafka_Uuid_t topic_id);
void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk);
int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts);
void rd_kafka_metadata_cache_topic_update(
int rd_kafka_metadata_cache_purge_all_hints(rd_kafka_t *rk);
int rd_kafka_metadata_cache_topic_update(
rd_kafka_t *rk,
const rd_kafka_metadata_topic_t *mdt,
const rd_kafka_metadata_topic_internal_t *mdit,
rd_bool_t propagate,
rd_bool_t include_metadata,
rd_kafka_metadata_broker_internal_t *brokers,
size_t broker_cnt);
size_t broker_cnt,
rd_bool_t only_existing);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better name maybe update_only_existing and let's explain what it means in the doc.

Copy link
Contributor Author

@emasab emasab Apr 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you search for only_ the code convention in this repo is to avoid repeating the action in variable names, for example in rd_kafka_buf_write_topic_partitions we have only_invalid_offsets not write_only_invalid_offsets.

Okey for the doc

void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk);
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid);
Expand Down
66 changes: 29 additions & 37 deletions src/rdkafka_metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,45 +182,27 @@ static int rd_kafka_metadata_cache_evict(rd_kafka_t *rk) {


/**
 * @brief Remove all cache hints.
 *        This is done after a full Metadata response has been parsed and
 *        existing hints have been replaced by the corresponding topic
 *        entries, so any hint still present corresponds to a topic that
 *        is not in the full metadata and can be purged.
 *
 * @returns the number of purged hints.
 *
 * @locks_required rd_kafka_wrlock()
 */
int rd_kafka_metadata_cache_purge_all_hints(rd_kafka_t *rk) {
        int cnt = 0;
        struct rd_kafka_metadata_cache_entry *rkmce, *tmp;

        /* Hints are cache entries without valid metadata; walk the expiry
         * list with the _SAFE variant since entries are deleted mid-walk. */
        TAILQ_FOREACH_SAFE(rkmce, &rk->rk_metadata_cache.rkmc_expiry,
                           rkmce_link, tmp) {
                if (!RD_KAFKA_METADATA_CACHE_VALID(rkmce)) {
                        rd_kafka_metadata_cache_delete(rk, rkmce, 1);
                        cnt++;
                }
        }

        return cnt;
}

Expand Down Expand Up @@ -474,38 +456,41 @@ void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk) {
* For permanent errors (authorization failures), we keep
* the entry cached for metadata.max.age.ms.
*
* @param only_existing Update only existing metadata cache entries,
* either valid or hinted.
*
* @return 1 on metadata change, 0 when no change was applied
*
* @remark The cache expiry timer will not be updated/started,
* call rd_kafka_metadata_cache_expiry_start() instead.
*
* @locks rd_kafka_wrlock()
*/
void rd_kafka_metadata_cache_topic_update(
int rd_kafka_metadata_cache_topic_update(
rd_kafka_t *rk,
const rd_kafka_metadata_topic_t *mdt,
const rd_kafka_metadata_topic_internal_t *mdit,
rd_bool_t propagate,
rd_bool_t include_racks,
rd_kafka_metadata_broker_internal_t *brokers,
size_t broker_cnt) {
size_t broker_cnt,
rd_bool_t only_existing) {
struct rd_kafka_metadata_cache_entry *rkmce = NULL;
rd_ts_t now = rd_clock();
rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000);
int changed = 1;
if (unlikely(!mdt->topic)) {
rkmce =
rd_kafka_metadata_cache_find_by_id(rk, mdit->topic_id, 1);
if (only_existing) {
if (likely(mdt->topic != NULL)) {
rkmce = rd_kafka_metadata_cache_find(rk, mdt->topic, 0);
} else {
rkmce = rd_kafka_metadata_cache_find_by_id(
rk, mdit->topic_id, 1);
}
if (!rkmce)
return;
return 0;
}

if (unlikely(!mdt->topic)) {
/* Cache entry found but no topic name:
* delete it. */
changed = rd_kafka_metadata_cache_delete_by_topic_id(
rk, mdit->topic_id);
} else {
if (likely(mdt->topic != NULL)) {
/* Cache unknown topics for a short while (100ms) to allow the
* cgrp logic to find negative cache hits. */
if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
Expand All @@ -520,10 +505,17 @@ void rd_kafka_metadata_cache_topic_update(
else
changed = rd_kafka_metadata_cache_delete_by_name(
rk, mdt->topic);
} else {
/* Cache entry found but no topic name:
* delete it. */
changed = rd_kafka_metadata_cache_delete_by_topic_id(
rk, mdit->topic_id);
}

if (changed && propagate)
rd_kafka_metadata_cache_propagate_changes(rk);

return changed;
}


Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -2046,7 +2046,7 @@ void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt,

rd_kafka_wrlock(rkt->rkt_rk);
rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, &mdit, rd_true,
rd_false, NULL, 0);
rd_false, NULL, 0, rd_false);
rd_kafka_topic_metadata_update(rkt, &mdt, &mdit, rd_clock());
rd_kafka_wrunlock(rkt->rkt_rk);
rd_free(partitions);
Expand Down
98 changes: 98 additions & 0 deletions tests/0146-metadata_mock.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2024, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "test.h"


/**
* @brief Metadata should persists in cache after
* a full metadata refresh.
*
* @param assignor Assignor to use
*/
static void do_test_metadata_persists_in_cache(const char *assignor) {
rd_kafka_t *rk;
const char *bootstraps;
rd_kafka_mock_cluster_t *mcluster;
const char *topic = test_mk_topic_name(__FUNCTION__, 1);
rd_kafka_conf_t *conf;
rd_kafka_topic_t *rkt;
const rd_kafka_metadata_t *md;
rd_kafka_topic_partition_list_t *subscription;

SUB_TEST_QUICK("%s", assignor);

mcluster = test_mock_cluster_new(3, &bootstraps);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

test_conf_init(&conf, NULL, 10);
test_conf_set(conf, "bootstrap.servers", bootstraps);
test_conf_set(conf, "partition.assignment.strategy", assignor);
test_conf_set(conf, "group.id", topic);

rk = test_create_handle(RD_KAFKA_CONSUMER, conf);

subscription = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(subscription, topic, 0);

rkt = test_create_consumer_topic(rk, topic);

/* Metadata for topic is available */
TEST_CALL_ERR__(rd_kafka_metadata(rk, 0, rkt, &md, 1000));
rd_kafka_metadata_destroy(md);
md = NULL;

/* Subscribe to same topic */
TEST_CALL_ERR__(rd_kafka_subscribe(rk, subscription));

/* Request full metadata */
TEST_CALL_ERR__(rd_kafka_metadata(rk, 1, NULL, &md, 1000));
rd_kafka_metadata_destroy(md);
md = NULL;

/* Subscribing shouldn't give UNKNOWN_TOPIC_OR_PART err.
* Verify no error was returned. */
test_consumer_poll_no_msgs("no error", rk, 0, 100);

rd_kafka_topic_partition_list_destroy(subscription);
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
test_mock_cluster_destroy(mcluster);

SUB_TEST_PASS();
}

/**
 * @brief Entry point for the metadata mock tests: run the cache-persistence
 *        check with one eager and one incremental assignor.
 */
int main_0146_metadata_mock(int argc, char **argv) {
        TEST_SKIP_MOCK_CLUSTER(0);

        /* "range" covers the eager rebalance path ("roundrobin" is also
         * eager, so it would add no extra coverage here);
         * "cooperative-sticky" covers the incremental path. */
        do_test_metadata_persists_in_cache("range");
        do_test_metadata_persists_in_cache("cooperative-sticky");

        return 0;
}
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ set(
0143-exponential_backoff_mock.c
0144-idempotence_mock.c
0145-pause_resume_mock.c
0146-metadata_mock.c
8000-idle.cpp
8001-fetch_from_follower_mock_manual.c
test.c
Expand Down
11 changes: 1 addition & 10 deletions tests/cluster_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from trivup.trivup import Cluster
from trivup.apps.ZookeeperApp import ZookeeperApp
from trivup.apps.KafkaBrokerApp import KafkaBrokerApp as KafkaBrokerAppOrig
from trivup.apps.KafkaBrokerApp import KafkaBrokerApp
from trivup.apps.KerberosKdcApp import KerberosKdcApp
from trivup.apps.SslApp import SslApp
from trivup.apps.OauthbearerOIDCApp import OauthbearerOIDCApp
Expand All @@ -35,15 +35,6 @@ def read_scenario_conf(scenario):
return parser.load(f)


# FIXME: merge in trivup
class KafkaBrokerApp(KafkaBrokerAppOrig):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is expected right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it was added back during a rebase

def _add_simple_authorizer(self, conf_blob):
conf_blob.append(
'authorizer.class.name=' +
'org.apache.kafka.metadata.authorizer.StandardAuthorizer')
conf_blob.append('super.users=User:ANONYMOUS')


class LibrdkafkaTestCluster(Cluster):
def __init__(self, version, conf={}, num_brokers=3, debug=False,
scenario="default", kraft=False):
Expand Down
Loading