diff --git a/CHANGELOG.md b/CHANGELOG.md
index 79c5d8f065..86d5bb9382 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@ librdkafka v2.5.0 is a feature release.
 
+* [KIP-951](https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client)
+  Leader discovery optimisations for the client (#4756, #4767).
 * Fix segfault when using long client id because of erased segment when
   using flexver. (#4689)
 * Fix for an idempotent producer error, with a message batch not reconstructed
   identically when retried (#4750)
@@ -12,6 +14,10 @@ librdkafka v2.5.0 is a feature release.
 * Update bundled lz4 (used when `./configure --disable-lz4-ext`) to
   [v1.9.4](https://github.com/lz4/lz4/releases/tag/v1.9.4), which contains
   bugfixes and performance improvements (#4726).
+ * [KIP-951](https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client)
+   With this KIP, when a Produce or Fetch request fails due to a leader
+   change, the new leader is returned in the response and the partition is
+   migrated to it before the metadata cache is refreshed (#4756, #4767).
 
 ## Fixes
 
diff --git a/INTRODUCTION.md b/INTRODUCTION.md
index b5fe3fa1d7..1449d01dd6 100644
--- a/INTRODUCTION.md
+++ b/INTRODUCTION.md
@@ -2044,7 +2044,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
 | KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies | 2.5.0 | Not supported |
 | KIP-568 - Explicit rebalance triggering on the Consumer | 2.6.0 | Not supported |
 | KIP-659 - Add metadata to DescribeConfigsResponse | 2.6.0 | Not supported |
-| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | Supported |
+| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 | Supported |
 | KIP-584 - Versioning scheme for features | WIP | Not supported |
 | KIP-588 - Allow producers to recover gracefully from txn timeouts | 2.8.0 (WIP) | Not supported |
 | KIP-601 - Configurable socket connection timeout | 2.7.0 | Supported |
@@ -2053,8 +2053,9 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
 | KIP-654 - Aborted txns with non-flushed msgs should not be fatal | 2.7.0 | Supported |
 | KIP-735 - Increase default consumer session timeout | 3.0.0 | Supported |
 | KIP-768 - SASL/OAUTHBEARER OIDC support | 3.0 | Supported |
-| KIP-881 - Rack-aware Partition Assignment for Kafka Consumers | 3.5.0 (WIP) | Supported |
+| KIP-881 - Rack-aware Partition Assignment for Kafka Consumers | 3.5.0 | Supported |
 | KIP-848 - The Next Generation of the Consumer Rebalance Protocol | 3.7.0 (EA) | Early Access |
+| KIP-951 - Leader discovery optimisations for the client | 3.7.0 | Supported |
 
@@ -2068,8 +2069,8 @@ release of librdkafka.
 
 | ApiKey | Request name | Kafka max | librdkafka max |
 | ------- | ----------------------------- | ---------- | -------------- |
-| 0 | Produce | 10 | 9 |
-| 1 | Fetch | 16 | 15 |
+| 0 | Produce | 10 | 10 |
+| 1 | Fetch | 16 | 16 |
 | 2 | ListOffsets | 8 | 7 |
 | 3 | Metadata | 12 | 12 |
 | 8 | OffsetCommit | 9 | 9 |
diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c
index f6419fe97d..26a989c0fa 100644
--- a/src/rdkafka_metadata.c
+++ b/src/rdkafka_metadata.c
@@ -2094,7 +2094,7 @@ rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) {
                 rd_kafka_dbg(rk, METADATA, "METADATAUPDATE",
                              "Partition %s(%s)[%" PRId32
-                             "]: "
+                             "]:"
                              " updated with leader %" PRId32
                              " and epoch %" PRId32,
                              topic, rd_kafka_Uuid_base64str(&topic_id),
diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c
index d67cc6e60f..2f75eb50f2 100644
--- a/src/rdkafka_mock_handlers.c
+++ b/src/rdkafka_mock_handlers.c
@@ -42,6 +42,58 @@
 
+void rd_kafka_mock_Produce_reply_tags_partition_write(
+    rd_kafka_buf_t *rkbuf,
+    int tagtype,
+    rd_kafka_mock_partition_t *mpart) {
+        switch (tagtype) {
+        case 0: /* CurrentLeader */
+                /* Leader id */
+                rd_kafka_buf_write_i32(rkbuf, mpart->leader->id);
+                /* Leader epoch */
+                rd_kafka_buf_write_i32(rkbuf, mpart->leader_epoch);
+                /* Field tags */
+                rd_kafka_buf_write_tags_empty(rkbuf);
+                break;
+        default:
+                break;
+        }
+}
+
+void rd_kafka_mock_Produce_reply_tags_write(
+    rd_kafka_buf_t *rkbuf,
+    int tagtype,
+    rd_kafka_mock_broker_t **changed_leaders,
+    int changed_leader_cnt) {
+        int i;
+        switch (tagtype) {
+        case 0: /* NodeEndpoints */
+                /* #NodeEndpoints */
+                rd_kafka_buf_write_arraycnt(rkbuf, changed_leader_cnt);
+                for (i = 0; i < changed_leader_cnt; i++) {
+                        rd_kafka_mock_broker_t *changed_leader =
+                            changed_leaders[i];
+                        /* Leader id */
+                        rd_kafka_buf_write_i32(rkbuf, changed_leader->id);
+                        /* Leader Hostname */
+                        rd_kafka_buf_write_str(
+                            rkbuf, changed_leader->advertised_listener, -1);
+
+                        /* Leader Port number */
+                        rd_kafka_buf_write_i32(rkbuf,
+                                               (int32_t)changed_leader->port);
+
+                        /* Leader Rack */
+                        rd_kafka_buf_write_str(rkbuf, changed_leader->rack, -1);
+
+                        /* Field tags */
+                        rd_kafka_buf_write_tags_empty(rkbuf);
+                }
+        default:
+                break;
+        }
+}
+
 /**
  * @brief Handle ProduceRequest
  */
@@ -55,6 +107,12 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
         int16_t Acks;
         int32_t TimeoutMs;
         rd_kafka_resp_err_t all_err;
+        int32_t tags_to_write[1] = {0};
+        size_t tags_to_write_cnt = 0;
+        int changed_leaders_cnt = 0;
+        rd_kafka_mock_broker_t **changed_leaders =
+            rd_calloc(mcluster->broker_cnt, sizeof(*changed_leaders));
+
 
         if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3)
                 rd_kafka_buf_read_str(rkbuf, &TransactionalId);
@@ -78,7 +136,6 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
                 rd_kafka_buf_read_str(rkbuf, &Topic);
                 rd_kafka_buf_read_arraycnt(rkbuf, &PartitionCnt,
                                            RD_KAFKAP_PARTITIONS_MAX);
-
                 mtopic = rd_kafka_mock_topic_find_by_kstr(mcluster, &Topic);
 
                 /* Response: Topic */
@@ -92,6 +149,8 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
                         rd_kafkap_bytes_t records;
                         rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
                         int64_t BaseOffset = -1;
+                        int32_t partition_tags_to_write[1] = {0};
+                        size_t partition_tags_to_write_cnt = 0;
 
                         rd_kafka_buf_read_i32(rkbuf, &Partition);
@@ -100,10 +159,8 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
                                                       Partition);
 
                         rd_kafka_buf_read_kbytes(rkbuf, &records);
-
                         /* Partition Tags */
                         rd_kafka_buf_skip_tags(rkbuf);
-
                         /* Response: Partition */
                         rd_kafka_buf_write_i32(resp, Partition);
@@ -161,8 +218,38 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
                                 /* Response: ErrorMessage */
                                 rd_kafka_buf_write_str(resp, NULL, 0);
                         }
+
+                        if (rkbuf->rkbuf_reqhdr.ApiVersion >= 10 &&
+                            err == RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION) {
+                                int changed_leader_idx;
+                                /* See if this leader is already included */
+                                for (changed_leader_idx = 0;
+                                     changed_leader_idx < changed_leaders_cnt;
+                                     changed_leader_idx++) {
+                                        if (changed_leaders[changed_leader_idx]
+                                                ->id == mpart->leader->id)
+                                                break;
+                                }
+                                if (changed_leader_idx == changed_leaders_cnt) {
+                                        /* Add the new leader that wasn't
+                                         * present */
+                                        changed_leaders[changed_leaders_cnt] =
+                                            mpart->leader;
+                                        changed_leaders_cnt++;
+                                }
+
+                                partition_tags_to_write
+                                    [partition_tags_to_write_cnt] =
+                                        0 /* CurrentLeader */;
+                                partition_tags_to_write_cnt++;
+                        }
+
                         /* Response: Partition tags */
-                        rd_kafka_buf_write_tags_empty(resp);
+                        rd_kafka_buf_write_tags(
+                            resp,
+                            rd_kafka_mock_Produce_reply_tags_partition_write,
+                            partition_tags_to_write,
+                            partition_tags_to_write_cnt, mpart);
                 }
 
                 /* Topic tags */
@@ -177,17 +264,76 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
         }
 
         /* Response: Top level tags */
-        rd_kafka_buf_write_tags_empty(resp);
+        if (changed_leaders_cnt) {
+                tags_to_write[tags_to_write_cnt] = 0 /* NodeEndpoints */;
+                tags_to_write_cnt++;
+        }
 
-        rd_kafka_mock_connection_send_response0(mconn, resp, rd_true);
+        rd_kafka_buf_write_tags(resp, rd_kafka_mock_Produce_reply_tags_write,
+                                tags_to_write, tags_to_write_cnt,
+                                changed_leaders, changed_leaders_cnt);
 
+        rd_kafka_mock_connection_send_response0(mconn, resp, rd_true);
+        rd_free(changed_leaders);
         return 0;
 
 err_parse:
+        rd_free(changed_leaders);
         rd_kafka_buf_destroy(resp);
         return -1;
 }
+
+void rd_kafka_mock_Fetch_reply_tags_partition_write(
+    rd_kafka_buf_t *rkbuf,
+    int tagtype,
+    rd_kafka_mock_partition_t *mpart) {
+        switch (tagtype) {
+        case 1: /* CurrentLeader */
+                /* Leader id */
+                rd_kafka_buf_write_i32(rkbuf, mpart->leader->id);
+                /* Leader epoch */
+                rd_kafka_buf_write_i32(rkbuf, mpart->leader_epoch);
+                /* Field tags */
+                rd_kafka_buf_write_tags_empty(rkbuf);
+                break;
+        default:
+                break;
+        }
+}
+
+void rd_kafka_mock_Fetch_reply_tags_write(
+    rd_kafka_buf_t *rkbuf,
+    int tagtype,
+    rd_kafka_mock_broker_t **changed_leaders,
+    int changed_leader_cnt) {
+        int i;
+        switch (tagtype) {
+        case 0: /* NodeEndpoints */
+                /* #NodeEndpoints */
+                rd_kafka_buf_write_arraycnt(rkbuf, changed_leader_cnt);
+                for (i = 0; i < changed_leader_cnt; i++) {
+                        rd_kafka_mock_broker_t *changed_leader =
+                            changed_leaders[i];
+                        /* Leader id */
+                        rd_kafka_buf_write_i32(rkbuf, changed_leader->id);
+                        /* Leader Hostname */
+                        rd_kafka_buf_write_str(
+                            rkbuf, changed_leader->advertised_listener, -1);
+
+                        /* Leader Port number */
+                        rd_kafka_buf_write_i32(rkbuf,
+                                               (int32_t)changed_leader->port);
+
+                        /* Leader Rack */
+                        rd_kafka_buf_write_str(rkbuf, changed_leader->rack, -1);
+
+                        /* Field tags */
+                        rd_kafka_buf_write_tags_empty(rkbuf);
+                }
+        default:
+                break;
+        }
+}
 
 /**
@@ -204,6 +350,13 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
         int8_t IsolationLevel;
         size_t totsize = 0;
 
+        int32_t tags_to_write[1] = {0};
+        uint64_t tags_to_write_cnt = 0;
+
+        int changed_leaders_cnt = 0;
+        rd_kafka_mock_broker_t **changed_leaders =
+            rd_calloc(mcluster->broker_cnt, sizeof(*changed_leaders));
+
         if (rkbuf->rkbuf_reqhdr.ApiVersion <= 14) {
                 rd_kafka_buf_read_i32(rkbuf, &ReplicaId);
         }
@@ -281,8 +434,10 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
                         rd_kafka_mock_partition_t *mpart = NULL;
                         rd_kafka_resp_err_t err = all_err;
                         rd_bool_t on_follower;
-                        size_t partsize = 0;
-                        const rd_kafka_mock_msgset_t *mset = NULL;
+                        size_t partsize                    = 0;
+                        const rd_kafka_mock_msgset_t *mset = NULL;
+                        int32_t partition_tags_to_write[1] = {0};
+                        uint64_t partition_tags_to_write_cnt = 0;
 
                         rd_kafka_buf_read_i32(rkbuf, &Partition);
@@ -422,14 +577,39 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
                                 rd_kafka_buf_write_arraycnt(resp, 0);
                         }
 
+                        if (rkbuf->rkbuf_reqhdr.ApiVersion >= 12 &&
+                            err == RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER) {
+                                int changed_leader_idx;
+                                for (changed_leader_idx = 0;
+                                     changed_leader_idx < changed_leaders_cnt;
+                                     changed_leader_idx++) {
+                                        if (changed_leaders[changed_leader_idx]
+                                                ->id == mpart->leader->id)
+                                                break;
+                                }
+                                if (changed_leader_idx == changed_leaders_cnt) {
+                                        changed_leaders[changed_leaders_cnt] =
+                                            mpart->leader;
+                                        changed_leaders_cnt++;
+                                }
+                                /* CurrentLeader */
+                                partition_tags_to_write
+                                    [partition_tags_to_write_cnt] = 1;
+                                partition_tags_to_write_cnt++;
+                        }
+
                         /* Response: Partition tags */
-                        rd_kafka_buf_write_tags_empty(resp);
+                        rd_kafka_buf_write_tags(
+                            resp,
+                            rd_kafka_mock_Fetch_reply_tags_partition_write,
+                            partition_tags_to_write,
+                            partition_tags_to_write_cnt, mpart);
                 }
 
-                /* Response: Topic tags */
-                rd_kafka_buf_write_tags_empty(resp);
                 /* Topic tags */
                 rd_kafka_buf_skip_tags(rkbuf);
+                /* Response: Topic tags */
+                rd_kafka_buf_write_tags_empty(resp);
         }
 
         if (rkbuf->rkbuf_reqhdr.ApiVersion >= 7) {
@@ -466,8 +646,15 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
                 /* Matt might do something sensible with this */
         }
 
+        if (rkbuf->rkbuf_reqhdr.ApiVersion >= 16 && changed_leaders_cnt) {
+                tags_to_write[tags_to_write_cnt] = 0 /* NodeEndpoints */;
+                tags_to_write_cnt++;
+        }
+
         /* Response: Top level tags */
-        rd_kafka_buf_write_tags_empty(resp);
+        rd_kafka_buf_write_tags(resp, rd_kafka_mock_Fetch_reply_tags_write,
+                                tags_to_write, tags_to_write_cnt,
+                                changed_leaders, changed_leaders_cnt);
 
         /* If there was no data, delay up to MaxWait.
          * This isn't strictly correct since we should cut the wait short
@@ -478,10 +665,12 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
                 resp->rkbuf_ts_retry = rd_clock() + (MaxWait * 1000);
 
         rd_kafka_mock_connection_send_response0(mconn, resp, rd_true);
+        rd_free(changed_leaders);
         return 0;
 
 err_parse:
         rd_kafka_buf_destroy(resp);
+        rd_free(changed_leaders);
         return -1;
 }
 
@@ -2306,8 +2495,8 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn,
 const struct rd_kafka_mock_api_handler
     rd_kafka_mock_api_handlers[RD_KAFKAP__NUM] = {
         /* [request-type] = { MinVersion, MaxVersion, FlexVersion, callback } */
-        [RD_KAFKAP_Produce] = {0, 9, 9, rd_kafka_mock_handle_Produce},
-        [RD_KAFKAP_Fetch] = {0, 15, 12, rd_kafka_mock_handle_Fetch},
+        [RD_KAFKAP_Produce] = {0, 10, 9, rd_kafka_mock_handle_Produce},
+        [RD_KAFKAP_Fetch] = {0, 16, 12, rd_kafka_mock_handle_Fetch},
         [RD_KAFKAP_ListOffsets] = {0, 7, 6, rd_kafka_mock_handle_ListOffsets},
         [RD_KAFKAP_OffsetFetch] = {0, 6, 6, rd_kafka_mock_handle_OffsetFetch},
         [RD_KAFKAP_OffsetCommit] = {0, 9, 8, rd_kafka_mock_handle_OffsetCommit},
diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c
index 6642017827..710cc727de 100644
--- a/src/rdkafka_request.c
+++ b/src/rdkafka_request.c
@@ -3675,10 +3675,8 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb,
         if (request->rkbuf_reqhdr.ApiVersion >= 10) {
                 rd_kafkap_Produce_reply_tags_Topic_t *TopicTags =
                     &ProduceTags.Topic;
-                ;
                 rd_kafkap_Produce_reply_tags_Partition_t *PartitionTags =
                     &TopicTags->Partition;
-                ;
 
                 /* Partition tags count */
                 TopicTags->TopicName = RD_KAFKAP_STR_DUP(&TopicName);
diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c
index db1ca2d390..fd3a175364 100644
--- a/src/rdkafka_topic.c
+++ b/src/rdkafka_topic.c
@@ -1375,7 +1375,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
                     rd_kafka_toppar_get(rkt, mdt->partitions[j].id, 0);
 
                 rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA",
-                             " Topic %s partition %i Leader %" PRId32
+                             "Topic %s [%" PRId32 "] Leader %" PRId32
                              " Epoch %" PRId32,
                              rkt->rkt_topic->str, mdt->partitions[j].id,
                              mdt->partitions[j].leader, leader_epoch);
diff --git a/tests/0146-metadata_mock.c b/tests/0146-metadata_mock.c
index 95e03de8b3..c0f1d7b11a 100644
--- a/tests/0146-metadata_mock.c
+++ b/tests/0146-metadata_mock.c
@@ -37,7 +37,12 @@ static rd_bool_t is_metadata_request(rd_kafka_mock_request_t *request,
 
 static rd_bool_t is_fetch_request(rd_kafka_mock_request_t *request,
                                   void *opaque) {
-        return rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Fetch;
+        int32_t *broker_id = (int32_t *)opaque;
+        rd_bool_t ret =
+            rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Fetch;
+        if (broker_id)
+                ret &= rd_kafka_mock_request_id(request) == *broker_id;
+        return ret;
 }
 
 /**
@@ -157,6 +162,7 @@ static void do_test_stale_metadata_doesnt_migrate_partition(void) {
         rd_kafka_mock_cluster_t *mcluster;
         const char *topic = test_mk_topic_name(__FUNCTION__, 1);
         rd_kafka_conf_t *conf;
+        int32_t expected_broker_id;
 
         SUB_TEST_QUICK();
@@ -171,6 +177,7 @@ static void do_test_stale_metadata_doesnt_migrate_partition(void) {
         test_conf_set(conf, "enable.auto.commit", "false");
         test_conf_set(conf, "fetch.error.backoff.ms", "10");
         test_conf_set(conf, "fetch.wait.max.ms", "10");
+        test_conf_set(conf, "fetch.queue.backoff.ms", "10");
 
         rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
@@ -184,27 +191,31 @@ static void do_test_stale_metadata_doesnt_migrate_partition(void) {
 
         /* Change leader to 2, Fetch fails, refreshes metadata.
          */
         rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2);
 
-        for (i = 0; i < 5; i++) {
-                /* Validation fails, metadata refreshed again */
-                rd_kafka_mock_broker_push_request_error_rtts(
-                    mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1,
-                    RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 1000);
-        }
+        /* Validation fails, metadata refreshed again */
+        rd_kafka_mock_broker_push_request_error_rtts(
+            mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1,
+            RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 1000);
 
         /* Wait partition migrates to broker 2 */
         rd_usleep(100 * 1000, 0);
 
-        /* Return stale metadata */
+        /* Ask to return stale metadata while calling OffsetForLeaderEpoch */
+        rd_kafka_mock_start_request_tracking(mcluster);
         for (i = 0; i < 10; i++) {
                 rd_kafka_mock_partition_push_leader_response(
                     mcluster, topic, 0, 1 /*leader id*/, 0 /*leader epoch*/);
         }
 
-        /* Partition doesn't have to migrate back to broker 1 */
+        /* After the error on OffsetForLeaderEpoch metadata is refreshed
+         * and it returns the stale metadata.
+         * 1s for the OffsetForLeaderEpoch plus at least 500ms for
+         * restarting the fetch requests */
         rd_usleep(2000 * 1000, 0);
-        rd_kafka_mock_start_request_tracking(mcluster);
-        fetch_requests = test_mock_wait_matching_requests(
-            mcluster, 0, 500, is_fetch_request, NULL);
+
+        /* Partition doesn't have to migrate back to broker 1 */
+        expected_broker_id = 1;
+        fetch_requests = test_mock_wait_matching_requests(
+            mcluster, 0, 500, is_fetch_request, &expected_broker_id);
         TEST_ASSERT(fetch_requests == 0,
                     "No fetch request should be received by broker 1, got %d",
                     fetch_requests);
@@ -253,8 +264,162 @@ static void do_test_metadata_call_before_join(void) {
         SUB_TEST_PASS();
 }
 
+typedef struct expected_request_s {
+        int16_t api_key;
+        int32_t broker;
+} expected_request_t;
+
+/**
+ * @brief Verify that a request with the expected ApiKey and broker
+ *        was sent to the cluster.
+ */
+rd_bool_t verify_requests_after_metadata_update_operation(
+    rd_kafka_mock_cluster_t *mcluster,
+    expected_request_t *expected_request) {
+        size_t cnt, i;
+        rd_kafka_mock_request_t **requests =
+            rd_kafka_mock_get_requests(mcluster, &cnt);
+        rd_bool_t found = rd_false;
+
+        for (i = 0; i < cnt; i++) {
+                int16_t api_key;
+                int32_t broker;
+                rd_kafka_mock_request_t *request = requests[i];
+                api_key = rd_kafka_mock_request_api_key(request);
+                broker = rd_kafka_mock_request_id(request);
+                if (api_key == expected_request->api_key &&
+                    broker == expected_request->broker) {
+                        found = rd_true;
+                        break;
+                }
+        }
+
+        rd_kafka_mock_request_destroy_array(requests, cnt);
+
+        return found;
+}
+
+/**
+ * @brief A metadata update request should be triggered when a leader change
+ *        happens while producing or consuming, and it should cause a
+ *        migration to the new leader.
+ *
+ * @param producer If true, the test will be for a producer, otherwise
+ *                 for a consumer.
+ * @param second_leader_change If true, a leader change will be triggered
+ *                             for two partitions, otherwise for one.
+ */
+static void do_test_metadata_update_operation(rd_bool_t producer,
+                                              rd_bool_t second_leader_change) {
+        rd_kafka_t *rk;
+        const char *bootstraps;
+        rd_kafka_mock_cluster_t *mcluster;
+        const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+        rd_kafka_conf_t *conf;
+        test_timing_t timing;
+        rd_bool_t found;
+        expected_request_t expected_request = {
+            .api_key = producer ? RD_KAFKAP_Produce : RD_KAFKAP_Fetch,
+            .broker = 3};
+
+        SUB_TEST_QUICK("%s, %s", producer ? "producer" : "consumer",
+                       second_leader_change ? "two leader changes"
+                                            : "single leader change");
"two leader changes" + : "single leader change"); + + mcluster = test_mock_cluster_new(4, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 2, 4); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 2); + + test_conf_init(&conf, NULL, 20); + test_conf_set(conf, "bootstrap.servers", bootstraps); + + if (producer) { + test_conf_set(conf, "batch.num.messages", "1"); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + /* Start producing to leader 1 and 2 */ + test_produce_msgs2(rk, topic, 0, 0, 0, 1, NULL, 0); + test_produce_msgs2(rk, topic, 0, 1, 0, 1, NULL, 0); + rd_kafka_flush(rk, 1000); + } else { + rd_kafka_topic_partition_list_t *assignment; + test_conf_set(conf, "group.id", topic); + rk = test_create_handle(RD_KAFKA_CONSUMER, conf); + + assignment = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(assignment, topic, 0); + rd_kafka_topic_partition_list_add(assignment, topic, 1); + test_consumer_assign("2 partitions", rk, assignment); + rd_kafka_topic_partition_list_destroy(assignment); + + /* Start consuming from leader 1 and 2 */ + test_consumer_poll_no_msgs("no errors", rk, 0, 1000); + } + + TIMING_START(&timing, "Metadata update and partition migration"); + rd_kafka_mock_start_request_tracking(mcluster); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 3); + if (second_leader_change) + rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 4); + + + if (producer) { + /* Produce two new messages to the new leaders */ + test_produce_msgs2(rk, topic, 0, 0, 1, 1, NULL, 0); + test_produce_msgs2(rk, topic, 0, 1, 1, 1, NULL, 0); + rd_kafka_flush(rk, 1000); + } else { + /* Produce two new messages and consume them from + * the new leaders */ + test_produce_msgs_easy_v(topic, 0, 0, 0, 1, 0, + "bootstrap.servers", bootstraps, NULL); + test_produce_msgs_easy_v(topic, 0, 1, 0, 1, 0, + "bootstrap.servers", bootstraps, NULL); + test_consumer_poll_timeout("partition 0", rk, 0, -1, -1, 2, + NULL, 5000); + } + TIMING_ASSERT_LATER(&timing, 0, 2000); + + /* Leader change triggers the metadata update and migration + * of partition 0 to brokers 3 and with 'second_leader_change' also + * of partition 1 to broker 4. 
+         */
+        found = verify_requests_after_metadata_update_operation(
+            mcluster, &expected_request);
+        if (!found)
+                TEST_FAIL(
+                    "Requests with ApiKey %s"
+                    " were not found on broker %" PRId32,
+                    rd_kafka_ApiKey2str(expected_request.api_key),
+                    expected_request.broker);
+
+        if (second_leader_change) {
+                expected_request.broker = 4;
+        } else {
+                expected_request.broker = 2;
+        }
+
+        found = verify_requests_after_metadata_update_operation(
+            mcluster, &expected_request);
+        if (!found)
+                TEST_FAIL(
+                    "Requests with ApiKey %s"
+                    " were not found on broker %" PRId32,
+                    rd_kafka_ApiKey2str(expected_request.api_key),
+                    expected_request.broker);
+
+        rd_kafka_mock_stop_request_tracking(mcluster);
+        rd_kafka_destroy(rk);
+        test_mock_cluster_destroy(mcluster);
+
+        TEST_LATER_CHECK();
+        SUB_TEST_PASS();
+}
+
 
 int main_0146_metadata_mock(int argc, char **argv) {
         TEST_SKIP_MOCK_CLUSTER(0);
+        int variation;
 
         /* No need to test the "roundrobin" assignor case,
          * as this is just for checking the two code paths:
@@ -268,5 +433,12 @@ int main_0146_metadata_mock(int argc, char **argv) {
 
         do_test_stale_metadata_doesnt_migrate_partition();
 
+        for (variation = 0; variation < 4; variation++) {
+                do_test_metadata_update_operation(
+                    variation / 2, /* 0-1: consumer, 2-3 producer */
+                    variation % 2 /* 1-3: second leader change,
+                                   * 0-2: single leader change */);
+        }
+
        return 0;
 }
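
Reviewer note (not part of the patch): a minimal sketch, under stated assumptions, of how the KIP-951 path added above can be exercised against the mock cluster, condensed from do_test_metadata_update_operation. It reuses only helpers and signatures that already appear in this diff (test_mock_cluster_new, rd_kafka_mock_partition_set_leader, test_produce_msgs2, request tracking); the function name, topic name and counts are hypothetical, and the librdkafka test harness headers are assumed to be included.

/* Sketch: trigger a leader change while producing and rely on the KIP-951
 * CurrentLeader/NodeEndpoints tagged fields, served by the mock handlers
 * above, to migrate the partition to the new leader (broker 3) before the
 * metadata cache is refreshed. Helpers are the ones used in
 * tests/0146-metadata_mock.c. */
static void sketch_kip951_leader_change(void) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        const char *bootstraps;
        const char *topic = "kip951_sketch"; /* hypothetical topic name */
        rd_kafka_mock_cluster_t *mcluster =
            test_mock_cluster_new(4, &bootstraps);

        rd_kafka_mock_topic_create(mcluster, topic, 1, 4);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);

        test_conf_init(&conf, NULL, 20);
        test_conf_set(conf, "bootstrap.servers", bootstraps);
        test_conf_set(conf, "batch.num.messages", "1");
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        /* Prime the partition on the original leader (broker 1). */
        test_produce_msgs2(rk, topic, 0, 0, 0, 1, NULL, 0);
        rd_kafka_flush(rk, 1000);

        /* Change the leader: the next Produce fails with
         * NOT_LEADER_FOR_PARTITION and, with ApiVersion >= 10, the response
         * carries the CurrentLeader/NodeEndpoints tags, so the partition
         * should migrate to broker 3 without a full metadata round trip. */
        rd_kafka_mock_start_request_tracking(mcluster);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 3);

        test_produce_msgs2(rk, topic, 0, 0, 1, 1, NULL, 0);
        rd_kafka_flush(rk, 1000);

        /* Tracked requests should now include a Produce sent to broker 3;
         * see verify_requests_after_metadata_update_operation above. */
        rd_kafka_mock_stop_request_tracking(mcluster);

        rd_kafka_destroy(rk);
        test_mock_cluster_destroy(mcluster);
}

The full test in the patch additionally covers the consumer (Fetch) side and an optional second, simultaneous leader change.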