From 69956f4dc4bc8f146a837a5ca859b6743893b1d2 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 8 Apr 2024 20:44:41 +0200 Subject: [PATCH] Failing test --- src/rdkafka_mock.c | 72 +++++++++++++++++++++++++++++++++ src/rdkafka_mock.h | 18 +++++++++ src/rdkafka_mock_handlers.c | 51 ++++++++++++++++++------ src/rdkafka_mock_int.h | 21 ++++++++++ src/rdkafka_op.h | 12 +++++- tests/0146-metadata_mock.c | 79 +++++++++++++++++++++++++++++++++++++ 6 files changed, 240 insertions(+), 13 deletions(-) diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c index d675beae04..a473f0915d 100644 --- a/src/rdkafka_mock.c +++ b/src/rdkafka_mock.c @@ -469,7 +469,39 @@ rd_kafka_mock_partition_assign_replicas(rd_kafka_mock_partition_t *mpart, mpart, mpart->replicas[rd_jitter(0, replica_cnt - 1)]); } +/** + * @brief Append a leader response (\p leader_id , \p leader_epoch ) to the tail of the leader response queue of \p mpart . + */ +static void +rd_kafka_mock_partition_push_leader_response0(rd_kafka_mock_partition_t *mpart, + int32_t leader_id, + int32_t leader_epoch) { + rd_kafka_mock_partition_leader_t *leader_response; + + leader_response = rd_calloc(1, sizeof(*leader_response)); + leader_response->leader_id = leader_id; + leader_response->leader_epoch = leader_epoch; + TAILQ_INSERT_TAIL(&mpart->leader_responses, leader_response, link); +} +/** + * @brief Return, without unlinking it, the first queued leader response in \p mpart , + * if available. 
+ */ +rd_kafka_mock_partition_leader_t * +rd_kafka_mock_partition_next_leader_response(rd_kafka_mock_partition_t *mpart) { + return TAILQ_FIRST(&mpart->leader_responses); +} + +/** + * @brief Unlink \p mpart_leader from the leader responses of \p mpart and free it. + */ +void rd_kafka_mock_partition_leader_destroy( + rd_kafka_mock_partition_t *mpart, + rd_kafka_mock_partition_leader_t *mpart_leader) { + TAILQ_REMOVE(&mpart->leader_responses, mpart_leader, link); + rd_free(mpart_leader); +} /** * @brief Unlink and destroy committed offset @@ -546,6 +578,7 @@ rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart, static void rd_kafka_mock_partition_destroy(rd_kafka_mock_partition_t *mpart) { rd_kafka_mock_msgset_t *mset, *tmp; rd_kafka_mock_committed_offset_t *coff, *tmpcoff; + rd_kafka_mock_partition_leader_t *mpart_leader, *tmp_mpart_leader; TAILQ_FOREACH_SAFE(mset, &mpart->msgsets, link, tmp) rd_kafka_mock_msgset_destroy(mpart, mset); @@ -553,6 +586,10 @@ static void rd_kafka_mock_partition_destroy(rd_kafka_mock_partition_t *mpart) { TAILQ_FOREACH_SAFE(coff, &mpart->committed_offsets, link, tmpcoff) rd_kafka_mock_committed_offset_destroy(mpart, coff); + TAILQ_FOREACH_SAFE(mpart_leader, &mpart->leader_responses, link, + tmp_mpart_leader) + rd_kafka_mock_partition_leader_destroy(mpart, mpart_leader); + rd_list_destroy(&mpart->pidstates); rd_free(mpart->replicas); @@ -579,6 +616,7 @@ static void rd_kafka_mock_partition_init(rd_kafka_mock_topic_t *mtopic, mpart->update_follower_end_offset = rd_true; TAILQ_INIT(&mpart->committed_offsets); + TAILQ_INIT(&mpart->leader_responses); rd_list_init(&mpart->pidstates, 0, rd_free); @@ -2096,6 +2134,23 @@ rd_kafka_mock_partition_set_follower_wmarks(rd_kafka_mock_cluster_t *mcluster, rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); } +rd_kafka_resp_err_t +rd_kafka_mock_partition_push_leader_response(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int partition, + int32_t leader_id, + int32_t leader_epoch) { + rd_kafka_op_t *rko = 
rd_kafka_op_new(RD_KAFKA_OP_MOCK); + rko->rko_u.mock.name = rd_strdup(topic); + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE; + rko->rko_u.mock.partition = partition; + rko->rko_u.mock.leader_id = leader_id; + rko->rko_u.mock.leader_epoch = leader_epoch; + + return rd_kafka_op_err_destroy( + rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); +} + rd_kafka_resp_err_t rd_kafka_mock_broker_set_down(rd_kafka_mock_cluster_t *mcluster, int32_t broker_id) { @@ -2379,6 +2434,23 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster, mpart->update_follower_end_offset = rd_false; } break; + case RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE: + mpart = rd_kafka_mock_partition_get( + mcluster, rko->rko_u.mock.name, rko->rko_u.mock.partition); + if (!mpart) + return RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; + + rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", + "Push %s [%" PRId32 "] leader response: (%" PRId32 + ", %" PRId32 ")", + rko->rko_u.mock.name, rko->rko_u.mock.partition, + rko->rko_u.mock.leader_id, + rko->rko_u.mock.leader_epoch); + + rd_kafka_mock_partition_push_leader_response0( + mpart, rko->rko_u.mock.leader_id, + rko->rko_u.mock.leader_epoch); + break; /* Broker commands */ case RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN: diff --git a/src/rdkafka_mock.h b/src/rdkafka_mock.h index 231701fb1c..737b768339 100644 --- a/src/rdkafka_mock.h +++ b/src/rdkafka_mock.h @@ -282,6 +282,24 @@ rd_kafka_mock_partition_set_follower_wmarks(rd_kafka_mock_cluster_t *mcluster, int64_t lo, int64_t hi); +/** + * @brief Push one Metadata leader response (leader id and epoch) + * onto the response queue of the given \p topic and \p partition; each pushed response is used by one Metadata response for that partition, in push order. 
+ * + * @param topic Topic to change + * @param partition Partition to change in \p topic + * @param leader_id Broker id of the leader node + * @param leader_epoch Leader epoch corresponding to the given \p leader_id + * + * @return Push operation error code + */ +RD_EXPORT +rd_kafka_resp_err_t +rd_kafka_mock_partition_push_leader_response(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + int partition, + int32_t leader_id, + int32_t leader_epoch); /** * @brief Disconnects the broker and disallows any new connections. diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 4d338bab6d..9fd5667ce6 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -857,7 +857,8 @@ static int rd_kafka_mock_handle_ApiVersion(rd_kafka_mock_connection_t *mconn, * @param mtopic may be NULL */ static void -rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_buf_t *resp, +rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_mock_cluster_t *mcluster, + rd_kafka_buf_t *resp, int16_t ApiVersion, const char *topic, const rd_kafka_mock_topic_t *mtopic, @@ -880,20 +881,46 @@ rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_buf_t *resp, rd_kafka_buf_write_arraycnt(resp, partition_cnt); for (i = 0; mtopic && i < partition_cnt; i++) { - const rd_kafka_mock_partition_t *mpart = &mtopic->partitions[i]; + rd_kafka_mock_partition_leader_t *mpart_leader; + rd_kafka_mock_partition_t *mpart = &mtopic->partitions[i]; int r; /* Response: ..Partitions.ErrorCode */ rd_kafka_buf_write_i16(resp, 0); /* Response: ..Partitions.PartitionIndex */ rd_kafka_buf_write_i32(resp, mpart->id); - /* Response: ..Partitions.Leader */ - rd_kafka_buf_write_i32(resp, - mpart->leader ? 
mpart->leader->id : -1); - if (ApiVersion >= 7) { - /* Response: ..Partitions.LeaderEpoch */ - rd_kafka_buf_write_i32(resp, mpart->leader_epoch); + mpart_leader = + rd_kafka_mock_partition_next_leader_response(mpart); + if (mpart_leader) { + rd_kafka_dbg( + mcluster->rk, MOCK, "MOCK", + "MetadataRequest: using next leader response " + "(%" PRId32 ", %" PRId32 ")", + mpart_leader->leader_id, + mpart_leader->leader_epoch); + + /* Response: ..Partitions.Leader (taken from the queued leader response, which is destroyed after use) */ + rd_kafka_buf_write_i32(resp, mpart_leader->leader_id); + + if (ApiVersion >= 7) { + /* Response: ..Partitions.LeaderEpoch */ + rd_kafka_buf_write_i32( + resp, mpart_leader->leader_epoch); + } + rd_kafka_mock_partition_leader_destroy(mpart, + mpart_leader); + mpart_leader = NULL; + } else { + /* Response: ..Partitions.Leader */ + rd_kafka_buf_write_i32( + resp, mpart->leader ? mpart->leader->id : -1); + + if (ApiVersion >= 7) { + /* Response: ..Partitions.LeaderEpoch */ + rd_kafka_buf_write_i32(resp, + mpart->leader_epoch); + } } /* Response: ..Partitions.#ReplicaNodes */ @@ -1010,8 +1037,8 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn, TAILQ_FOREACH(mtopic, &mcluster->topics, link) { rd_kafka_mock_buf_write_Metadata_Topic( - resp, rkbuf->rkbuf_reqhdr.ApiVersion, mtopic->name, - mtopic, mtopic->err); + mcluster, resp, rkbuf->rkbuf_reqhdr.ApiVersion, + mtopic->name, mtopic, mtopic->err); } } else if (requested_topics) { @@ -1033,8 +1060,8 @@ static int rd_kafka_mock_handle_Metadata(rd_kafka_mock_connection_t *mconn, err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; rd_kafka_mock_buf_write_Metadata_Topic( - resp, rkbuf->rkbuf_reqhdr.ApiVersion, rktpar->topic, - mtopic, err ? err : mtopic->err); + mcluster, resp, rkbuf->rkbuf_reqhdr.ApiVersion, + rktpar->topic, mtopic, err ? 
err : mtopic->err); } } else { diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index 8d781e5efb..ea91363110 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -235,6 +235,16 @@ typedef struct rd_kafka_mock_committed_offset_s { rd_kafkap_str_t *metadata; /**< Metadata, allocated separately */ } rd_kafka_mock_committed_offset_t; +/** + * @struct Leader id and epoch to return in a Metadata call. + */ +typedef struct rd_kafka_mock_partition_leader_s { + /**< Link to prev/next entries in mpart.leader_responses */ + TAILQ_ENTRY(rd_kafka_mock_partition_leader_s) link; + int32_t leader_id; /**< Leader id to return */ + int32_t leader_epoch; /**< Leader epoch to return */ +} rd_kafka_mock_partition_leader_t; + TAILQ_HEAD(rd_kafka_mock_msgset_tailq_s, rd_kafka_mock_msgset_s); @@ -276,6 +286,10 @@ typedef struct rd_kafka_mock_partition_s { int32_t follower_id; /**< Preferred replica/follower */ struct rd_kafka_mock_topic_s *topic; + + /**< Queued leader responses, used in FIFO order by Metadata responses */ + TAILQ_HEAD(, rd_kafka_mock_partition_leader_s) + leader_responses; } rd_kafka_mock_partition_t; @@ -477,6 +491,13 @@ int64_t rd_kafka_mock_partition_offset_for_leader_epoch( const rd_kafka_mock_partition_t *mpart, int32_t leader_epoch); +rd_kafka_mock_partition_leader_t * +rd_kafka_mock_partition_next_leader_response(rd_kafka_mock_partition_t *mpart); + +void rd_kafka_mock_partition_leader_destroy( + rd_kafka_mock_partition_t *mpart, + rd_kafka_mock_partition_leader_t *mpart_leader); + /** * @returns true if the ApiVersion is supported, else false. 
diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 3a1384362a..a9d1a6e969 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -564,6 +564,7 @@ struct rd_kafka_op_s { RD_KAFKA_MOCK_CMD_PART_SET_LEADER, RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER, RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS, + RD_KAFKA_MOCK_CMD_PART_PUSH_LEADER_RESPONSE, RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN, RD_KAFKA_MOCK_CMD_BROKER_SET_RTT, RD_KAFKA_MOCK_CMD_BROKER_SET_RACK, @@ -579,7 +580,9 @@ struct rd_kafka_op_s { * PART_SET_FOLLOWER * PART_SET_FOLLOWER_WMARKS * BROKER_SET_RACK - * COORD_SET (key_type) */ + * COORD_SET (key_type) + * PART_PUSH_LEADER_RESPONSE + */ char *str; /**< For: * COORD_SET (key) */ int32_t partition; /**< For: @@ -587,6 +590,7 @@ struct rd_kafka_op_s { * PART_SET_FOLLOWER_WMARKS * PART_SET_LEADER * APIVERSION_SET (ApiKey) + * PART_PUSH_LEADER_RESPONSE */ int32_t broker_id; /**< For: * PART_SET_FOLLOWER @@ -606,6 +610,12 @@ struct rd_kafka_op_s { * PART_SET_FOLLOWER_WMARKS * APIVERSION_SET (maxver) */ + int32_t leader_id; /**< Leader id, for: + * PART_PUSH_LEADER_RESPONSE + */ + int32_t leader_epoch; /**< Leader epoch, for: + * PART_PUSH_LEADER_RESPONSE + */ } mock; struct { diff --git a/tests/0146-metadata_mock.c b/tests/0146-metadata_mock.c index 258dc43a93..5467dcd486 100644 --- a/tests/0146-metadata_mock.c +++ b/tests/0146-metadata_mock.c @@ -35,6 +35,11 @@ static rd_bool_t is_metadata_request(rd_kafka_mock_request_t *request, return rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Metadata; } +static rd_bool_t is_fetch_request(rd_kafka_mock_request_t *request, + void *opaque) { + return rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Fetch; +} + /** * @brief Metadata should persists in cache after * a full metadata refresh. @@ -141,6 +146,77 @@ static void do_test_fast_metadata_refresh_stops(void) { SUB_TEST_PASS(); } + +/** + * @brief Stale leader metadata received while validating must not + * migrate the partition back to that stale broker. 
+ */ +static void do_test_stale_metadata_doesnt_migrate_partition(void) { + int i, fetch_requests; + rd_kafka_t *rk; + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + rd_kafka_conf_t *conf; + + SUB_TEST_QUICK(); + + mcluster = test_mock_cluster_new(3, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 3); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + + test_conf_init(&conf, NULL, 10); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "group.id", topic); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.commit", "false"); + test_conf_set(conf, "fetch.error.backoff.ms", "10"); + test_conf_set(conf, "fetch.wait.max.ms", "10"); + + rk = test_create_handle(RD_KAFKA_CONSUMER, conf); + + test_consumer_subscribe(rk, topic); + + /* Produce to and consume from leader 1 */ + test_produce_msgs_easy_v(topic, 0, 0, 0, 1, 0, "bootstrap.servers", + bootstraps, NULL); + test_consumer_poll_exact("read first", rk, 0, 0, 0, 1, rd_true, NULL); + + /* Change leader to 2, Fetch fails, refreshes metadata. 
*/ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + + for (i = 0; i < 5; i++) { + /* Validation (OffsetForLeaderEpoch on broker 2) fails, metadata refreshed again */ + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1, + RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 1000); + } + + /* Wait for the partition to migrate to broker 2 */ + rd_usleep(100 * 1000, 0); + + /* Queue stale metadata responses: leader 1 with epoch 0 */ + for (i = 0; i < 10; i++) { + rd_kafka_mock_partition_push_leader_response( + mcluster, topic, 0, 1 /*leader id*/, 0 /*leader epoch*/); + } + + /* Partition must not migrate back to broker 1. NOTE(review): is_fetch_request matches on the Fetch API key only and ignores its opaque - confirm the count below is limited to broker 1. */ + rd_usleep(2000 * 1000, 0); + rd_kafka_mock_start_request_tracking(mcluster); + fetch_requests = test_mock_wait_maching_requests( + mcluster, 0, 500, is_fetch_request, NULL); + TEST_ASSERT(fetch_requests == 0, + "No fetch request should be received by broker 1, got %d", + fetch_requests); + rd_kafka_mock_stop_request_tracking(mcluster); + + rd_kafka_destroy(rk); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + /** * @brief A metadata call for an existing topic, just after subscription, * must not cause a UNKNOWN_TOPIC_OR_PART error. @@ -186,6 +262,9 @@ int main_0146_metadata_mock(int argc, char **argv) { do_test_metadata_persists_in_cache("cooperative-sticky"); do_test_fast_metadata_refresh_stops(); + + do_test_stale_metadata_doesnt_migrate_partition(); + do_test_metadata_call_before_join(); return 0;