From 3e1f067569d7ddc1e27b78f770c6122bfe372ad7 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Tue, 18 Jun 2024 13:15:31 +0530 Subject: [PATCH 1/5] Add test and mock files --- src/rdkafka_mock_handlers.c | 133 ++++++++++++-- tests/0147-metadata_update_op_mock.c | 257 +++++++++++++++++++++++++++ tests/CMakeLists.txt | 1 + tests/test.c | 3 +- win32/tests/tests.vcxproj | 1 + 5 files changed, 383 insertions(+), 12 deletions(-) create mode 100644 tests/0147-metadata_update_op_mock.c diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index d67cc6e60f..11773a75a5 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -55,6 +55,7 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, int16_t Acks; int32_t TimeoutMs; rd_kafka_resp_err_t all_err; + rd_kafka_mock_broker_t *leader = NULL; if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3) rd_kafka_buf_read_str(rkbuf, &TransactionalId); @@ -111,9 +112,11 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, err = all_err; else if (!mpart) err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; - else if (mpart->leader != mconn->broker) + else if (mpart->leader != mconn->broker) { err = RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION; + leader = mpart->leader; + } /* Append to partition log */ if (!err) @@ -162,7 +165,31 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_str(resp, NULL, 0); } /* Response: Partition tags */ - rd_kafka_buf_write_tags_empty(resp); + /* Partition tags count */ + rd_kafka_buf_write_uvarint( + resp, + rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && + err == + RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION + ? 1 + : 0); + + /* Partition tags */ + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && + err == RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION) { + /* Tag type */ + rd_kafka_buf_write_uvarint(resp, 0); + /* Tag len = 4 (leader_id) + 4 + * (leader_epoch) + 1 (tags) */ + rd_kafka_buf_write_uvarint(resp, 9); + /* Leader id */ + rd_kafka_buf_write_i32(resp, mpart->leader->id); + /* Leader epoch */ + rd_kafka_buf_write_i32(resp, + mpart->leader_epoch); + /* Remaining tags */ + rd_kafka_buf_write_tags_empty(resp); + } } /* Topic tags */ @@ -177,8 +204,37 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, } /* Response: Top level tags */ - rd_kafka_buf_write_tags_empty(resp); - + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 9) { + /* Tag count */ + rd_kafka_buf_write_uvarint( + resp, + rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && leader ? 
1 : 0); + + /* NodeEndpoint tags */ + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && leader) { + /* Tag type */ + rd_kafka_buf_write_uvarint(resp, 0); + /* Tag Len */ + rd_kafka_buf_write_uvarint( + resp, 4 + strlen(leader->advertised_listener) + 2 + + 4 + 2); + /* NodeEndpoints array count */ + rd_kafka_buf_write_arraycnt(resp, 1); + /* Leader id */ + rd_kafka_buf_write_i32(resp, leader->id); + /* Leader Hostname */ + rd_kafka_buf_write_str(resp, + leader->advertised_listener, -1); + /* Leader Port number */ + rd_kafka_buf_write_i32(resp, (int32_t)leader->port); + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) { + /* Leader Rack */ + rd_kafka_buf_write_str(resp, leader->rack, -1); + } + /* Remaining tags */ + rd_kafka_buf_write_tags_empty(resp); + } + } rd_kafka_mock_connection_send_response0(mconn, resp, rd_true); return 0; @@ -202,7 +258,8 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn, int32_t ReplicaId = -1, MaxWait, MinBytes, MaxBytes = -1, SessionId = -1, Epoch, TopicsCnt; int8_t IsolationLevel; - size_t totsize = 0; + size_t totsize = 0; + rd_kafka_mock_broker_t *leader = NULL; if (rkbuf->rkbuf_reqhdr.ApiVersion <= 14) { rd_kafka_buf_read_i32(rkbuf, &ReplicaId); @@ -422,8 +479,38 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_arraycnt(resp, 0); } + /* Partition tags count */ + rd_kafka_buf_write_uvarint( + resp, + rkbuf->rkbuf_reqhdr.ApiVersion >= 16 && + (err == + RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION || + err == + RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH) + ? 1 + : 0); + + /* Response: Partition tags */ - rd_kafka_buf_write_tags_empty(resp); + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 16 && + (err == + RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION || + err == RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH)) { + leader = mpart->leader; + + /* Tag type */ + rd_kafka_buf_write_uvarint(resp, 1); + /* Tag len = 4 (leader_id) + 4 + * (leader_epoch) + 1 (tags) */ + rd_kafka_buf_write_uvarint(resp, 9); + /* Leader id */ + rd_kafka_buf_write_i32(resp, mpart->leader->id); + /* Leader epoch */ + rd_kafka_buf_write_i32(resp, + mpart->leader_epoch); + /* Remaining tags */ + rd_kafka_buf_write_tags_empty(resp); + } } /* Response: Topic tags */ @@ -465,10 +552,34 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn, RD_KAFKAP_STR_DUPA(&rack, &RackId); /* Matt might do something sensible with this */ } - + rd_kafka_buf_skip_tags(rkbuf); /* Response: Top level tags */ - rd_kafka_buf_write_tags_empty(resp); - + rd_kafka_buf_write_uvarint( + resp, rkbuf->rkbuf_reqhdr.ApiVersion >= 16 && leader ? 1 : 0); + + + /* Response: Partition tags */ + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 16 && leader) { + /* Tag type */ + rd_kafka_buf_write_uvarint(resp, 0); + /* Tag Len */ + rd_kafka_buf_write_uvarint( + resp, 4 + strlen(leader->advertised_listener) + 2 + 4 + 2); + /* NodeEndpoints array count */ + rd_kafka_buf_write_arraycnt(resp, 1); + /* Leader id */ + rd_kafka_buf_write_i32(resp, leader->id); + /* Leader Hostname */ + rd_kafka_buf_write_str(resp, leader->advertised_listener, -1); + /* Leader Port number */ + rd_kafka_buf_write_i32(resp, (int32_t)leader->port); + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) { + /* Leader Rack */ + rd_kafka_buf_write_str(resp, leader->rack, -1); + } + /* Remaining tags */ + rd_kafka_buf_write_tags_empty(resp); + } /* If there was no data, delay up to MaxWait. 
* This isn't strictly correct since we should cut the wait short * and feed newly produced data if a producer writes to the @@ -2306,8 +2417,8 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn, const struct rd_kafka_mock_api_handler rd_kafka_mock_api_handlers[RD_KAFKAP__NUM] = { /* [request-type] = { MinVersion, MaxVersion, FlexVersion, callback } */ - [RD_KAFKAP_Produce] = {0, 9, 9, rd_kafka_mock_handle_Produce}, - [RD_KAFKAP_Fetch] = {0, 15, 12, rd_kafka_mock_handle_Fetch}, + [RD_KAFKAP_Produce] = {0, 10, 9, rd_kafka_mock_handle_Produce}, + [RD_KAFKAP_Fetch] = {0, 16, 12, rd_kafka_mock_handle_Fetch}, [RD_KAFKAP_ListOffsets] = {0, 7, 6, rd_kafka_mock_handle_ListOffsets}, [RD_KAFKAP_OffsetFetch] = {0, 6, 6, rd_kafka_mock_handle_OffsetFetch}, [RD_KAFKAP_OffsetCommit] = {0, 9, 8, rd_kafka_mock_handle_OffsetCommit}, diff --git a/tests/0147-metadata_update_op_mock.c b/tests/0147-metadata_update_op_mock.c new file mode 100644 index 0000000000..0e5e10c394 --- /dev/null +++ b/tests/0147-metadata_update_op_mock.c @@ -0,0 +1,257 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2023, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +#include "rdkafka.h" +#include "../src/rdkafka_proto.h" + +static void free_mock_requests(rd_kafka_mock_request_t **requests, + size_t request_cnt) { + size_t i; + for (i = 0; i < request_cnt; i++) + rd_kafka_mock_request_destroy(requests[i]); + rd_free(requests); +} + +/** + * @brief Produce API test + * We test the Metadata Update Operation being triggered via getting not leader + * error, where the metadata cache is updated which makes further produce calls + * based on the new updated metadata cache rather than making another metadata + * call to get the updates. 
+ */ +static void test_produce(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + rd_kafka_conf_t *conf) { + rd_kafka_mock_request_t **requests = NULL; + size_t request_cnt = 0; + rd_bool_t request_to_broker1 = rd_false; + rd_bool_t request_to_broker2 = rd_false; + size_t i; + rd_kafka_t *producer; + rd_kafka_topic_t *rkt; + SUB_TEST(); + + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + + producer = test_create_handle(RD_KAFKA_PRODUCER, conf); + rkt = test_create_producer_topic(producer, topic, NULL); + + + /* Leader for both the partition is broker 1 */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1); + rd_kafka_mock_start_request_tracking(mcluster); + rd_kafka_mock_clear_requests(mcluster); + + /* Produce to Partition 0 (with leader Broker 1) */ + test_produce_msgs(producer, rkt, 0, 0, 0, 1, "hello", 6); + + /* Verify that the produce call is made to Broker 1 */ + requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); + TEST_SAY("Got requests %zu \n", request_cnt); + for (i = 0; i < request_cnt; i++) { + TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64 + "\n", + rd_kafka_mock_request_id(requests[i]), + rd_kafka_mock_request_api_key(requests[i]), + rd_kafka_mock_request_timestamp(requests[i])); + + if ((rd_kafka_mock_request_api_key(requests[i]) == + RD_KAFKAP_Produce) && + (rd_kafka_mock_request_id(requests[i]) == 2)) + request_to_broker2 = rd_true; + } + free_mock_requests(requests, request_cnt); + rd_kafka_mock_clear_requests(mcluster); + TEST_ASSERT((!request_to_broker2), + "Produce Request should have been made to only Brokers 1."); + + /* Change the leader for Partition 0 to Broker 2 */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + + /* Make the Produce call to Partition 0 (with leader Broker 2) */ + test_produce_msgs(producer, rkt, 0, 0, 0, 1, "hello", 6); + request_to_broker1 = rd_false; + request_to_broker2 = rd_false; + /* Verify that the produce call is made first to broker 1 and then to + * broker 2 */ + requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); + for (i = 0; i < request_cnt; i++) { + TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64 + "\n", + rd_kafka_mock_request_id(requests[i]), + rd_kafka_mock_request_api_key(requests[i]), + rd_kafka_mock_request_timestamp(requests[i])); + + /* First produce request should be to broker 1 */ + if ((rd_kafka_mock_request_api_key(requests[i]) == + RD_KAFKAP_Produce) && + (rd_kafka_mock_request_id(requests[i]) == 1) && + request_to_broker2 == rd_false) + request_to_broker1 = rd_true; + + /* Subsequent produce requests should be to broker 2 */ + if ((rd_kafka_mock_request_api_key(requests[i]) == + RD_KAFKAP_Produce) && + (rd_kafka_mock_request_id(requests[i]) == 2) && + request_to_broker1 == rd_true) + request_to_broker2 = rd_true; + } + free_mock_requests(requests, request_cnt); + rd_kafka_mock_clear_requests(mcluster); + rd_kafka_mock_stop_request_tracking(mcluster); + TEST_ASSERT((request_to_broker1 && request_to_broker2), + "Brokers didn't receive produce requests in sequence."); + + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(producer); + SUB_TEST_PASS(); +} + +/** + * @brief Fetch API test + * We test the Metadata Update Operation being triggered via getting not leader + * error, where the metadata cache is updated which makes further fetch calls + * based on the new updated metadata cache rather than making another metadata + * call to get the updates. 
+ */ +static void test_fetch(rd_kafka_mock_cluster_t *mcluster, + const char *topic, + rd_kafka_conf_t *conf) { + rd_kafka_mock_request_t **requests = NULL; + size_t request_cnt = 0; + rd_kafka_t *consumer; + rd_kafka_message_t *rkm; + rd_bool_t request_to_broker1 = rd_false; + rd_bool_t request_to_broker2 = rd_false; + size_t i; + consumer = test_create_consumer(topic, NULL, conf, NULL); + test_consumer_subscribe(consumer, topic); + + SUB_TEST(); + /* Leader for both the partition is broker 1 */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1); + + rd_kafka_mock_start_request_tracking(mcluster); + rd_kafka_mock_clear_requests(mcluster); + + rkm = rd_kafka_consumer_poll(consumer, 10 * 1000); + if (rkm) + rd_kafka_message_destroy(rkm); + /* Verify that the fetch call is not made to broker 1 */ + requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); + for (i = 0; i < request_cnt; i++) { + TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64 + "\n", + rd_kafka_mock_request_id(requests[i]), + rd_kafka_mock_request_api_key(requests[i]), + rd_kafka_mock_request_timestamp(requests[i])); + + if ((rd_kafka_mock_request_api_key(requests[i]) == + RD_KAFKAP_Fetch) && + (rd_kafka_mock_request_id(requests[i]) == 2)) + request_to_broker2 = rd_true; + } + free_mock_requests(requests, request_cnt); + TEST_ASSERT(!request_to_broker2, + "Fetch Request should have been made to only Brokers 1."); + + rd_kafka_mock_clear_requests(mcluster); + + /* Change the leader for Partition 1 to Broker 2 */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 2); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + + /* Make the Fetch call again to trigger metadata update and cache + * changes */ + rkm = rd_kafka_consumer_poll(consumer, 5 * 1000); + if (rkm) + rd_kafka_message_destroy(rkm); + + /* Verify that the fetch call is made to both broker 1 and 2 */ + requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); + for (i = 0; i < request_cnt; i++) { + TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64 + "\n", + rd_kafka_mock_request_id(requests[i]), + rd_kafka_mock_request_api_key(requests[i]), + rd_kafka_mock_request_timestamp(requests[i])); + + if ((rd_kafka_mock_request_api_key(requests[i]) == + RD_KAFKAP_Fetch)) { + if ((rd_kafka_mock_request_id(requests[i]) == 2) && + request_to_broker1 == rd_true) + request_to_broker2 = rd_true; + else if ((rd_kafka_mock_request_id(requests[i]) == 1) && + request_to_broker2 == rd_false) + request_to_broker1 = rd_true; + } + } + free_mock_requests(requests, request_cnt); + TEST_ASSERT(request_to_broker1 && request_to_broker2, + "Fetch calls weren't made in correct sequence."); + rd_kafka_mock_clear_requests(mcluster); + rd_kafka_destroy(consumer); + SUB_TEST_PASS(); +} + +/** + * @brief Metadata Update Operation (KIP 951) + * We test the behaviour when with a fetch or produce call we get a NOT_LEADER + * error, which should trigger the update_metadata_op and the metadata cache + * should be updated accordingly only for the partition with the changed leader. 
+ */
+int main_0147_metadata_update_op_mock(int argc, char **argv) {
+        const char *topic = test_mk_topic_name("topic", 1);
+        rd_kafka_mock_cluster_t *mcluster;
+        rd_kafka_conf_t *conf;
+        const char *bootstraps;
+        /* Spawning a mock cluster with 2 nodes */
+        mcluster = test_mock_cluster_new(2, &bootstraps);
+
+        test_conf_init(&conf, NULL, 30);
+
+        /* Create 2 partitions for the topic */
+        rd_kafka_mock_topic_create(mcluster, topic, 2, 1);
+        /* This test may be slower when running with CI or Helgrind,
+         * restart the timeout. */
+        test_timeout_set(100);
+        test_conf_set(conf, "bootstrap.servers", bootstraps);
+        test_conf_set(conf, "topic.metadata.refresh.interval.ms", "-1");
+
+        /* Test the case for the Produce API */
+        test_produce(mcluster, topic, rd_kafka_conf_dup(conf));
+        /* Test the case for the Fetch API */
+        test_fetch(mcluster, topic, rd_kafka_conf_dup(conf));
+
+        test_mock_cluster_destroy(mcluster);
+        rd_kafka_conf_destroy(conf);
+        return 0;
+}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 62ce0deb02..b519cd5406 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -137,6 +137,7 @@ set(
   0144-idempotence_mock.c
   0145-pause_resume_mock.c
   0146-metadata_mock.c
+  0147-metadata_update_op_mock.c
   8000-idle.cpp
   8001-fetch_from_follower_mock_manual.c
   test.c
diff --git a/tests/test.c b/tests/test.c
index 83487f5e5c..7071590075 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -261,6 +261,7 @@ _TEST_DECL(0143_exponential_backoff_mock);
 _TEST_DECL(0144_idempotence_mock);
 _TEST_DECL(0145_pause_resume_mock);
 _TEST_DECL(0146_metadata_mock);
+_TEST_DECL(0147_metadata_update_op_mock);
 
 /* Manual tests */
 _TEST_DECL(8000_idle);
@@ -518,7 +519,7 @@ struct test tests[] = {
     _TEST(0144_idempotence_mock, TEST_F_LOCAL, TEST_BRKVER(0, 11, 0, 0)),
     _TEST(0145_pause_resume_mock, TEST_F_LOCAL),
     _TEST(0146_metadata_mock, TEST_F_LOCAL),
-
+    _TEST(0147_metadata_update_op_mock, TEST_F_LOCAL),
 
 /* Manual tests */
     _TEST(8000_idle, TEST_F_MANUAL),
diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj
index a354f278f8..c0b7de1f25 100644
--- a/win32/tests/tests.vcxproj
+++ b/win32/tests/tests.vcxproj
@@ -227,6 +227,7 @@
+    <ClCompile Include="..\..\tests\0147-metadata_update_op_mock.c" />
 

From 27158f6a637d34714669b622bdc0aa565b4d8e13 Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico
Date: Thu, 20 Jun 2024 14:22:11 +0200
Subject: [PATCH 3/5] Review comments

Mainly, in the mock handlers there's a need to use the generic
rd_kafka_buf_write_tags to support more tags later without duplicating the
code, and there can be more than one changed leader, which is reflected in
the new code. I don't see it as necessary to have a separate file for the
metadata update mock test, so I kept it in the same metadata mock test
file. I've also changed the test a bit to be parametric: producer or
consumer, and a single partition migration or two.
Given there's a fast metadata refresh that is still called, we shouldn't
just verify that messages are fetched or produced to the new leader, but
also that it happens before the metadata refresh. One way is to slow down
the metadata refresh, but rd_kafka_mock_broker_push_request_error_rtts
isn't used by the metadata mock handler. I tried that, but there were
issues because the metadata request is also used by the producer in the
test. I tried the interceptors too, but
`rd_kafka_interceptor_f_on_request_sent_t` is called after sending the
request; if it were called before, we could set a sleep there. I think at
the moment we cannot test more than this automatically and in a
predictable way.
---
 src/rdkafka_metadata.c               |   2 +-
 src/rdkafka_mock_handlers.c          | 324 +++++++++++++++++----------
 src/rdkafka_topic.c                  |   2 +-
 tests/0146-metadata_mock.c           | 161 +++++++++++++
 tests/0147-metadata_update_op_mock.c | 257 ---------------------
 tests/CMakeLists.txt                 |   1 -
 tests/test.c                         |   3 +-
 win32/tests/tests.vcxproj            |   1 -
 8 files changed, 365 insertions(+), 386 deletions(-)
 delete mode 100644 tests/0147-metadata_update_op_mock.c

diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c
index f6419fe97d..26a989c0fa 100644
--- a/src/rdkafka_metadata.c
+++ b/src/rdkafka_metadata.c
@@ -2094,7 +2094,7 @@ rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) {
                 rd_kafka_dbg(rk, METADATA, "METADATAUPDATE",
                              "Partition %s(%s)[%" PRId32
-                             "]: "
+                             "]:"
                              " updated with leader %" PRId32
                              " and epoch %" PRId32,
                              topic, rd_kafka_Uuid_base64str(&topic_id),
diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c
index 11773a75a5..2f75eb50f2 100644
--- a/src/rdkafka_mock_handlers.c
+++ b/src/rdkafka_mock_handlers.c
@@ -42,6 +42,58 @@
 
 
 
+void rd_kafka_mock_Produce_reply_tags_partition_write(
+    rd_kafka_buf_t *rkbuf,
+    int tagtype,
+    rd_kafka_mock_partition_t *mpart) {
+        switch (tagtype) {
+        case 0: /* CurrentLeader */
+                /* Leader id */
+                rd_kafka_buf_write_i32(rkbuf, mpart->leader->id);
+                /* Leader epoch */
+                rd_kafka_buf_write_i32(rkbuf, mpart->leader_epoch);
+                /* Field tags */
+                rd_kafka_buf_write_tags_empty(rkbuf);
+                break;
+        default:
+                break;
+        }
+}
+
+void rd_kafka_mock_Produce_reply_tags_write(
+    rd_kafka_buf_t *rkbuf,
+    int tagtype,
+    rd_kafka_mock_broker_t **changed_leaders,
+    int changed_leader_cnt) {
+        int i;
+        switch (tagtype) {
+        case 0: /* NodeEndpoints */
+                /* #NodeEndpoints */
+                rd_kafka_buf_write_arraycnt(rkbuf, changed_leader_cnt);
+                for (i = 0; i < changed_leader_cnt; i++) {
+                        rd_kafka_mock_broker_t *changed_leader =
+                            changed_leaders[i];
+                        /* Leader id */
+                        rd_kafka_buf_write_i32(rkbuf, changed_leader->id);
+                        /* Leader Hostname */
+                        rd_kafka_buf_write_str(
+                            rkbuf, changed_leader->advertised_listener, -1);
+
+                        /* Leader Port number */
+                        rd_kafka_buf_write_i32(rkbuf,
+                                               (int32_t)changed_leader->port);
+
+                        /* Leader Rack */
+                        rd_kafka_buf_write_str(rkbuf, changed_leader->rack,
+                                               -1);
+
+                        /* Field tags */
+                        rd_kafka_buf_write_tags_empty(rkbuf);
+                }
+        default:
+                break;
+        }
+}
+
 /**
  * @brief Handle ProduceRequest
  */
@@ -55,7 +107,12 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
         int16_t Acks;
         int32_t TimeoutMs;
         rd_kafka_resp_err_t all_err;
-        rd_kafka_mock_broker_t *leader = NULL;
+        int32_t tags_to_write[1] = {0};
+        size_t tags_to_write_cnt = 0;
+        int changed_leaders_cnt = 0;
+        rd_kafka_mock_broker_t **changed_leaders =
+            rd_calloc(mcluster->broker_cnt, sizeof(*changed_leaders));
+
         if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3)
                 rd_kafka_buf_read_str(rkbuf,
&TransactionalId); @@ -79,7 +136,6 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_read_str(rkbuf, &Topic); rd_kafka_buf_read_arraycnt(rkbuf, &PartitionCnt, RD_KAFKAP_PARTITIONS_MAX); - mtopic = rd_kafka_mock_topic_find_by_kstr(mcluster, &Topic); /* Response: Topic */ @@ -93,6 +149,8 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, rd_kafkap_bytes_t records; rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; int64_t BaseOffset = -1; + int32_t partition_tags_to_write[1] = {0}; + size_t partition_tags_to_write_cnt = 0; rd_kafka_buf_read_i32(rkbuf, &Partition); @@ -101,10 +159,8 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, Partition); rd_kafka_buf_read_kbytes(rkbuf, &records); - /* Partition Tags */ rd_kafka_buf_skip_tags(rkbuf); - /* Response: Partition */ rd_kafka_buf_write_i32(resp, Partition); @@ -112,11 +168,9 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, err = all_err; else if (!mpart) err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; - else if (mpart->leader != mconn->broker) { + else if (mpart->leader != mconn->broker) err = RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION; - leader = mpart->leader; - } /* Append to partition log */ if (!err) @@ -164,32 +218,38 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, /* Response: ErrorMessage */ rd_kafka_buf_write_str(resp, NULL, 0); } - /* Response: Partition tags */ - /* Partition tags count */ - rd_kafka_buf_write_uvarint( - resp, - rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && - err == - RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION - ? 1 - : 0); - /* Partition tags */ if (rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && err == RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION) { - /* Tag type */ - rd_kafka_buf_write_uvarint(resp, 0); - /* Tag len = 4 (leader_id) + 4 - * (leader_epoch) + 1 (tags) */ - rd_kafka_buf_write_uvarint(resp, 9); - /* Leader id */ - rd_kafka_buf_write_i32(resp, mpart->leader->id); - /* Leader epoch */ - rd_kafka_buf_write_i32(resp, - mpart->leader_epoch); - /* Remaining tags */ - rd_kafka_buf_write_tags_empty(resp); + int changed_leader_idx; + /* See if this leader is already included */ + for (changed_leader_idx = 0; + changed_leader_idx < changed_leaders_cnt; + changed_leader_idx++) { + if (changed_leaders[changed_leader_idx] + ->id == mpart->leader->id) + break; + } + if (changed_leader_idx == changed_leaders_cnt) { + /* Add the new leader that wasn't + * present */ + changed_leaders[changed_leaders_cnt] = + mpart->leader; + changed_leaders_cnt++; + } + + partition_tags_to_write + [partition_tags_to_write_cnt] = + 0 /* CurrentLeader */; + partition_tags_to_write_cnt++; } + + /* Response: Partition tags */ + rd_kafka_buf_write_tags( + resp, + rd_kafka_mock_Produce_reply_tags_partition_write, + partition_tags_to_write, + partition_tags_to_write_cnt, mpart); } /* Topic tags */ @@ -204,46 +264,76 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn, } /* Response: Top level tags */ - if (rkbuf->rkbuf_reqhdr.ApiVersion >= 9) { - /* Tag count */ - rd_kafka_buf_write_uvarint( - resp, - rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && leader ? 
1 : 0); - - /* NodeEndpoint tags */ - if (rkbuf->rkbuf_reqhdr.ApiVersion >= 10 && leader) { - /* Tag type */ - rd_kafka_buf_write_uvarint(resp, 0); - /* Tag Len */ - rd_kafka_buf_write_uvarint( - resp, 4 + strlen(leader->advertised_listener) + 2 + - 4 + 2); - /* NodeEndpoints array count */ - rd_kafka_buf_write_arraycnt(resp, 1); - /* Leader id */ - rd_kafka_buf_write_i32(resp, leader->id); - /* Leader Hostname */ - rd_kafka_buf_write_str(resp, - leader->advertised_listener, -1); - /* Leader Port number */ - rd_kafka_buf_write_i32(resp, (int32_t)leader->port); - if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) { - /* Leader Rack */ - rd_kafka_buf_write_str(resp, leader->rack, -1); - } - /* Remaining tags */ - rd_kafka_buf_write_tags_empty(resp); - } + if (changed_leaders_cnt) { + tags_to_write[tags_to_write_cnt] = 0 /* NodeEndpoints */; + tags_to_write_cnt++; } - rd_kafka_mock_connection_send_response0(mconn, resp, rd_true); + rd_kafka_buf_write_tags(resp, rd_kafka_mock_Produce_reply_tags_write, + tags_to_write, tags_to_write_cnt, + changed_leaders, changed_leaders_cnt); + + rd_kafka_mock_connection_send_response0(mconn, resp, rd_true); + rd_free(changed_leaders); return 0; err_parse: + rd_free(changed_leaders); rd_kafka_buf_destroy(resp); return -1; } +void rd_kafka_mock_Fetch_reply_tags_partition_write( + rd_kafka_buf_t *rkbuf, + int tagtype, + rd_kafka_mock_partition_t *mpart) { + switch (tagtype) { + case 1: /* CurrentLeader */ + /* Leader id */ + rd_kafka_buf_write_i32(rkbuf, mpart->leader->id); + /* Leader epoch */ + rd_kafka_buf_write_i32(rkbuf, mpart->leader_epoch); + /* Field tags */ + rd_kafka_buf_write_tags_empty(rkbuf); + break; + default: + break; + } +} + +void rd_kafka_mock_Fetch_reply_tags_write( + rd_kafka_buf_t *rkbuf, + int tagtype, + rd_kafka_mock_broker_t **changed_leaders, + int changed_leader_cnt) { + int i; + switch (tagtype) { + case 0: /* NodeEndpoints */ + /* #NodeEndpoints */ + rd_kafka_buf_write_arraycnt(rkbuf, changed_leader_cnt); + for (i = 0; i < changed_leader_cnt; i++) { + rd_kafka_mock_broker_t *changed_leader = + changed_leaders[i]; + /* Leader id */ + rd_kafka_buf_write_i32(rkbuf, changed_leader->id); + /* Leader Hostname */ + rd_kafka_buf_write_str( + rkbuf, changed_leader->advertised_listener, -1); + + /* Leader Port number */ + rd_kafka_buf_write_i32(rkbuf, + (int32_t)changed_leader->port); + + /* Leader Rack */ + rd_kafka_buf_write_str(rkbuf, changed_leader->rack, -1); + + /* Field tags */ + rd_kafka_buf_write_tags_empty(rkbuf); + } + default: + break; + } +} /** @@ -258,8 +348,14 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn, int32_t ReplicaId = -1, MaxWait, MinBytes, MaxBytes = -1, SessionId = -1, Epoch, TopicsCnt; int8_t IsolationLevel; - size_t totsize = 0; - rd_kafka_mock_broker_t *leader = NULL; + size_t totsize = 0; + + int32_t tags_to_write[1] = {0}; + uint64_t tags_to_write_cnt = 0; + + int changed_leaders_cnt = 0; + rd_kafka_mock_broker_t **changed_leaders = + rd_calloc(mcluster->broker_cnt, sizeof(*changed_leaders)); if (rkbuf->rkbuf_reqhdr.ApiVersion <= 14) { rd_kafka_buf_read_i32(rkbuf, &ReplicaId); @@ -338,8 +434,10 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn, rd_kafka_mock_partition_t *mpart = NULL; rd_kafka_resp_err_t err = all_err; rd_bool_t on_follower; - size_t partsize = 0; - const rd_kafka_mock_msgset_t *mset = NULL; + size_t partsize = 0; + const rd_kafka_mock_msgset_t *mset = NULL; + int32_t partition_tags_to_write[1] = {0}; + uint64_t partition_tags_to_write_cnt = 0; 
rd_kafka_buf_read_i32(rkbuf, &Partition); @@ -479,44 +577,39 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_arraycnt(resp, 0); } - /* Partition tags count */ - rd_kafka_buf_write_uvarint( - resp, - rkbuf->rkbuf_reqhdr.ApiVersion >= 16 && - (err == - RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION || - err == - RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH) - ? 1 - : 0); - + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 12 && + err == RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER) { + int changed_leader_idx; + for (changed_leader_idx = 0; + changed_leader_idx < changed_leaders_cnt; + changed_leader_idx++) { + if (changed_leaders[changed_leader_idx] + ->id == mpart->leader->id) + break; + } + if (changed_leader_idx == changed_leaders_cnt) { + changed_leaders[changed_leaders_cnt] = + mpart->leader; + changed_leaders_cnt++; + } + /* CurrentLeader */ + partition_tags_to_write + [partition_tags_to_write_cnt] = 1; + partition_tags_to_write_cnt++; + } /* Response: Partition tags */ - if (rkbuf->rkbuf_reqhdr.ApiVersion >= 16 && - (err == - RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION || - err == RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH)) { - leader = mpart->leader; - - /* Tag type */ - rd_kafka_buf_write_uvarint(resp, 1); - /* Tag len = 4 (leader_id) + 4 - * (leader_epoch) + 1 (tags) */ - rd_kafka_buf_write_uvarint(resp, 9); - /* Leader id */ - rd_kafka_buf_write_i32(resp, mpart->leader->id); - /* Leader epoch */ - rd_kafka_buf_write_i32(resp, - mpart->leader_epoch); - /* Remaining tags */ - rd_kafka_buf_write_tags_empty(resp); - } + rd_kafka_buf_write_tags( + resp, + rd_kafka_mock_Fetch_reply_tags_partition_write, + partition_tags_to_write, + partition_tags_to_write_cnt, mpart); } - /* Response: Topic tags */ - rd_kafka_buf_write_tags_empty(resp); /* Topic tags */ rd_kafka_buf_skip_tags(rkbuf); + /* Response: Topic tags */ + rd_kafka_buf_write_tags_empty(resp); } if (rkbuf->rkbuf_reqhdr.ApiVersion >= 7) { @@ -552,34 +645,17 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn, RD_KAFKAP_STR_DUPA(&rack, &RackId); /* Matt might do something sensible with this */ } - rd_kafka_buf_skip_tags(rkbuf); - /* Response: Top level tags */ - rd_kafka_buf_write_uvarint( - resp, rkbuf->rkbuf_reqhdr.ApiVersion >= 16 && leader ? 1 : 0); - - - /* Response: Partition tags */ - if (rkbuf->rkbuf_reqhdr.ApiVersion >= 16 && leader) { - /* Tag type */ - rd_kafka_buf_write_uvarint(resp, 0); - /* Tag Len */ - rd_kafka_buf_write_uvarint( - resp, 4 + strlen(leader->advertised_listener) + 2 + 4 + 2); - /* NodeEndpoints array count */ - rd_kafka_buf_write_arraycnt(resp, 1); - /* Leader id */ - rd_kafka_buf_write_i32(resp, leader->id); - /* Leader Hostname */ - rd_kafka_buf_write_str(resp, leader->advertised_listener, -1); - /* Leader Port number */ - rd_kafka_buf_write_i32(resp, (int32_t)leader->port); - if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) { - /* Leader Rack */ - rd_kafka_buf_write_str(resp, leader->rack, -1); - } - /* Remaining tags */ - rd_kafka_buf_write_tags_empty(resp); + + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 16 && changed_leaders_cnt) { + tags_to_write[tags_to_write_cnt] = 0 /* NodeEndpoints */; + tags_to_write_cnt++; } + + /* Response: Top level tags */ + rd_kafka_buf_write_tags(resp, rd_kafka_mock_Fetch_reply_tags_write, + tags_to_write, tags_to_write_cnt, + changed_leaders, changed_leaders_cnt); + /* If there was no data, delay up to MaxWait. 
* This isn't strictly correct since we should cut the wait short * and feed newly produced data if a producer writes to the @@ -589,10 +665,12 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn, resp->rkbuf_ts_retry = rd_clock() + (MaxWait * 1000); rd_kafka_mock_connection_send_response0(mconn, resp, rd_true); + rd_free(changed_leaders); return 0; err_parse: rd_kafka_buf_destroy(resp); + rd_free(changed_leaders); return -1; } diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index db1ca2d390..fd3a175364 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -1375,7 +1375,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, rd_kafka_toppar_get(rkt, mdt->partitions[j].id, 0); rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA", - " Topic %s partition %i Leader %" PRId32 + "Topic %s [%" PRId32 "] Leader %" PRId32 " Epoch %" PRId32, rkt->rkt_topic->str, mdt->partitions[j].id, mdt->partitions[j].leader, leader_epoch); diff --git a/tests/0146-metadata_mock.c b/tests/0146-metadata_mock.c index 95e03de8b3..7e1d4e49b2 100644 --- a/tests/0146-metadata_mock.c +++ b/tests/0146-metadata_mock.c @@ -253,8 +253,162 @@ static void do_test_metadata_call_before_join(void) { SUB_TEST_PASS(); } +typedef struct expected_request_s { + int16_t api_key; + int32_t broker; +} expected_request_t; + +/** + * @brief Verify that a request with the expected ApiKey and broker + * was sent to the cluster. + */ +rd_bool_t verify_requests_after_metadata_update_operation( + rd_kafka_mock_cluster_t *mcluster, + expected_request_t *expected_request) { + size_t cnt, i; + rd_kafka_mock_request_t **requests = + rd_kafka_mock_get_requests(mcluster, &cnt); + rd_bool_t found = rd_false; + + for (i = 0; i < cnt; i++) { + int16_t api_key; + int32_t broker; + rd_kafka_mock_request_t *request = requests[i]; + api_key = rd_kafka_mock_request_api_key(request); + broker = rd_kafka_mock_request_id(request); + if (api_key == expected_request->api_key && + broker == expected_request->broker) { + found = rd_true; + break; + } + } + + rd_kafka_mock_request_destroy_array(requests, cnt); + + return found; +} + +/** + * @brief A metadata update request should be triggered when a leader change + * happens while producing or consuming and cause a migration + * to the new leader. + * + * @param producer If true, the test will be for a producer, otherwise + * for a consumer. + * @param second_leader_change If true, a leader change will be triggered + * for two partitions, otherwise for one. + */ +static void do_test_metadata_update_operation(rd_bool_t producer, + rd_bool_t second_leader_change) { + rd_kafka_t *rk; + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + rd_kafka_conf_t *conf; + test_timing_t timing; + rd_bool_t found; + expected_request_t expected_request = { + .api_key = producer ? RD_KAFKAP_Produce : RD_KAFKAP_Fetch, + .broker = 3}; + + SUB_TEST_QUICK("%s, %s", producer ? "producer" : "consumer", + second_leader_change ? 
"two leader changes" + : "single leader change"); + + mcluster = test_mock_cluster_new(4, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 2, 4); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 2); + + test_conf_init(&conf, NULL, 20); + test_conf_set(conf, "bootstrap.servers", bootstraps); + + if (producer) { + test_conf_set(conf, "batch.num.messages", "1"); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + /* Start producing to leader 1 and 2 */ + test_produce_msgs2(rk, topic, 0, 0, 0, 1, NULL, 0); + test_produce_msgs2(rk, topic, 0, 1, 0, 1, NULL, 0); + rd_kafka_flush(rk, 1000); + } else { + rd_kafka_topic_partition_list_t *assignment; + test_conf_set(conf, "group.id", topic); + rk = test_create_handle(RD_KAFKA_CONSUMER, conf); + + assignment = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(assignment, topic, 0); + rd_kafka_topic_partition_list_add(assignment, topic, 1); + test_consumer_assign("2 partitions", rk, assignment); + rd_kafka_topic_partition_list_destroy(assignment); + + /* Start consuming from leader 1 and 2 */ + test_consumer_poll_no_msgs("no errors", rk, 0, 1000); + } + + TIMING_START(&timing, "Metadata update and partition migration"); + rd_kafka_mock_start_request_tracking(mcluster); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 3); + if (second_leader_change) + rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 4); + + + if (producer) { + /* Produce two new messages to the new leaders */ + test_produce_msgs2(rk, topic, 0, 0, 1, 1, NULL, 0); + test_produce_msgs2(rk, topic, 0, 1, 1, 1, NULL, 0); + rd_kafka_flush(rk, 1000); + } else { + /* Produce two new messages and consume them from + * the new leaders */ + test_produce_msgs_easy_v(topic, 0, 0, 0, 1, 0, + "bootstrap.servers", bootstraps, NULL); + test_produce_msgs_easy_v(topic, 0, 1, 0, 1, 0, + "bootstrap.servers", bootstraps, NULL); + test_consumer_poll_timeout("partition 0", rk, 0, -1, -1, 2, + NULL, 5000); + } + TIMING_ASSERT_LATER(&timing, 0, 2000); + + /* Leader change triggers the metadata update and migration + * of partition 0 to brokers 3 and with 'second_leader_change' also + * of partition 1 to broker 4. 
*/ + found = verify_requests_after_metadata_update_operation( + mcluster, &expected_request); + if (!found) + TEST_FAIL( + "Requests with ApiKey %s" + " were not found on broker %" PRId32, + rd_kafka_ApiKey2str(expected_request.api_key), + expected_request.broker); + + if (second_leader_change) { + expected_request.broker = 4; + } else { + expected_request.broker = 2; + } + + found = verify_requests_after_metadata_update_operation( + mcluster, &expected_request); + if (!found) + TEST_FAIL( + "Requests with ApiKey %s" + " were not found on broker %" PRId32, + rd_kafka_ApiKey2str(expected_request.api_key), + expected_request.broker); + + rd_kafka_mock_stop_request_tracking(mcluster); + rd_kafka_destroy(rk); + test_mock_cluster_destroy(mcluster); + + TEST_LATER_CHECK(); + SUB_TEST_PASS(); +} + int main_0146_metadata_mock(int argc, char **argv) { TEST_SKIP_MOCK_CLUSTER(0); + int variation; /* No need to test the "roundrobin" assignor case, * as this is just for checking the two code paths: @@ -268,5 +422,12 @@ int main_0146_metadata_mock(int argc, char **argv) { do_test_stale_metadata_doesnt_migrate_partition(); + for (variation = 0; variation < 4; variation++) { + do_test_metadata_update_operation( + variation / 2, /* 0-1: consumer, 2-3 producer */ + variation % 2 /* 1-3: second leader change, + * 0-2: single leader change */); + } + return 0; } diff --git a/tests/0147-metadata_update_op_mock.c b/tests/0147-metadata_update_op_mock.c deleted file mode 100644 index 0e5e10c394..0000000000 --- a/tests/0147-metadata_update_op_mock.c +++ /dev/null @@ -1,257 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2023, Confluent Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. 
- */ - -#include "test.h" -#include "rdkafka.h" -#include "../src/rdkafka_proto.h" - -static void free_mock_requests(rd_kafka_mock_request_t **requests, - size_t request_cnt) { - size_t i; - for (i = 0; i < request_cnt; i++) - rd_kafka_mock_request_destroy(requests[i]); - rd_free(requests); -} - -/** - * @brief Produce API test - * We test the Metadata Update Operation being triggered via getting not leader - * error, where the metadata cache is updated which makes further produce calls - * based on the new updated metadata cache rather than making another metadata - * call to get the updates. - */ -static void test_produce(rd_kafka_mock_cluster_t *mcluster, - const char *topic, - rd_kafka_conf_t *conf) { - rd_kafka_mock_request_t **requests = NULL; - size_t request_cnt = 0; - rd_bool_t request_to_broker1 = rd_false; - rd_bool_t request_to_broker2 = rd_false; - size_t i; - rd_kafka_t *producer; - rd_kafka_topic_t *rkt; - SUB_TEST(); - - rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); - - producer = test_create_handle(RD_KAFKA_PRODUCER, conf); - rkt = test_create_producer_topic(producer, topic, NULL); - - - /* Leader for both the partition is broker 1 */ - rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); - rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1); - rd_kafka_mock_start_request_tracking(mcluster); - rd_kafka_mock_clear_requests(mcluster); - - /* Produce to Partition 0 (with leader Broker 1) */ - test_produce_msgs(producer, rkt, 0, 0, 0, 1, "hello", 6); - - /* Verify that the produce call is made to Broker 1 */ - requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); - TEST_SAY("Got requests %zu \n", request_cnt); - for (i = 0; i < request_cnt; i++) { - TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64 - "\n", - rd_kafka_mock_request_id(requests[i]), - rd_kafka_mock_request_api_key(requests[i]), - rd_kafka_mock_request_timestamp(requests[i])); - - if ((rd_kafka_mock_request_api_key(requests[i]) == - RD_KAFKAP_Produce) && - (rd_kafka_mock_request_id(requests[i]) == 2)) - request_to_broker2 = rd_true; - } - free_mock_requests(requests, request_cnt); - rd_kafka_mock_clear_requests(mcluster); - TEST_ASSERT((!request_to_broker2), - "Produce Request should have been made to only Brokers 1."); - - /* Change the leader for Partition 0 to Broker 2 */ - rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); - - /* Make the Produce call to Partition 0 (with leader Broker 2) */ - test_produce_msgs(producer, rkt, 0, 0, 0, 1, "hello", 6); - request_to_broker1 = rd_false; - request_to_broker2 = rd_false; - /* Verify that the produce call is made first to broker 1 and then to - * broker 2 */ - requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); - for (i = 0; i < request_cnt; i++) { - TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64 - "\n", - rd_kafka_mock_request_id(requests[i]), - rd_kafka_mock_request_api_key(requests[i]), - rd_kafka_mock_request_timestamp(requests[i])); - - /* First produce request should be to broker 1 */ - if ((rd_kafka_mock_request_api_key(requests[i]) == - RD_KAFKAP_Produce) && - (rd_kafka_mock_request_id(requests[i]) == 1) && - request_to_broker2 == rd_false) - request_to_broker1 = rd_true; - - /* Subsequent produce requests should be to broker 2 */ - if ((rd_kafka_mock_request_api_key(requests[i]) == - RD_KAFKAP_Produce) && - (rd_kafka_mock_request_id(requests[i]) == 2) && - request_to_broker1 == rd_true) - request_to_broker2 = rd_true; - } - free_mock_requests(requests, request_cnt); - 
rd_kafka_mock_clear_requests(mcluster); - rd_kafka_mock_stop_request_tracking(mcluster); - TEST_ASSERT((request_to_broker1 && request_to_broker2), - "Brokers didn't receive produce requests in sequence."); - - rd_kafka_topic_destroy(rkt); - rd_kafka_destroy(producer); - SUB_TEST_PASS(); -} - -/** - * @brief Fetch API test - * We test the Metadata Update Operation being triggered via getting not leader - * error, where the metadata cache is updated which makes further fetch calls - * based on the new updated metadata cache rather than making another metadata - * call to get the updates. - */ -static void test_fetch(rd_kafka_mock_cluster_t *mcluster, - const char *topic, - rd_kafka_conf_t *conf) { - rd_kafka_mock_request_t **requests = NULL; - size_t request_cnt = 0; - rd_kafka_t *consumer; - rd_kafka_message_t *rkm; - rd_bool_t request_to_broker1 = rd_false; - rd_bool_t request_to_broker2 = rd_false; - size_t i; - consumer = test_create_consumer(topic, NULL, conf, NULL); - test_consumer_subscribe(consumer, topic); - - SUB_TEST(); - /* Leader for both the partition is broker 1 */ - rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); - rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1); - - rd_kafka_mock_start_request_tracking(mcluster); - rd_kafka_mock_clear_requests(mcluster); - - rkm = rd_kafka_consumer_poll(consumer, 10 * 1000); - if (rkm) - rd_kafka_message_destroy(rkm); - /* Verify that the fetch call is not made to broker 1 */ - requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); - for (i = 0; i < request_cnt; i++) { - TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64 - "\n", - rd_kafka_mock_request_id(requests[i]), - rd_kafka_mock_request_api_key(requests[i]), - rd_kafka_mock_request_timestamp(requests[i])); - - if ((rd_kafka_mock_request_api_key(requests[i]) == - RD_KAFKAP_Fetch) && - (rd_kafka_mock_request_id(requests[i]) == 2)) - request_to_broker2 = rd_true; - } - free_mock_requests(requests, request_cnt); - TEST_ASSERT(!request_to_broker2, - "Fetch Request should have been made to only Brokers 1."); - - rd_kafka_mock_clear_requests(mcluster); - - /* Change the leader for Partition 1 to Broker 2 */ - rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 2); - rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); - - /* Make the Fetch call again to trigger metadata update and cache - * changes */ - rkm = rd_kafka_consumer_poll(consumer, 5 * 1000); - if (rkm) - rd_kafka_message_destroy(rkm); - - /* Verify that the fetch call is made to both broker 1 and 2 */ - requests = rd_kafka_mock_get_requests(mcluster, &request_cnt); - for (i = 0; i < request_cnt; i++) { - TEST_SAY("Broker Id : %d API Key : %d Timestamp : %" PRId64 - "\n", - rd_kafka_mock_request_id(requests[i]), - rd_kafka_mock_request_api_key(requests[i]), - rd_kafka_mock_request_timestamp(requests[i])); - - if ((rd_kafka_mock_request_api_key(requests[i]) == - RD_KAFKAP_Fetch)) { - if ((rd_kafka_mock_request_id(requests[i]) == 2) && - request_to_broker1 == rd_true) - request_to_broker2 = rd_true; - else if ((rd_kafka_mock_request_id(requests[i]) == 1) && - request_to_broker2 == rd_false) - request_to_broker1 = rd_true; - } - } - free_mock_requests(requests, request_cnt); - TEST_ASSERT(request_to_broker1 && request_to_broker2, - "Fetch calls weren't made in correct sequence."); - rd_kafka_mock_clear_requests(mcluster); - rd_kafka_destroy(consumer); - SUB_TEST_PASS(); -} - -/** - * @brief Metadata Update Operation (KIP 951) - * We test the behaviour when with a fetch or produce call we 
get a NOT_LEADER - * error, which should trigger the update_metadata_op and the metadata cache - * should be updated accordingly only for the partition with the changed leader. - */ -int main_0147_metadata_update_op_mock(int argc, char **argv) { - const char *topic = test_mk_topic_name("topic", 1); - rd_kafka_mock_cluster_t *mcluster; - rd_kafka_conf_t *conf; - const char *bootstraps; - /* Spawning a mock cluster with 2 nodes */ - mcluster = test_mock_cluster_new(2, &bootstraps); - - test_conf_init(&conf, NULL, 30); - - /* Create 2 partitions for the topic */ - rd_kafka_mock_topic_create(mcluster, topic, 2, 1); - /* This test may be slower when running with CI or Helgrind, - * restart the timeout. */ - test_timeout_set(100); - test_conf_set(conf, "bootstrap.servers", bootstraps); - test_conf_set(conf, "topic.metadata.refresh.interval.ms", "-1"); - - /* Test the case for the Produce API */ - test_produce(mcluster, topic, rd_kafka_conf_dup(conf)); - /* Test the case for the Fetch API */ - test_fetch(mcluster, topic, rd_kafka_conf_dup(conf)); - - test_mock_cluster_destroy(mcluster); - rd_kafka_conf_destroy(conf); - return 0; -} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b519cd5406..62ce0deb02 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -137,7 +137,6 @@ set( 0144-idempotence_mock.c 0145-pause_resume_mock.c 0146-metadata_mock.c - 0147-metadata_update_op_mock.c 8000-idle.cpp 8001-fetch_from_follower_mock_manual.c test.c diff --git a/tests/test.c b/tests/test.c index 7071590075..83487f5e5c 100644 --- a/tests/test.c +++ b/tests/test.c @@ -261,7 +261,6 @@ _TEST_DECL(0143_exponential_backoff_mock); _TEST_DECL(0144_idempotence_mock); _TEST_DECL(0145_pause_resume_mock); _TEST_DECL(0146_metadata_mock); -_TEST_DECL(0147_metadata_update_op_mock); /* Manual tests */ _TEST_DECL(8000_idle); @@ -519,7 +518,7 @@ struct test tests[] = { _TEST(0144_idempotence_mock, TEST_F_LOCAL, TEST_BRKVER(0, 11, 0, 0)), _TEST(0145_pause_resume_mock, TEST_F_LOCAL), _TEST(0146_metadata_mock, TEST_F_LOCAL), - _TEST(0147_metadata_update_op_mock, TEST_F_LOCAL), + /* Manual tests */ _TEST(8000_idle, TEST_F_MANUAL), diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj index c0b7de1f25..a354f278f8 100644 --- a/win32/tests/tests.vcxproj +++ b/win32/tests/tests.vcxproj @@ -227,7 +227,6 @@ - From 3d26d3a9754b75dda60f99c48b6f21ba7d628af2 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 21 Jun 2024 14:05:41 +0200 Subject: [PATCH 4/5] Fix to reduce flakyness of test 0146/do_test_stale_metadata_doesnt_migrate_partition --- tests/0146-metadata_mock.c | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/tests/0146-metadata_mock.c b/tests/0146-metadata_mock.c index 7e1d4e49b2..c0f1d7b11a 100644 --- a/tests/0146-metadata_mock.c +++ b/tests/0146-metadata_mock.c @@ -37,7 +37,12 @@ static rd_bool_t is_metadata_request(rd_kafka_mock_request_t *request, static rd_bool_t is_fetch_request(rd_kafka_mock_request_t *request, void *opaque) { - return rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Fetch; + int32_t *broker_id = (int32_t *)opaque; + rd_bool_t ret = + rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Fetch; + if (broker_id) + ret &= rd_kafka_mock_request_id(request) == *broker_id; + return ret; } /** @@ -157,6 +162,7 @@ static void do_test_stale_metadata_doesnt_migrate_partition(void) { rd_kafka_mock_cluster_t *mcluster; const char *topic = test_mk_topic_name(__FUNCTION__, 1); rd_kafka_conf_t *conf; + int32_t 
expected_broker_id; SUB_TEST_QUICK(); @@ -171,6 +177,7 @@ static void do_test_stale_metadata_doesnt_migrate_partition(void) { test_conf_set(conf, "enable.auto.commit", "false"); test_conf_set(conf, "fetch.error.backoff.ms", "10"); test_conf_set(conf, "fetch.wait.max.ms", "10"); + test_conf_set(conf, "fetch.queue.backoff.ms", "10"); rk = test_create_handle(RD_KAFKA_CONSUMER, conf); @@ -184,27 +191,31 @@ static void do_test_stale_metadata_doesnt_migrate_partition(void) { /* Change leader to 2, Fetch fails, refreshes metadata. */ rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); - for (i = 0; i < 5; i++) { - /* Validation fails, metadata refreshed again */ - rd_kafka_mock_broker_push_request_error_rtts( - mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1, - RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 1000); - } + /* Validation fails, metadata refreshed again */ + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1, + RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 1000); /* Wait partition migrates to broker 2 */ rd_usleep(100 * 1000, 0); - /* Return stale metadata */ + /* Ask to return stale metadata while calling OffsetForLeaderEpoch */ + rd_kafka_mock_start_request_tracking(mcluster); for (i = 0; i < 10; i++) { rd_kafka_mock_partition_push_leader_response( mcluster, topic, 0, 1 /*leader id*/, 0 /*leader epoch*/); } - /* Partition doesn't have to migrate back to broker 1 */ + /* After the error on OffsetForLeaderEpoch metadata is refreshed + * and it returns the stale metadata. + * 1s for the OffsetForLeaderEpoch plus at least 500ms for + * restarting the fetch requests */ rd_usleep(2000 * 1000, 0); - rd_kafka_mock_start_request_tracking(mcluster); - fetch_requests = test_mock_wait_matching_requests( - mcluster, 0, 500, is_fetch_request, NULL); + + /* Partition doesn't have to migrate back to broker 1 */ + expected_broker_id = 1; + fetch_requests = test_mock_wait_matching_requests( + mcluster, 0, 500, is_fetch_request, &expected_broker_id); TEST_ASSERT(fetch_requests == 0, "No fetch request should be received by broker 1, got %d", fetch_requests); From ec931afd002826c9b07e043d1258db964937ce66 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 21 Jun 2024 15:43:18 +0200 Subject: [PATCH 5/5] CHANGELOG and INTRODUCTION documentation --- CHANGELOG.md | 6 ++++++ INTRODUCTION.md | 9 +++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 79c5d8f065..86d5bb9382 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ librdkafka v2.5.0 is a feature release. +* [KIP-951](https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client) + Leader discovery optimisations for the client (#4756, #4767). * Fix segfault when using long client id because of erased segment when using flexver. (#4689) * Fix for an idempotent producer error, with a message batch not reconstructed identically when retried (#4750) @@ -12,6 +14,10 @@ librdkafka v2.5.0 is a feature release. * Update bundled lz4 (used when `./configure --disable-lz4-ext`) to [v1.9.4](https://github.com/lz4/lz4/releases/tag/v1.9.4), which contains bugfixes and performance improvements (#4726). 
+ * [KIP-951](https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client)
+   With this KIP, leader updates are received through Produce and Fetch
+   responses when errors corresponding to leader changes occur, and the
+   partition migration happens before the metadata cache is refreshed
+   (#4756, #4767).
 
 ## Fixes
 
diff --git a/INTRODUCTION.md b/INTRODUCTION.md
index b5fe3fa1d7..1449d01dd6 100644
--- a/INTRODUCTION.md
+++ b/INTRODUCTION.md
@@ -2044,7 +2044,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
 | KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies      | 2.5.0       | Not supported |
 | KIP-568 - Explicit rebalance triggering on the Consumer           | 2.6.0       | Not supported |
 | KIP-659 - Add metadata to DescribeConfigsResponse                 | 2.6.0       | Not supported |
-| KIP-580 - Exponential backoff for Kafka clients                   | 3.7.0 (WIP) | Supported     |
+| KIP-580 - Exponential backoff for Kafka clients                   | 3.7.0       | Supported     |
 | KIP-584 - Versioning scheme for features                          | WIP         | Not supported |
 | KIP-588 - Allow producers to recover gracefully from txn timeouts | 2.8.0 (WIP) | Not supported |
 | KIP-601 - Configurable socket connection timeout                  | 2.7.0       | Supported     |
@@ -2053,8 +2053,9 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
 | KIP-654 - Aborted txns with non-flushed msgs should not be fatal  | 2.7.0       | Supported     |
 | KIP-735 - Increase default consumer session timeout               | 3.0.0       | Supported     |
 | KIP-768 - SASL/OAUTHBEARER OIDC support                           | 3.0         | Supported     |
-| KIP-881 - Rack-aware Partition Assignment for Kafka Consumers     | 3.5.0 (WIP) | Supported     |
+| KIP-881 - Rack-aware Partition Assignment for Kafka Consumers     | 3.5.0       | Supported     |
 | KIP-848 - The Next Generation of the Consumer Rebalance Protocol  | 3.7.0 (EA)  | Early Access  |
+| KIP-951 - Leader discovery optimisations for the client           | 3.7.0       | Supported     |
 
 
@@ -2068,8 +2069,8 @@ release of librdkafka.
 | ApiKey  | Request name                  | Kafka max  | librdkafka max |
 | ------- | ----------------------------- | ---------- | -------------- |
-| 0       | Produce                       | 10         | 9              |
-| 1       | Fetch                         | 16         | 15             |
+| 0       | Produce                       | 10         | 10             |
+| 1       | Fetch                         | 16         | 16             |
 | 2       | ListOffsets                   | 8          | 7              |
 | 3       | Metadata                      | 12         | 12             |
 | 8       | OffsetCommit                  | 9          | 9              |
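
A note on the wire format touched by these mock handler changes: KIP-951's
CurrentLeader and NodeEndpoints fields are ordinary flexible-protocol tagged
fields, i.e. an unsigned-varint tag id, an unsigned-varint payload size, and
then the payload. The sketch below is a minimal standalone illustration of
that layout, not librdkafka code (write_uvarint and write_current_leader_tag
are hypothetical helpers); it shows where the "Tag len = 4 (leader_id) + 4
(leader_epoch) + 1 (tags)" arithmetic in the handlers comes from. Per the
patch, the CurrentLeader tag id is 0 in the Produce partition response and 1
in the Fetch partition response.

#include <stdint.h>
#include <stddef.h>
#include <string.h>
#include <arpa/inet.h> /* htonl(): Kafka encodes int32 big-endian */

/* Unsigned varint: 7 payload bits per byte, high bit set on every
 * byte except the last. Returns the number of bytes written. */
static size_t write_uvarint(uint8_t *buf, uint32_t v) {
        size_t of = 0;
        while (v >= 0x80) {
                buf[of++] = (uint8_t)(v & 0x7f) | 0x80;
                v >>= 7;
        }
        buf[of++] = (uint8_t)v;
        return of;
}

/* Encode one CurrentLeader tagged field: uvarint tag id, uvarint payload
 * size (9), int32 leader id, int32 leader epoch, and an empty nested tag
 * section (a single 0x00 count byte). */
static size_t write_current_leader_tag(uint8_t *buf,
                                       uint32_t tag_id,
                                       int32_t leader_id,
                                       int32_t leader_epoch) {
        size_t of = 0;
        uint32_t be;

        of += write_uvarint(buf + of, tag_id);
        of += write_uvarint(buf + of, 4 + 4 + 1); /* payload size = 9 */
        be = htonl((uint32_t)leader_id);
        memcpy(buf + of, &be, 4);
        of += 4;
        be = htonl((uint32_t)leader_epoch);
        memcpy(buf + of, &be, 4);
        of += 4;
        buf[of++] = 0x00; /* zero trailing tags */
        return of;        /* 11 bytes in total for tag ids < 128 */
}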