From d54d6dbfaf7eb27080b26a51998de48afd92036e Mon Sep 17 00:00:00 2001 From: Kyle Phelps Date: Thu, 10 Oct 2024 14:02:39 -0400 Subject: [PATCH] Revert "Fix for an undesired partition migration with stale leader epoch (#4680)" This reverts commit 6584ed7c8b00786121c07bc0df5b3d7fa8da2661. --- src/rdkafka_topic.c | 79 +++++++++++++++------------------- tests/0146-metadata_mock.c | 87 -------------------------------------- 2 files changed, 35 insertions(+), 131 deletions(-) diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index c9bd3b415f..bb500097f9 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -659,8 +659,8 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rd_kafka_broker_t *leader, int32_t leader_epoch) { rd_kafka_toppar_t *rktp; - rd_bool_t need_epoch_validation = rd_false; - int r = 0; + rd_bool_t fetching_from_follower, need_epoch_validation = rd_false; + int r = 0; rktp = rd_kafka_toppar_get(rkt, partition, 0); if (unlikely(!rktp)) { @@ -688,17 +688,14 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, leader_epoch, rktp->rktp_leader_epoch); - if (rktp->rktp_fetch_state != - RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) { + if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE) { rd_kafka_toppar_unlock(rktp); rd_kafka_toppar_destroy(rktp); /* from get() */ return 0; } } - if (rktp->rktp_leader_epoch == -1 || - leader_epoch > rktp->rktp_leader_epoch) { - rd_bool_t fetching_from_follower; + if (leader_epoch > rktp->rktp_leader_epoch) { rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", "%s [%" PRId32 "]: leader %" PRId32 " epoch %" PRId32 " -> leader %" PRId32 @@ -706,50 +703,44 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, rktp->rktp_leader_id, rktp->rktp_leader_epoch, leader_id, leader_epoch); - if (leader_epoch > rktp->rktp_leader_epoch) - rktp->rktp_leader_epoch = leader_epoch; + rktp->rktp_leader_epoch = leader_epoch; + need_epoch_validation = rd_true; + } else if (rktp->rktp_fetch_state == + RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) need_epoch_validation = rd_true; + fetching_from_follower = + leader != NULL && rktp->rktp_broker != NULL && + rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL && + rktp->rktp_broker != leader; - fetching_from_follower = - leader != NULL && rktp->rktp_broker != NULL && - rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL && - rktp->rktp_broker != leader; - - if (fetching_from_follower && - rktp->rktp_leader_id == leader_id) { - rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", - "Topic %s [%" PRId32 "]: leader %" PRId32 - " unchanged, " - "not migrating away from preferred " - "replica %" PRId32, - rktp->rktp_rkt->rkt_topic->str, - rktp->rktp_partition, leader_id, - rktp->rktp_broker_id); - r = 0; - - } else { + if (fetching_from_follower && rktp->rktp_leader_id == leader_id) { + rd_kafka_dbg( + rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", + "Topic %s [%" PRId32 "]: leader %" PRId32 + " unchanged, " + "not migrating away from preferred replica %" PRId32, + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + leader_id, rktp->rktp_broker_id); + r = 0; - if (rktp->rktp_leader_id != leader_id || - rktp->rktp_leader != leader) { - /* Update leader if it has changed */ - rktp->rktp_leader_id = leader_id; - if (rktp->rktp_leader) - rd_kafka_broker_destroy( - rktp->rktp_leader); - if (leader) - rd_kafka_broker_keep(leader); - rktp->rktp_leader = leader; - } + } else { - /* Update handling broker */ - r = rd_kafka_toppar_broker_update( - rktp, leader_id, leader, "leader updated"); + if (rktp->rktp_leader_id != leader_id || + rktp->rktp_leader != leader) { + /* Update leader if it has changed */ + rktp->rktp_leader_id = leader_id; + if (rktp->rktp_leader) + rd_kafka_broker_destroy(rktp->rktp_leader); + if (leader) + rd_kafka_broker_keep(leader); + rktp->rktp_leader = leader; } - } else if (rktp->rktp_fetch_state == - RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) - need_epoch_validation = rd_true; + /* Update handling broker */ + r = rd_kafka_toppar_broker_update(rktp, leader_id, leader, + "leader updated"); + } if (need_epoch_validation) { /* Set offset validation position, diff --git a/tests/0146-metadata_mock.c b/tests/0146-metadata_mock.c index c0f1d7b11a..7274ff2c12 100644 --- a/tests/0146-metadata_mock.c +++ b/tests/0146-metadata_mock.c @@ -35,16 +35,6 @@ static rd_bool_t is_metadata_request(rd_kafka_mock_request_t *request, return rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Metadata; } -static rd_bool_t is_fetch_request(rd_kafka_mock_request_t *request, - void *opaque) { - int32_t *broker_id = (int32_t *)opaque; - rd_bool_t ret = - rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Fetch; - if (broker_id) - ret &= rd_kafka_mock_request_id(request) == *broker_id; - return ret; -} - /** * @brief Metadata should persists in cache after * a full metadata refresh. @@ -151,82 +141,6 @@ static void do_test_fast_metadata_refresh_stops(void) { SUB_TEST_PASS(); } -/** - * @brief A stale leader received while validating shouldn't - * migrate back the partition to that stale broker. - */ -static void do_test_stale_metadata_doesnt_migrate_partition(void) { - int i, fetch_requests; - rd_kafka_t *rk; - const char *bootstraps; - rd_kafka_mock_cluster_t *mcluster; - const char *topic = test_mk_topic_name(__FUNCTION__, 1); - rd_kafka_conf_t *conf; - int32_t expected_broker_id; - - SUB_TEST_QUICK(); - - mcluster = test_mock_cluster_new(3, &bootstraps); - rd_kafka_mock_topic_create(mcluster, topic, 1, 3); - rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); - - test_conf_init(&conf, NULL, 10); - test_conf_set(conf, "bootstrap.servers", bootstraps); - test_conf_set(conf, "group.id", topic); - test_conf_set(conf, "auto.offset.reset", "earliest"); - test_conf_set(conf, "enable.auto.commit", "false"); - test_conf_set(conf, "fetch.error.backoff.ms", "10"); - test_conf_set(conf, "fetch.wait.max.ms", "10"); - test_conf_set(conf, "fetch.queue.backoff.ms", "10"); - - rk = test_create_handle(RD_KAFKA_CONSUMER, conf); - - test_consumer_subscribe(rk, topic); - - /* Produce and consume to leader 1 */ - test_produce_msgs_easy_v(topic, 0, 0, 0, 1, 0, "bootstrap.servers", - bootstraps, NULL); - test_consumer_poll_exact("read first", rk, 0, 0, 0, 1, rd_true, NULL); - - /* Change leader to 2, Fetch fails, refreshes metadata. */ - rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); - - /* Validation fails, metadata refreshed again */ - rd_kafka_mock_broker_push_request_error_rtts( - mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1, - RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 1000); - - /* Wait partition migrates to broker 2 */ - rd_usleep(100 * 1000, 0); - - /* Ask to return stale metadata while calling OffsetForLeaderEpoch */ - rd_kafka_mock_start_request_tracking(mcluster); - for (i = 0; i < 10; i++) { - rd_kafka_mock_partition_push_leader_response( - mcluster, topic, 0, 1 /*leader id*/, 0 /*leader epoch*/); - } - - /* After the error on OffsetForLeaderEpoch metadata is refreshed - * and it returns the stale metadata. - * 1s for the OffsetForLeaderEpoch plus at least 500ms for - * restarting the fetch requests */ - rd_usleep(2000 * 1000, 0); - - /* Partition doesn't have to migrate back to broker 1 */ - expected_broker_id = 1; - fetch_requests = test_mock_wait_matching_requests( - mcluster, 0, 500, is_fetch_request, &expected_broker_id); - TEST_ASSERT(fetch_requests == 0, - "No fetch request should be received by broker 1, got %d", - fetch_requests); - rd_kafka_mock_stop_request_tracking(mcluster); - - rd_kafka_destroy(rk); - test_mock_cluster_destroy(mcluster); - - SUB_TEST_PASS(); -} - /** * @brief A metadata call for an existing topic, just after subscription, * must not cause a UNKNOWN_TOPIC_OR_PART error. @@ -431,7 +345,6 @@ int main_0146_metadata_mock(int argc, char **argv) { do_test_fast_metadata_refresh_stops(); - do_test_stale_metadata_doesnt_migrate_partition(); for (variation = 0; variation < 4; variation++) { do_test_metadata_update_operation(