From f13b6d2bbf9941029aaf48b310d40a7a32def0c3 Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico
Date: Mon, 8 Apr 2024 20:47:14 +0200
Subject: [PATCH] Fix for an undesired partition migration with stale leader epoch

A partition migration could happen, using stale metadata, when the
partition was undergoing a validation and being retried because of an
error.
Solved by doing a partition migration only with a non-stale leader
epoch.
Happening since 2.1.0
---
 CHANGELOG.md        |  5 +++
 src/rdkafka_topic.c | 79 +++++++++++++++++++++++++--------------------
 2 files changed, 49 insertions(+), 35 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 419ae576a5..c2c58a0fdb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ librdkafka v2.3.1 is a maintenance release:
  * Fix for a wrong error returned on full metadata refresh before joining
    a consumer group (#4678).
  * Fix to metadata refresh interruption (#4679).
+ * Fix for an undesired partition migration with stale leader epoch (#4660).
 
 
 ## Fixes
@@ -44,6 +45,10 @@ librdkafka v2.3.1 is a maintenance release:
  * Metadata refreshes without partition leader change could lead to a loop of
    metadata calls at fixed intervals. Solved by stopping metadata refresh when
    all existing metadata is non-stale. Happening since 2.3.0 (#4679).
+ * A partition migration could happen, using stale metadata, when the partition
+   was undergoing a validation and being retried because of an error.
+   Solved by doing a partition migration only with a non-stale leader epoch.
+   Happening since 2.1.0 (#4660).
 
 
diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c
index f9082fd626..bd1239d501 100644
--- a/src/rdkafka_topic.c
+++ b/src/rdkafka_topic.c
@@ -662,8 +662,8 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt,
                                          rd_kafka_broker_t *leader,
                                          int32_t leader_epoch) {
         rd_kafka_toppar_t *rktp;
-        rd_bool_t fetching_from_follower, need_epoch_validation = rd_false;
-        int r = 0;
+        rd_bool_t need_epoch_validation = rd_false;
+        int r                           = 0;
 
         rktp = rd_kafka_toppar_get(rkt, partition, 0);
         if (unlikely(!rktp)) {
@@ -691,14 +691,17 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt,
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition, leader_epoch,
                             rktp->rktp_leader_epoch);
-                if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE) {
+                if (rktp->rktp_fetch_state !=
+                    RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) {
                         rd_kafka_toppar_unlock(rktp);
                         rd_kafka_toppar_destroy(rktp); /* from get() */
                         return 0;
                 }
         }
 
-        if (leader_epoch > rktp->rktp_leader_epoch) {
+        if (rktp->rktp_leader_epoch == -1 ||
+            leader_epoch > rktp->rktp_leader_epoch) {
+                rd_bool_t fetching_from_follower;
                 rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
                              "%s [%" PRId32 "]: leader %" PRId32
                              " epoch %" PRId32 " -> leader %" PRId32
@@ -706,44 +709,50 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt,
                              rktp->rktp_rkt->rkt_topic->str,
                              rktp->rktp_partition, rktp->rktp_leader_id,
                              rktp->rktp_leader_epoch, leader_id, leader_epoch);
-                rktp->rktp_leader_epoch = leader_epoch;
-                need_epoch_validation   = rd_true;
-        } else if (rktp->rktp_fetch_state ==
-                   RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT)
+                if (leader_epoch > rktp->rktp_leader_epoch)
+                        rktp->rktp_leader_epoch = leader_epoch;
                 need_epoch_validation = rd_true;
 
-        fetching_from_follower =
-            leader != NULL && rktp->rktp_broker != NULL &&
-            rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL &&
-            rktp->rktp_broker != leader;
-        if (fetching_from_follower && rktp->rktp_leader_id == leader_id) {
-                rd_kafka_dbg(
-                    rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
-                    "Topic %s [%" PRId32 "]: leader %" PRId32
-                    " unchanged, "
-                    "not migrating away from preferred replica %" PRId32,
-                    rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-                    leader_id, rktp->rktp_broker_id);
-                r = 0;
+                fetching_from_follower =
+                    leader != NULL && rktp->rktp_broker != NULL &&
+                    rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL &&
+                    rktp->rktp_broker != leader;
 
-        } else {
+                if (fetching_from_follower &&
+                    rktp->rktp_leader_id == leader_id) {
+                        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
+                                     "Topic %s [%" PRId32 "]: leader %" PRId32
+                                     " unchanged, "
+                                     "not migrating away from preferred "
+                                     "replica %" PRId32,
+                                     rktp->rktp_rkt->rkt_topic->str,
+                                     rktp->rktp_partition, leader_id,
+                                     rktp->rktp_broker_id);
+                        r = 0;
+
+                } else {
 
-                if (rktp->rktp_leader_id != leader_id ||
-                    rktp->rktp_leader != leader) {
-                        /* Update leader if it has changed */
-                        rktp->rktp_leader_id = leader_id;
-                        if (rktp->rktp_leader)
-                                rd_kafka_broker_destroy(rktp->rktp_leader);
-                        if (leader)
-                                rd_kafka_broker_keep(leader);
-                        rktp->rktp_leader = leader;
+                        if (rktp->rktp_leader_id != leader_id ||
+                            rktp->rktp_leader != leader) {
+                                /* Update leader if it has changed */
+                                rktp->rktp_leader_id = leader_id;
+                                if (rktp->rktp_leader)
+                                        rd_kafka_broker_destroy(
+                                            rktp->rktp_leader);
+                                if (leader)
+                                        rd_kafka_broker_keep(leader);
+                                rktp->rktp_leader = leader;
+                        }
+
+                        /* Update handling broker */
+                        r = rd_kafka_toppar_broker_update(
+                            rktp, leader_id, leader, "leader updated");
                 }
 
-                /* Update handling broker */
-                r = rd_kafka_toppar_broker_update(rktp, leader_id, leader,
-                                                  "leader updated");
-        }
+        } else if (rktp->rktp_fetch_state ==
+                   RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT)
+                need_epoch_validation = rd_true;
 
         if (need_epoch_validation) {
                 /* Set offset validation position,