diff --git a/CHANGELOG.md b/CHANGELOG.md index 95c740432a..03f7f1ba03 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ librdkafka v2.4.0 is a feature release: * Fix for a wrong error returned on full metadata refresh before joining a consumer group (#4678). * Fix to metadata refresh interruption (#4679). + * Fix for an undesired partition migration with stale leader epoch (#4680). ## Upgrade considerations @@ -63,6 +64,11 @@ librdkafka v2.4.0 is a feature release: Metadata refreshes without partition leader change could lead to a loop of metadata calls at fixed intervals. Solved by stopping metadata refresh when all existing metadata is non-stale. Happening since 2.3.0 (#4679). + * Issues: #4687. + A partition migration could happen, using stale metadata, when the partition + was undergoing a validation and being retried because of an error. + Solved by doing a partition migration only with a non-stale leader epoch. + Happening since 2.1.0 (#4680). ### Consumer fixes diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c index d675beae04..a473f0915d 100644 --- a/src/rdkafka_mock.c +++ b/src/rdkafka_mock.c @@ -469,7 +469,39 @@ rd_kafka_mock_partition_assign_replicas(rd_kafka_mock_partition_t *mpart, mpart, mpart->replicas[rd_jitter(0, replica_cnt - 1)]); } +/** + * @brief Push a partition leader response to passed \p mpart . + */ +static void +rd_kafka_mock_partition_push_leader_response0(rd_kafka_mock_partition_t *mpart, + int32_t leader_id, + int32_t leader_epoch) { + rd_kafka_mock_partition_leader_t *leader_response; + + leader_response = rd_calloc(1, sizeof(*leader_response)); + leader_response->leader_id = leader_id; + leader_response->leader_epoch = leader_epoch; + TAILQ_INSERT_TAIL(&mpart->leader_responses, leader_response, link); +} +/** + * @brief Return the first mocked partition leader response in \p mpart , + * if available. + */ +rd_kafka_mock_partition_leader_t * +rd_kafka_mock_partition_next_leader_response(rd_kafka_mock_partition_t *mpart) { + return TAILQ_FIRST(&mpart->leader_responses); +} + +/** + * @brief Unlink and destroy a partition leader response + */ +void rd_kafka_mock_partition_leader_destroy( + rd_kafka_mock_partition_t *mpart, + rd_kafka_mock_partition_leader_t *mpart_leader) { + TAILQ_REMOVE(&mpart->leader_responses, mpart_leader, link); + rd_free(mpart_leader); +} /** * @brief Unlink and destroy committed offset @@ -546,6 +578,7 @@ rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart, static void rd_kafka_mock_partition_destroy(rd_kafka_mock_partition_t *mpart) { rd_kafka_mock_msgset_t *mset, *tmp; rd_kafka_mock_committed_offset_t *coff, *tmpcoff; + rd_kafka_mock_partition_leader_t *mpart_leader, *tmp_mpart_leader; TAILQ_FOREACH_SAFE(mset, &mpart->msgsets, link, tmp) rd_kafka_mock_msgset_destroy(mpart, mset); @@ -553,6 +586,10 @@ static void rd_kafka_mock_partition_destroy(rd_kafka_mock_partition_t *mpart) { TAILQ_FOREACH_SAFE(coff, &mpart->committed_offsets, link, tmpcoff) rd_kafka_mock_committed_offset_destroy(mpart, coff); + TAILQ_FOREACH_SAFE(mpart_leader, &mpart->leader_responses, link, + tmp_mpart_leader) + rd_kafka_mock_partition_leader_destroy(mpart, mpart_leader); + rd_list_destroy(&mpart->pidstates); rd_free(mpart->replicas); @@ -579,6 +616,7 @@ static void rd_kafka_mock_partition_init(rd_kafka_mock_topic_t *mtopic, mpart->update_follower_end_offset = rd_true; TAILQ_INIT(&mpart->committed_offsets); + TAILQ_INIT(&mpart->leader_responses); rd_list_init(&mpart->pidstates, 0, rd_free); @@ -2096,6 +2134,23 @@ rd_kafka_mock_partition_set_follower_wmarks(rd_kafka_mock_cluster_t *mcluster, rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); } +rd_kafka_resp_err_t +rd_kafka_mock_partition_push_leader_response(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int partition, + int32_t leader_id, + int32_t leader_epoch) { + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK); + rko->rko_u.mock.name = rd_strdup(topic); + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE; + rko->rko_u.mock.partition = partition; + rko->rko_u.mock.leader_id = leader_id; + rko->rko_u.mock.leader_epoch = leader_epoch; + + return rd_kafka_op_err_destroy( + rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); +} + rd_kafka_resp_err_t rd_kafka_mock_broker_set_down(rd_kafka_mock_cluster_t *mcluster, int32_t broker_id) { @@ -2379,6 +2434,23 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster, mpart->update_follower_end_offset = rd_false; } break; + case RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE: + mpart = rd_kafka_mock_partition_get( + mcluster, rko->rko_u.mock.name, rko->rko_u.mock.partition); + if (!mpart) + return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; + + rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", + "Push %s [%" PRId32 "] leader response: (%" PRId32 + ", %" PRId32 ")", + rko->rko_u.mock.name, rko->rko_u.mock.partition, + rko->rko_u.mock.leader_id, + rko->rko_u.mock.leader_epoch); + + rd_kafka_mock_partition_push_leader_response0( + mpart, rko->rko_u.mock.leader_id, + rko->rko_u.mock.leader_epoch); + break; /* Broker commands */ case RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN: diff --git a/src/rdkafka_mock.h b/src/rdkafka_mock.h index 231701fb1c..737b768339 100644 --- a/src/rdkafka_mock.h +++ b/src/rdkafka_mock.h @@ -282,6 +282,24 @@ rd_kafka_mock_partition_set_follower_wmarks(rd_kafka_mock_cluster_t *mcluster, int64_t lo, int64_t hi); +/** + * @brief Push \p cnt Metadata leader response + * onto the cluster's stack for the given \p topic and \p partition. + * + * @param topic Topic to change + * @param partition Partition to change in \p topic + * @param leader_id Broker id of the leader node + * @param leader_epoch Leader epoch corresponding to the given \p leader_id + * + * @return Push operation error code + */ +RD_EXPORT +rd_kafka_resp_err_t +rd_kafka_mock_partition_push_leader_response(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int partition, + int32_t leader_id, + int32_t leader_epoch); /** * @brief Disconnects the broker and disallows any new connections. diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 4d338bab6d..9fd5667ce6 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -857,7 +857,8 @@ static int rd_kafka_mock_handle_ApiVersion(rd_kafka_mock_connection_t *mconn, * @param mtopic may be NULL */ static void -rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_buf_t *resp, +rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_mock_cluster_t *mcluster, + rd_kafka_buf_t *resp, int16_t ApiVersion, const char *topic, const rd_kafka_mock_topic_t *mtopic, @@ -880,20 +881,46 @@ rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_buf_t *resp, rd_kafka_buf_write_arraycnt(resp, partition_cnt); for (i = 0; mtopic && i < partition_cnt; i++) { - const rd_kafka_mock_partition_t *mpart = &mtopic->partitions[i]; + rd_kafka_mock_partition_leader_t *mpart_leader; + rd_kafka_mock_partition_t *mpart = &mtopic->partitions[i]; int r; /* Response: ..Partitions.ErrorCode */ rd_kafka_buf_write_i16(resp, 0); /* Response: ..Partitions.PartitionIndex */ rd_kafka_buf_write_i32(resp, mpart->id); - /* Response: ..Partitions.Leader */ - rd_kafka_buf_write_i32(resp, - mpart->leader ? mpart->leader->id : -1); - if (ApiVersion >= 7) { - /* Response: ..Partitions.LeaderEpoch */ - rd_kafka_buf_write_i32(resp, mpart->leader_epoch); + mpart_leader = + rd_kafka_mock_partition_next_leader_response(mpart); + if (mpart_leader) { + rd_kafka_dbg( + mcluster->rk, MOCK, "MOCK", + "MetadataRequest: using next leader response " + "(%" PRId32 ", %" PRId32 ")", + mpart_leader->leader_id, + mpart_leader->leader_epoch); + + /* Response: ..Partitions.Leader */ + rd_kafka_buf_write_i32(resp, mpart_leader->leader_id); + + if (ApiVersion >= 7) { + /* Response: ..Partitions.LeaderEpoch */ + rd_kafka_buf_write_i32( + resp, mpart_leader->leader_epoch); + } + rd_kafka_mock_partition_leader_destroy(mpart, + mpart_leader); + mpart_leader = NULL; + } else { + /* Response: ..Partitions.Leader */ + rd_kafka_buf_write_i32( + resp, mpart->leader ? mpart->leader->id : -1); + + if (ApiVersion >= 7) { + /* Response: ..Partitions.LeaderEpoch */ + rd_kafka_buf_write_i32(resp, + mpart->leader_epoch); + } } /* Response: ..Partitions.#ReplicaNodes */ @@ -1010,8 +1037,8 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn, TAILQ_FOREACH(mtopic, &mcluster->topics, link) { rd_kafka_mock_buf_write_Metadata_Topic( - resp, rkbuf->rkbuf_reqhdr.ApiVersion, mtopic->name, - mtopic, mtopic->err); + mcluster, resp, rkbuf->rkbuf_reqhdr.ApiVersion, + mtopic->name, mtopic, mtopic->err); } } else if (requested_topics) { @@ -1033,8 +1060,8 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn, err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; rd_kafka_mock_buf_write_Metadata_Topic( - resp, rkbuf->rkbuf_reqhdr.ApiVersion, rktpar->topic, - mtopic, err ? err : mtopic->err); + mcluster, resp, rkbuf->rkbuf_reqhdr.ApiVersion, + rktpar->topic, mtopic, err ? err : mtopic->err); } } else { diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index 8d781e5efb..ea91363110 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -235,6 +235,16 @@ typedef struct rd_kafka_mock_committed_offset_s { rd_kafkap_str_t *metadata; /**< Metadata, allocated separately */ } rd_kafka_mock_committed_offset_t; +/** + * @struct Leader id and epoch to return in a Metadata call. + */ +typedef struct rd_kafka_mock_partition_leader_s { + /**< Link to prev/next entries */ + TAILQ_ENTRY(rd_kafka_mock_partition_leader_s) link; + int32_t leader_id; /**< Leader id */ + int32_t leader_epoch; /**< Leader epoch */ +} rd_kafka_mock_partition_leader_t; + TAILQ_HEAD(rd_kafka_mock_msgset_tailq_s, rd_kafka_mock_msgset_s); @@ -276,6 +286,10 @@ typedef struct rd_kafka_mock_partition_s { int32_t follower_id; /**< Preferred replica/follower */ struct rd_kafka_mock_topic_s *topic; + + /**< Leader responses */ + TAILQ_HEAD(, rd_kafka_mock_partition_leader_s) + leader_responses; } rd_kafka_mock_partition_t; @@ -477,6 +491,13 @@ int64_t rd_kafka_mock_partition_offset_for_leader_epoch( const rd_kafka_mock_partition_t *mpart, int32_t leader_epoch); +rd_kafka_mock_partition_leader_t * +rd_kafka_mock_partition_next_leader_response(rd_kafka_mock_partition_t *mpart); + +void rd_kafka_mock_partition_leader_destroy( + rd_kafka_mock_partition_t *mpart, + rd_kafka_mock_partition_leader_t *mpart_leader); + /** * @returns true if the ApiVersion is supported, else false. diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 84b0172f5d..8337586d58 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -565,6 +565,7 @@ struct rd_kafka_op_s { RD_KAFKA_MOCK_CMD_PART_SET_LEADER, RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER, RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS, + RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE, RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN, RD_KAFKA_MOCK_CMD_BROKER_SET_RTT, RD_KAFKA_MOCK_CMD_BROKER_SET_RACK, @@ -580,7 +581,9 @@ struct rd_kafka_op_s { * PART_SET_FOLLOWER * PART_SET_FOLLOWER_WMARKS * BROKER_SET_RACK - * COORD_SET (key_type) */ + * COORD_SET (key_type) + * PART_PUSH_LEADER_RESPONSE + */ char *str; /**< For: * COORD_SET (key) */ int32_t partition; /**< For: @@ -588,6 +591,7 @@ struct rd_kafka_op_s { * PART_SET_FOLLOWER_WMARKS * PART_SET_LEADER * APIVERSION_SET (ApiKey) + * PART_PUSH_LEADER_RESPONSE */ int32_t broker_id; /**< For: * PART_SET_FOLLOWER @@ -607,6 +611,12 @@ struct rd_kafka_op_s { * PART_SET_FOLLOWER_WMARKS * APIVERSION_SET (maxver) */ + int32_t leader_id; /**< Leader id, for: + * PART_PUSH_LEADER_RESPONSE + */ + int32_t leader_epoch; /**< Leader epoch, for: + * PART_PUSH_LEADER_RESPONSE + */ } mock; struct { diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index f9082fd626..bd1239d501 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -662,8 +662,8 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rd_kafka_broker_t *leader, int32_t leader_epoch) { rd_kafka_toppar_t *rktp; - rd_bool_t fetching_from_follower, need_epoch_validation = rd_false; - int r = 0; + rd_bool_t need_epoch_validation = rd_false; + int r = 0; rktp = rd_kafka_toppar_get(rkt, partition, 0); if (unlikely(!rktp)) { @@ -691,14 +691,17 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, leader_epoch, rktp->rktp_leader_epoch); - if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE) { + if (rktp->rktp_fetch_state != + RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) { rd_kafka_toppar_unlock(rktp); rd_kafka_toppar_destroy(rktp); /* from get() */ return 0; } } - if (leader_epoch > rktp->rktp_leader_epoch) { + if (rktp->rktp_leader_epoch == -1 || + leader_epoch > rktp->rktp_leader_epoch) { + rd_bool_t fetching_from_follower; rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", "%s [%" PRId32 "]: leader %" PRId32 " epoch %" PRId32 " -> leader %" PRId32 @@ -706,44 +709,50 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, rktp->rktp_leader_id, rktp->rktp_leader_epoch, leader_id, leader_epoch); - rktp->rktp_leader_epoch = leader_epoch; - need_epoch_validation = rd_true; - } else if (rktp->rktp_fetch_state == - RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) + if (leader_epoch > rktp->rktp_leader_epoch) + rktp->rktp_leader_epoch = leader_epoch; need_epoch_validation = rd_true; - fetching_from_follower = - leader != NULL && rktp->rktp_broker != NULL && - rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL && - rktp->rktp_broker != leader; - if (fetching_from_follower && rktp->rktp_leader_id == leader_id) { - rd_kafka_dbg( - rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", - "Topic %s [%" PRId32 "]: leader %" PRId32 - " unchanged, " - "not migrating away from preferred replica %" PRId32, - rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, - leader_id, rktp->rktp_broker_id); - r = 0; + fetching_from_follower = + leader != NULL && rktp->rktp_broker != NULL && + rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL && + rktp->rktp_broker != leader; - } else { + if (fetching_from_follower && + rktp->rktp_leader_id == leader_id) { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", + "Topic %s [%" PRId32 "]: leader %" PRId32 + " unchanged, " + "not migrating away from preferred " + "replica %" PRId32, + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, leader_id, + rktp->rktp_broker_id); + r = 0; + + } else { - if (rktp->rktp_leader_id != leader_id || - rktp->rktp_leader != leader) { - /* Update leader if it has changed */ - rktp->rktp_leader_id = leader_id; - if (rktp->rktp_leader) - rd_kafka_broker_destroy(rktp->rktp_leader); - if (leader) - rd_kafka_broker_keep(leader); - rktp->rktp_leader = leader; + if (rktp->rktp_leader_id != leader_id || + rktp->rktp_leader != leader) { + /* Update leader if it has changed */ + rktp->rktp_leader_id = leader_id; + if (rktp->rktp_leader) + rd_kafka_broker_destroy( + rktp->rktp_leader); + if (leader) + rd_kafka_broker_keep(leader); + rktp->rktp_leader = leader; + } + + /* Update handling broker */ + r = rd_kafka_toppar_broker_update( + rktp, leader_id, leader, "leader updated"); } - /* Update handling broker */ - r = rd_kafka_toppar_broker_update(rktp, leader_id, leader, - "leader updated"); - } + } else if (rktp->rktp_fetch_state == + RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) + need_epoch_validation = rd_true; if (need_epoch_validation) { /* Set offset validation position, diff --git a/tests/0146-metadata_mock.c b/tests/0146-metadata_mock.c index 818ee753b0..95e03de8b3 100644 --- a/tests/0146-metadata_mock.c +++ b/tests/0146-metadata_mock.c @@ -35,6 +35,11 @@ static rd_bool_t is_metadata_request(rd_kafka_mock_request_t *request, return rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Metadata; } +static rd_bool_t is_fetch_request(rd_kafka_mock_request_t *request, + void *opaque) { + return rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Fetch; +} + /** * @brief Metadata should persists in cache after * a full metadata refresh. @@ -141,6 +146,76 @@ static void do_test_fast_metadata_refresh_stops(void) { SUB_TEST_PASS(); } +/** + * @brief A stale leader received while validating shouldn't + * migrate back the partition to that stale broker. + */ +static void do_test_stale_metadata_doesnt_migrate_partition(void) { + int i, fetch_requests; + rd_kafka_t *rk; + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + rd_kafka_conf_t *conf; + + SUB_TEST_QUICK(); + + mcluster = test_mock_cluster_new(3, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 3); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + + test_conf_init(&conf, NULL, 10); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "group.id", topic); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.commit", "false"); + test_conf_set(conf, "fetch.error.backoff.ms", "10"); + test_conf_set(conf, "fetch.wait.max.ms", "10"); + + rk = test_create_handle(RD_KAFKA_CONSUMER, conf); + + test_consumer_subscribe(rk, topic); + + /* Produce and consume to leader 1 */ + test_produce_msgs_easy_v(topic, 0, 0, 0, 1, 0, "bootstrap.servers", + bootstraps, NULL); + test_consumer_poll_exact("read first", rk, 0, 0, 0, 1, rd_true, NULL); + + /* Change leader to 2, Fetch fails, refreshes metadata. */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + + for (i = 0; i < 5; i++) { + /* Validation fails, metadata refreshed again */ + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1, + RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 1000); + } + + /* Wait partition migrates to broker 2 */ + rd_usleep(100 * 1000, 0); + + /* Return stale metadata */ + for (i = 0; i < 10; i++) { + rd_kafka_mock_partition_push_leader_response( + mcluster, topic, 0, 1 /*leader id*/, 0 /*leader epoch*/); + } + + /* Partition doesn't have to migrate back to broker 1 */ + rd_usleep(2000 * 1000, 0); + rd_kafka_mock_start_request_tracking(mcluster); + fetch_requests = test_mock_wait_matching_requests( + mcluster, 0, 500, is_fetch_request, NULL); + TEST_ASSERT(fetch_requests == 0, + "No fetch request should be received by broker 1, got %d", + fetch_requests); + rd_kafka_mock_stop_request_tracking(mcluster); + + rd_kafka_destroy(rk); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + /** * @brief A metadata call for an existing topic, just after subscription, * must not cause a UNKNOWN_TOPIC_OR_PART error. @@ -191,5 +266,7 @@ int main_0146_metadata_mock(int argc, char **argv) { do_test_fast_metadata_refresh_stops(); + do_test_stale_metadata_doesnt_migrate_partition(); + return 0; }