Skip to content

Commit

Permalink
Fix for an undesired partition migration
Browse files Browse the repository at this point in the history
with stale leader epoch

A partition migration could happen,
using stale metadata, when the partition
was undergoing a validation and
being retried because of an error.
Solved by doing a partition migration
only with a non-stale leader epoch.
Happening since 2.1.0
  • Loading branch information
emasab committed Apr 14, 2024
1 parent 69956f4 commit fd0238d
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 35 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ librdkafka v2.3.1 is a maintenance release:
* Fix for a wrong error returned on full metadata refresh before joining
a consumer group (#4678).
* Fix to metadata refresh interruption (#4679).
* Fix for an undesired partition migration with stale leader epoch (#4680).


## Fixes
Expand Down Expand Up @@ -46,6 +47,10 @@ librdkafka v2.3.1 is a maintenance release:
* Metadata refreshes without partition leader change could lead to a loop of
metadata calls at fixed intervals. Solved by stopping metadata refresh when
all existing metadata is non-stale. Happening since 2.3.0 (#4679).
* A partition migration could happen, using stale metadata, when the partition
was undergoing a validation and being retried because of an error.
Solved by doing a partition migration only with a non-stale leader epoch.
Happening since 2.1.0 (#4680).

### Consumer fixes

Expand Down
79 changes: 44 additions & 35 deletions src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -662,8 +662,8 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt,
rd_kafka_broker_t *leader,
int32_t leader_epoch) {
rd_kafka_toppar_t *rktp;
rd_bool_t fetching_from_follower, need_epoch_validation = rd_false;
int r = 0;
rd_bool_t need_epoch_validation = rd_false;
int r = 0;

rktp = rd_kafka_toppar_get(rkt, partition, 0);
if (unlikely(!rktp)) {
Expand Down Expand Up @@ -691,59 +691,68 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, leader_epoch,
rktp->rktp_leader_epoch);
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE) {
if (rktp->rktp_fetch_state !=
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) {
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_destroy(rktp); /* from get() */
return 0;
}
}

if (leader_epoch > rktp->rktp_leader_epoch) {
if (rktp->rktp_leader_epoch == -1 ||
leader_epoch > rktp->rktp_leader_epoch) {
rd_bool_t fetching_from_follower;
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
"%s [%" PRId32 "]: leader %" PRId32
" epoch %" PRId32 " -> leader %" PRId32
" epoch %" PRId32,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, rktp->rktp_leader_id,
rktp->rktp_leader_epoch, leader_id, leader_epoch);
rktp->rktp_leader_epoch = leader_epoch;
need_epoch_validation = rd_true;
} else if (rktp->rktp_fetch_state ==
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT)
if (leader_epoch > rktp->rktp_leader_epoch)
rktp->rktp_leader_epoch = leader_epoch;
need_epoch_validation = rd_true;

fetching_from_follower =
leader != NULL && rktp->rktp_broker != NULL &&
rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL &&
rktp->rktp_broker != leader;

if (fetching_from_follower && rktp->rktp_leader_id == leader_id) {
rd_kafka_dbg(
rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
"Topic %s [%" PRId32 "]: leader %" PRId32
" unchanged, "
"not migrating away from preferred replica %" PRId32,
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
leader_id, rktp->rktp_broker_id);
r = 0;
fetching_from_follower =
leader != NULL && rktp->rktp_broker != NULL &&
rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL &&
rktp->rktp_broker != leader;

} else {
if (fetching_from_follower &&
rktp->rktp_leader_id == leader_id) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
"Topic %s [%" PRId32 "]: leader %" PRId32
" unchanged, "
"not migrating away from preferred "
"replica %" PRId32,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, leader_id,
rktp->rktp_broker_id);
r = 0;

} else {

if (rktp->rktp_leader_id != leader_id ||
rktp->rktp_leader != leader) {
/* Update leader if it has changed */
rktp->rktp_leader_id = leader_id;
if (rktp->rktp_leader)
rd_kafka_broker_destroy(rktp->rktp_leader);
if (leader)
rd_kafka_broker_keep(leader);
rktp->rktp_leader = leader;
if (rktp->rktp_leader_id != leader_id ||
rktp->rktp_leader != leader) {
/* Update leader if it has changed */
rktp->rktp_leader_id = leader_id;
if (rktp->rktp_leader)
rd_kafka_broker_destroy(
rktp->rktp_leader);
if (leader)
rd_kafka_broker_keep(leader);
rktp->rktp_leader = leader;
}

/* Update handling broker */
r = rd_kafka_toppar_broker_update(
rktp, leader_id, leader, "leader updated");
}

/* Update handling broker */
r = rd_kafka_toppar_broker_update(rktp, leader_id, leader,
"leader updated");
}
} else if (rktp->rktp_fetch_state ==
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT)
need_epoch_validation = rd_true;

if (need_epoch_validation) {
/* Set offset validation position,
Expand Down

0 comments on commit fd0238d

Please sign in to comment.