From d7e779e55c1953068a93cc1434afdd11c57184cd Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 10 May 2023 18:22:14 +0200 Subject: [PATCH 1/6] Metadata and leader epoch refactor. store private metadata into a struct that contains the public one. --- src/rdkafka.c | 2 +- src/rdkafka_admin.c | 2 +- src/rdkafka_aux.c | 2 +- src/rdkafka_aux.h | 2 +- src/rdkafka_buf.h | 4 + src/rdkafka_int.h | 5 +- src/rdkafka_metadata.c | 262 +++++++++++++++++++++-------------- src/rdkafka_metadata.h | 68 +++++++-- src/rdkafka_metadata_cache.c | 65 ++++++--- src/rdkafka_request.c | 30 ++-- src/rdkafka_topic.c | 47 ++++--- src/rdkafka_topic.h | 2 +- 12 files changed, 297 insertions(+), 194 deletions(-) diff --git a/src/rdkafka.c b/src/rdkafka.c index 33147ccd4f..2a5e040b68 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -990,7 +990,7 @@ void rd_kafka_destroy_final(rd_kafka_t *rk) { mtx_destroy(&rk->rk_init_lock); if (rk->rk_full_metadata) - rd_kafka_metadata_destroy(rk->rk_full_metadata); + rd_kafka_metadata_destroy(&rk->rk_full_metadata->metadata); rd_kafkap_str_destroy(rk->rk_client_id); rd_kafkap_str_destroy(rk->rk_group_id); rd_kafkap_str_destroy(rk->rk_eos.transactional_id); diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 6aaec636d5..2226899477 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -6349,7 +6349,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; - int nodeid; + int32_t nodeid; uint16_t port; int16_t api_version; int32_t cnt; diff --git a/src/rdkafka_aux.c b/src/rdkafka_aux.c index 753f03d678..da565d1594 100644 --- a/src/rdkafka_aux.c +++ b/src/rdkafka_aux.c @@ -234,7 +234,7 @@ void rd_kafka_acl_result_free(void *ptr) { * @return A new allocated Node object. * Use rd_kafka_Node_destroy() to free when done. */ -rd_kafka_Node_t *rd_kafka_Node_new(int id, +rd_kafka_Node_t *rd_kafka_Node_new(int32_t id, const char *host, uint16_t port, const char *rack_id) { diff --git a/src/rdkafka_aux.h b/src/rdkafka_aux.h index ccf18e91e7..c78f51f2cf 100644 --- a/src/rdkafka_aux.h +++ b/src/rdkafka_aux.h @@ -111,7 +111,7 @@ typedef struct rd_kafka_Node_s { } rd_kafka_Node_t; rd_kafka_Node_t * -rd_kafka_Node_new(int id, const char *host, uint16_t port, const char *rack_id); +rd_kafka_Node_new(int32_t id, const char *host, uint16_t port, const char *rack_id); rd_kafka_Node_t *rd_kafka_Node_copy(const rd_kafka_Node_t *src); diff --git a/src/rdkafka_buf.h b/src/rdkafka_buf.h index b4f606317b..f8d9c2e23d 100644 --- a/src/rdkafka_buf.h +++ b/src/rdkafka_buf.h @@ -682,6 +682,10 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */ size_t _slen; \ char *_dst; \ rd_kafka_buf_read_str(rkbuf, &_kstr); \ + if (RD_KAFKAP_STR_IS_NULL(&_kstr)) { \ + dst = NULL; \ + break; \ + } \ _slen = RD_KAFKAP_STR_LEN(&_kstr); \ if (!(_dst = rd_tmpabuf_write(tmpabuf, _kstr.str, _slen + 1))) \ rd_kafka_buf_parse_fail( \ diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index 584ff3c965..fb0a0d04be 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -350,8 +350,9 @@ struct rd_kafka_s { rd_ts_t rk_ts_metadata; /* Timestamp of most recent * metadata. */ - struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */ - rd_ts_t rk_ts_full_metadata; /* Timesstamp of .. */ + rd_kafka_metadata_internal_t + *rk_full_metadata; /* Last full metadata. */ + rd_ts_t rk_ts_full_metadata; /* Timesstamp of .. 
*/ struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */ char *rk_clusterid; /* ClusterId from metadata */ diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index b9cc333b9d..586c19a725 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -38,6 +38,70 @@ #include #include +/** + * @brief TODO: write + * + * @param _a + * @param _b + * @return int + */ +static int rd_kafka_metadata_broker_internal_cmp(const void *_a, + const void *_b) { + const rd_kafka_metadata_broker_internal_t *a = _a; + const rd_kafka_metadata_broker_internal_t *b = _b; + return RD_CMP(a->id, b->id); +} + +/** + * @brief TODO: write + * + * @param _a + * @param _b + * @return int + */ +static int rd_kafka_metadata_partition_internal_cmp(const void *_a, + const void *_b) { + const rd_kafka_metadata_partition_internal_t *a = _a; + const rd_kafka_metadata_partition_internal_t *b = _b; + return RD_CMP(a->id, b->id); +} + +/** + * @brief TODO: write + * + * @param metadata + * @return const rd_kafka_metadata_t* + */ +const rd_kafka_metadata_t * +rd_kafka_metadata_internal_metadata(rd_kafka_metadata_internal_t *mdi) { + return &mdi->metadata; +} + +/** + * @brief TODO: write + * + * @param metadata + * @param i + * @return const rd_kafka_metadata_topic_internal_t* + */ +const rd_kafka_metadata_topic_internal_t * +rd_kafka_metadata_internal_topic(rd_kafka_metadata_internal_t *metadata, + int i) { + return &metadata->topics[i]; +} + +/** + * @brief TODO: write + * + * @param metadata + * @param leader_epoch + */ +void rd_kafka_metadata_internal_partition_set_leader_epoch( + rd_kafka_metadata_partition_internal_t *metadata_partition, + int32_t leader_epoch) { + metadata_partition->leader_epoch = leader_epoch; +} + rd_kafka_resp_err_t rd_kafka_metadata(rd_kafka_t *rk, @@ -130,9 +194,12 @@ void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata) { /** * @returns a newly allocated copy of metadata \p src of size \p size */ -struct rd_kafka_metadata * -rd_kafka_metadata_copy(const struct rd_kafka_metadata *src, size_t size) { +rd_kafka_metadata_internal_t * +rd_kafka_metadata_copy(const rd_kafka_metadata_internal_t *src_internal, + size_t size) { struct rd_kafka_metadata *md; + rd_kafka_metadata_internal_t *mdi; + const struct rd_kafka_metadata *src = &src_internal->metadata; rd_tmpabuf_t tbuf; int i; @@ -143,23 +210,37 @@ rd_kafka_metadata_copy(const struct rd_kafka_metadata *src, size_t size) { * any pointer fields needs to be copied explicitly to update * the pointer address. 
*/ rd_tmpabuf_new(&tbuf, size, 1 /*assert on fail*/); - md = rd_tmpabuf_write(&tbuf, src, sizeof(*md)); + mdi = rd_tmpabuf_write(&tbuf, src, sizeof(*mdi)); + md = &mdi->metadata; rd_tmpabuf_write_str(&tbuf, src->orig_broker_name); /* Copy Brokers */ md->brokers = rd_tmpabuf_write(&tbuf, src->brokers, - md->broker_cnt * sizeof(*md->brokers)); + src->broker_cnt * sizeof(*src->brokers)); + /* Copy internal Brokers */ + mdi->brokers = + rd_tmpabuf_write(&tbuf, src_internal->brokers, + src->broker_cnt * sizeof(*src_internal->brokers)); - for (i = 0; i < md->broker_cnt; i++) + for (i = 0; i < md->broker_cnt; i++) { md->brokers[i].host = rd_tmpabuf_write_str(&tbuf, src->brokers[i].host); + if (src_internal->brokers[i].rack_id) { + mdi->brokers[i].rack_id = rd_tmpabuf_write_str( + &tbuf, src_internal->brokers[i].rack_id); + } + } /* Copy TopicMetadata */ md->topics = rd_tmpabuf_write(&tbuf, src->topics, md->topic_cnt * sizeof(*md->topics)); + /* Copy internal TopicMetadata */ + mdi->topics = + rd_tmpabuf_write(&tbuf, src_internal->topics, + md->topic_cnt * sizeof(*src_internal->topics)); for (i = 0; i < md->topic_cnt; i++) { int j; @@ -173,6 +254,11 @@ rd_kafka_metadata_copy(const struct rd_kafka_metadata *src, size_t size) { rd_tmpabuf_write(&tbuf, src->topics[i].partitions, md->topics[i].partition_cnt * sizeof(*md->topics[i].partitions)); + /* Copy internal partitions */ + mdi->topics[i].partitions = rd_tmpabuf_write( + &tbuf, src_internal->topics[i].partitions, + md->topics[i].partition_cnt * + sizeof(*src_internal->topics[i].partitions)); for (j = 0; j < md->topics[i].partition_cnt; j++) { /* Copy replicas and ISRs */ @@ -195,27 +281,15 @@ rd_kafka_metadata_copy(const struct rd_kafka_metadata *src, size_t size) { /* Delibarely not destroying the tmpabuf since we return * its allocated memory. */ - return md; -} - - - -/** - * @brief Partition (id) comparator for partition_id_leader_epoch struct. - */ -static int rd_kafka_metadata_partition_leader_epoch_cmp(const void *_a, - const void *_b) { - const rd_kafka_partition_leader_epoch_t *a = _a, *b = _b; - return RD_CMP(a->partition_id, b->partition_id); + return mdi; } - /** * @brief Update topic state and information based on topic metadata. * * @param mdt Topic metadata. - * @param leader_epochs Per-partition leader epoch array, or NULL if not known. + * @param mdit Topic internal metadata. 
* * @locality rdkafka main thread * @locks_acquired rd_kafka_wrlock(rk) @@ -223,7 +297,7 @@ static int rd_kafka_metadata_partition_leader_epoch_cmp(const void *_a, static void rd_kafka_parse_Metadata_update_topic( rd_kafka_broker_t *rkb, const rd_kafka_metadata_topic_t *mdt, - const rd_kafka_partition_leader_epoch_t *leader_epochs) { + const rd_kafka_metadata_topic_internal_t *mdit) { rd_rkb_dbg(rkb, METADATA, "METADATA", /* The indent below is intentional */ @@ -244,7 +318,7 @@ static void rd_kafka_parse_Metadata_update_topic( } else { /* Update local topic & partition state based * on metadata */ - rd_kafka_topic_metadata_update2(rkb, mdt, leader_epochs); + rd_kafka_topic_metadata_update2(rkb, mdt, mdit); } } @@ -290,12 +364,12 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_kafka_buf_t *request, rd_kafka_buf_t *rkbuf, - struct rd_kafka_metadata **mdp, - rd_kafka_broker_id_rack_pair_t **broker_rack_pair_p) { + rd_kafka_metadata_internal_t **mdip) { rd_kafka_t *rk = rkb->rkb_rk; int i, j, k; rd_tmpabuf_t tbuf; - struct rd_kafka_metadata *md = NULL; + rd_kafka_metadata_internal_t *mdi = NULL; + rd_kafka_metadata_t *md = NULL; size_t rkb_namelen; const int log_decode_errors = LOG_ERR; rd_list_t *missing_topics = NULL; @@ -303,6 +377,8 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_bool_t all_topics = request->rkbuf_u.Metadata.all_topics; rd_bool_t cgrp_update = request->rkbuf_u.Metadata.cgrp_update && rk->rk_cgrp; + rd_bool_t has_reliable_leader_epochs = + rd_kafka_has_reliable_leader_epochs(rkb); const char *reason = request->rkbuf_u.Metadata.reason ? request->rkbuf_u.Metadata.reason : "(no reason)"; @@ -312,12 +388,7 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; int broker_changes = 0; int cache_changes = 0; - /** This array is reused and resized as necessary to hold per-partition - * leader epochs (ApiVersion >= 7). */ - rd_kafka_partition_leader_epoch_t *leader_epochs = NULL; - /** Number of allocated elements in leader_epochs. 
*/ - size_t leader_epochs_size = 0; - rd_ts_t ts_start = rd_clock(); + rd_ts_t ts_start = rd_clock(); /* Ignore metadata updates when terminating */ if (rd_kafka_terminating(rkb->rkb_rk)) { @@ -340,12 +411,13 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, sizeof(*md) + rkb_namelen + (rkbuf->rkbuf_totlen * 4), 0 /*dont assert on fail*/); - if (!(md = rd_tmpabuf_alloc(&tbuf, sizeof(*md)))) { + if (!(mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi)))) { rd_kafka_broker_unlock(rkb); err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; goto err; } + md = &mdi->metadata; md->orig_broker_id = rkb->rkb_nodeid; md->orig_broker_name = rd_tmpabuf_write(&tbuf, rkb->rkb_name, rkb_namelen); @@ -364,10 +436,11 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, "%d brokers: tmpabuf memory shortage", md->broker_cnt); - if (ApiVersion >= 1 && broker_rack_pair_p) { - *broker_rack_pair_p = rd_malloc( - sizeof(rd_kafka_broker_id_rack_pair_t) * md->broker_cnt); - } + if (!(mdi->brokers = rd_tmpabuf_alloc( + &tbuf, md->broker_cnt * sizeof(*mdi->brokers)))) + rd_kafka_buf_parse_fail( + rkbuf, "%d internal brokers: tmpabuf memory shortage", + md->broker_cnt); for (i = 0; i < md->broker_cnt; i++) { rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].id); @@ -375,15 +448,12 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, md->brokers[i].host); rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].port); + mdi->brokers[i].id = md->brokers[i].id; if (ApiVersion >= 1) { - rd_kafkap_str_t rack; - rd_kafka_buf_read_str(rkbuf, &rack); - if (broker_rack_pair_p) { - (*broker_rack_pair_p)[i].broker_id = - md->brokers[i].id; - (*broker_rack_pair_p)[i].rack = - rd_kafkap_str_copy(&rack); - } + rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf, + mdi->brokers[i].rack_id); + } else { + mdi->brokers[i].rack_id = NULL; } rd_kafka_buf_skip_tags(rkbuf); @@ -399,10 +469,8 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, RD_KAFKAP_STR_PR(&cluster_id), controller_id); } - if (broker_rack_pair_p) - qsort(*broker_rack_pair_p, md->broker_cnt, - sizeof(rd_kafka_broker_id_rack_pair_t), - rd_kafka_broker_id_rack_pair_cmp); + qsort(mdi->brokers, md->broker_cnt, sizeof(mdi->brokers[i]), + rd_kafka_metadata_broker_internal_cmp); /* Read TopicMetadata */ rd_kafka_buf_read_arraycnt(rkbuf, &md->topic_cnt, RD_KAFKAP_TOPICS_MAX); @@ -414,6 +482,12 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_kafka_buf_parse_fail( rkbuf, "%d topics: tmpabuf memory shortage", md->topic_cnt); + if (!(mdi->topics = rd_tmpabuf_alloc(&tbuf, md->topic_cnt * + sizeof(*mdi->topics)))) + rd_kafka_buf_parse_fail( + rkbuf, "%d internal topics: tmpabuf memory shortage", + md->topic_cnt); + for (i = 0; i < md->topic_cnt; i++) { rd_kafka_buf_read_i16a(rkbuf, md->topics[i].err); rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf, @@ -436,16 +510,15 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, md->topics[i].topic, md->topics[i].partition_cnt); - /* Resize reused leader_epochs array to fit this partition's - * leader epochs. 
*/ - if (ApiVersion >= 7 && md->topics[i].partition_cnt > 0 && - (size_t)md->topics[i].partition_cnt > leader_epochs_size) { - leader_epochs_size = - RD_MAX(32, md->topics[i].partition_cnt); - leader_epochs = - rd_realloc(leader_epochs, sizeof(*leader_epochs) * - leader_epochs_size); - } + if (!(mdi->topics[i].partitions = rd_tmpabuf_alloc( + &tbuf, md->topics[i].partition_cnt * + sizeof(*mdi->topics[i].partitions)))) + rd_kafka_buf_parse_fail(rkbuf, + "%s: %d internal partitions: " + "tmpabuf memory shortage", + md->topics[i].topic, + md->topics[i].partition_cnt); + for (j = 0; j < md->topics[i].partition_cnt; j++) { rd_kafka_buf_read_i16a(rkbuf, @@ -454,11 +527,19 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, md->topics[i].partitions[j].id); rd_kafka_buf_read_i32a( rkbuf, md->topics[i].partitions[j].leader); + + mdi->topics[i].partitions[j].id = + md->topics[i].partitions[j].id; if (ApiVersion >= 7) { - leader_epochs[j].partition_id = - md->topics[i].partitions[j].id; rd_kafka_buf_read_i32( - rkbuf, &leader_epochs[j].leader_epoch); + rkbuf, + &mdi->topics[i].partitions[j].leader_epoch); + if (!has_reliable_leader_epochs) + mdi->topics[i] + .partitions[j] + .leader_epoch = -1; + } else { + mdi->topics[i].partitions[j].leader_epoch = -1; } /* Replicas */ @@ -552,37 +633,17 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, continue; } - if (leader_epochs_size > 0 && - !rd_kafka_has_reliable_leader_epochs(rkb)) { - /* Prior to Kafka version 2.4 (which coincides with - * Metadata version 9), the broker does not propagate - * leader epoch information accurately while a - * reassignment is in progress. Relying on a stale - * epoch can lead to FENCED_LEADER_EPOCH errors which - * can prevent consumption throughout the course of - * a reassignment. It is safer in this case to revert - * to the behavior in previous protocol versions - * which checks leader status only. 
*/ - leader_epochs_size = 0; - rd_free(leader_epochs); - leader_epochs = NULL; - } - - /* Sort partitions by partition id */ qsort(md->topics[i].partitions, md->topics[i].partition_cnt, sizeof(*md->topics[i].partitions), rd_kafka_metadata_partition_id_cmp); - if (leader_epochs_size > 0) { - /* And sort leader_epochs by partition id */ - qsort(leader_epochs, md->topics[i].partition_cnt, - sizeof(*leader_epochs), - rd_kafka_metadata_partition_leader_epoch_cmp); - } + qsort(mdi->topics[i].partitions, md->topics[i].partition_cnt, + sizeof(*mdi->topics[i].partitions), + rd_kafka_metadata_partition_internal_cmp); /* Update topic state based on the topic metadata */ rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i], - leader_epochs); + &mdi->topics[i]); if (requested_topics) { @@ -596,7 +657,7 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_kafka_wrlock(rk); rd_kafka_metadata_cache_topic_update( - rk, &md->topics[i], + rk, &md->topics[i], &mdi->topics[i], rd_false /*propagate later*/); cache_changes++; rd_kafka_wrunlock(rk); @@ -710,9 +771,9 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, if (rkb->rkb_rk->rk_full_metadata) rd_kafka_metadata_destroy( - rkb->rkb_rk->rk_full_metadata); + &rkb->rkb_rk->rk_full_metadata->metadata); rkb->rkb_rk->rk_full_metadata = - rd_kafka_metadata_copy(md, tbuf.of); + rd_kafka_metadata_copy(mdi, tbuf.of); rkb->rkb_rk->rk_ts_full_metadata = rkb->rkb_rk->rk_ts_metadata; rd_rkb_dbg(rkb, METADATA, "METADATA", "Caching full metadata with " @@ -758,16 +819,13 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, if (missing_topics) rd_list_destroy(missing_topics); - if (leader_epochs) - rd_free(leader_epochs); - /* This metadata request was triggered by someone wanting * the metadata information back as a reply, so send that reply now. * In this case we must not rd_free the metadata memory here, * the requestee will do. * The tbuf is explicitly not destroyed as we return its memory * to the caller. 
*/ - *mdp = md; + *mdip = mdi; return RD_KAFKA_RESP_ERR_NO_ERROR; @@ -784,19 +842,6 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, if (missing_topics) rd_list_destroy(missing_topics); - - if (leader_epochs) - rd_free(leader_epochs); - - if (broker_rack_pair_p && *broker_rack_pair_p) { - /* md must be allocated if *broker_rack_pair_p != NULL - since md->brokers_cnt is used to allocate it */ - rd_assert(md); - rd_kafka_broker_rack_pair_destroy_cnt(*broker_rack_pair_p, - md->broker_cnt); - *broker_rack_pair_p = NULL; - } - rd_tmpabuf_destroy(&tbuf); return err; @@ -824,12 +869,15 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *errored) { int ti, i; size_t cnt = 0; - const struct rd_kafka_metadata *metadata; + rd_kafka_metadata_internal_t *mdi; + struct rd_kafka_metadata *metadata; rd_kafka_topic_partition_list_t *unmatched; rd_kafka_rdlock(rk); - metadata = rk->rk_full_metadata; - if (!metadata) { + mdi = rk->rk_full_metadata; + metadata = &mdi->metadata; + + if (!mdi) { rd_kafka_rdunlock(rk); return 0; } diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index 537a852354..90809630ca 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -31,17 +31,56 @@ #include "rdavl.h" +/** + * @brief Metadata partition internal container + */ +typedef struct rd_kafka_metadata_partition_internal_s { + /** Partition Id */ + int32_t id; + /** Partition leader epoch */ + int32_t leader_epoch; +} rd_kafka_metadata_partition_internal_t; + +/** + * @brief Metadata topic internal container + */ +typedef struct rd_kafka_metadata_topic_internal_s { + /** Internal metadata partition structs. + * same count as metadata.topics[i].partition_cnt. */ + rd_kafka_metadata_partition_internal_t *partitions; +} rd_kafka_metadata_topic_internal_t; + + +typedef struct rd_kafka_metadata_broker_internal_s { + int32_t id; + char *rack_id; +} rd_kafka_metadata_broker_internal_t; + +/** + * @brief Metadata internal container + */ +typedef struct rd_kafka_metadata_internal_s { + rd_kafka_metadata_t + metadata; /**< Public metadata struct. Must + be kept the first field so the pointer + can be cast to *rd_kafka_metadata_internal_t + when needed */ + /* Internal metadata brokers. Same count as metadata.broker_cnt. */ + rd_kafka_metadata_broker_internal_t *brokers; + /* Internal metadata topics. Same count as metadata.topic_cnt. */ + rd_kafka_metadata_topic_internal_t *topics; +} rd_kafka_metadata_internal_t; + + rd_bool_t rd_kafka_has_reliable_leader_epochs(rd_kafka_broker_t *rkb); -rd_kafka_resp_err_t -rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, - rd_kafka_buf_t *request, - rd_kafka_buf_t *rkbuf, - struct rd_kafka_metadata **mdp, - rd_kafka_broker_id_rack_pair_t **broker_rack_pair_p); +rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, + rd_kafka_buf_t *request, + rd_kafka_buf_t *rkbuf, + rd_kafka_metadata_internal_t **mdp); -struct rd_kafka_metadata * -rd_kafka_metadata_copy(const struct rd_kafka_metadata *md, size_t size); +rd_kafka_metadata_internal_t * +rd_kafka_metadata_copy(const rd_kafka_metadata_internal_t *mdi, size_t size); size_t rd_kafka_metadata_topic_match(rd_kafka_t *rk, @@ -102,7 +141,6 @@ rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics, size_t topic_cnt); rd_kafka_metadata_t *rd_kafka_metadata_new_topic_mockv(size_t topic_cnt, ...); - /** * @{ * @@ -117,6 +155,8 @@ struct rd_kafka_metadata_cache_entry { /** Last known leader epochs array (same size as the partition count), * or NULL if not known. 
*/ rd_kafka_metadata_topic_t rkmce_mtopic; /* Cached topic metadata */ + /* Cached internal topic metadata */ + rd_kafka_metadata_topic_internal_t rkmce_metadata_internal_topic; /* rkmce_topics.partitions memory points here. */ }; @@ -159,11 +199,13 @@ struct rd_kafka_metadata_cache { void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk); int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts); -void rd_kafka_metadata_cache_topic_update(rd_kafka_t *rk, - const rd_kafka_metadata_topic_t *mdt, - rd_bool_t propagate); +void rd_kafka_metadata_cache_topic_update( + rd_kafka_t *rk, + const rd_kafka_metadata_topic_t *mdt, + const rd_kafka_metadata_topic_internal_t *mdit, + rd_bool_t propagate); void rd_kafka_metadata_cache_update(rd_kafka_t *rk, - const rd_kafka_metadata_t *md, + const rd_kafka_metadata_internal_t *mdi, int abs_update); void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk); struct rd_kafka_metadata_cache_entry * diff --git a/src/rdkafka_metadata_cache.c b/src/rdkafka_metadata_cache.c index 514d391a83..e82f890d6d 100644 --- a/src/rdkafka_metadata_cache.c +++ b/src/rdkafka_metadata_cache.c @@ -238,11 +238,12 @@ int rd_kafka_metadata_partition_id_cmp(const void *_a, const void *_b) { * * @locks_required rd_kafka_wrlock() */ -static struct rd_kafka_metadata_cache_entry * -rd_kafka_metadata_cache_insert(rd_kafka_t *rk, - const rd_kafka_metadata_topic_t *mtopic, - rd_ts_t now, - rd_ts_t ts_expires) { +static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert( + rd_kafka_t *rk, + const rd_kafka_metadata_topic_t *mtopic, + const rd_kafka_metadata_topic_internal_t *metadata_internal_topic, + rd_ts_t now, + rd_ts_t ts_expires) { struct rd_kafka_metadata_cache_entry *rkmce, *old; size_t topic_len; rd_tmpabuf_t tbuf; @@ -255,17 +256,21 @@ rd_kafka_metadata_cache_insert(rd_kafka_t *rk, * any pointer fields needs to be copied explicitly to update * the pointer address. */ topic_len = strlen(mtopic->topic) + 1; - rd_tmpabuf_new(&tbuf, - RD_ROUNDUP(sizeof(*rkmce), 8) + - RD_ROUNDUP(topic_len, 8) + - (mtopic->partition_cnt * - RD_ROUNDUP(sizeof(*mtopic->partitions), 8)), - 1 /*assert on fail*/); + rd_tmpabuf_new( + &tbuf, + RD_ROUNDUP(sizeof(*rkmce), 8) + RD_ROUNDUP(topic_len, 8) + + (mtopic->partition_cnt * + RD_ROUNDUP(sizeof(*mtopic->partitions), 8)) + + (mtopic->partition_cnt * + RD_ROUNDUP(sizeof(*metadata_internal_topic->partitions), 8)), + 1 /*assert on fail*/); rkmce = rd_tmpabuf_alloc(&tbuf, sizeof(*rkmce)); rkmce->rkmce_mtopic = *mtopic; + rkmce->rkmce_metadata_internal_topic = *metadata_internal_topic; + /* Copy topic name and update pointer */ rkmce->rkmce_mtopic.topic = rd_tmpabuf_write_str(&tbuf, mtopic->topic); @@ -274,6 +279,12 @@ rd_kafka_metadata_cache_insert(rd_kafka_t *rk, &tbuf, mtopic->partitions, mtopic->partition_cnt * sizeof(*mtopic->partitions)); + /* Copy partition array (internal) and update pointer */ + rkmce->rkmce_metadata_internal_topic.partitions = + rd_tmpabuf_write(&tbuf, metadata_internal_topic->partitions, + mtopic->partition_cnt * + sizeof(*metadata_internal_topic->partitions)); + /* Clear uncached fields. */ for (i = 0; i < mtopic->partition_cnt; i++) { rkmce->rkmce_mtopic.partitions[i].replicas = NULL; @@ -287,6 +298,8 @@ rd_kafka_metadata_cache_insert(rd_kafka_t *rk, sizeof(*rkmce->rkmce_mtopic.partitions), rd_kafka_metadata_partition_id_cmp); + /* partitions (internal) are already sorted. 
*/ + TAILQ_INSERT_TAIL(&rk->rk_metadata_cache.rkmc_expiry, rkmce, rkmce_link); rk->rk_metadata_cache.rkmc_cnt++; @@ -365,9 +378,11 @@ void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk) { * * @locks rd_kafka_wrlock() */ -void rd_kafka_metadata_cache_topic_update(rd_kafka_t *rk, - const rd_kafka_metadata_topic_t *mdt, - rd_bool_t propagate) { +void rd_kafka_metadata_cache_topic_update( + rd_kafka_t *rk, + const rd_kafka_metadata_topic_t *mdt, + const rd_kafka_metadata_topic_internal_t *mdit, + rd_bool_t propagate) { rd_ts_t now = rd_clock(); rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000); int changed = 1; @@ -380,7 +395,7 @@ void rd_kafka_metadata_cache_topic_update(rd_kafka_t *rk, if (!mdt->err || mdt->err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED || mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) - rd_kafka_metadata_cache_insert(rk, mdt, now, ts_expires); + rd_kafka_metadata_cache_insert(rk, mdt, mdit, now, ts_expires); else changed = rd_kafka_metadata_cache_delete_by_name(rk, mdt->topic); @@ -398,23 +413,24 @@ void rd_kafka_metadata_cache_topic_update(rd_kafka_t *rk, * @locks rd_kafka_wrlock() */ void rd_kafka_metadata_cache_update(rd_kafka_t *rk, - const rd_kafka_metadata_t *md, + const rd_kafka_metadata_internal_t *mdi, int abs_update) { struct rd_kafka_metadata_cache_entry *rkmce; rd_ts_t now = rd_clock(); rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000); int i; - rd_kafka_dbg(rk, METADATA, "METADATA", - "%s of metadata cache with %d topic(s)", - abs_update ? "Absolute update" : "Update", md->topic_cnt); + rd_kafka_dbg( + rk, METADATA, "METADATA", "%s of metadata cache with %d topic(s)", + abs_update ? "Absolute update" : "Update", mdi->metadata.topic_cnt); if (abs_update) rd_kafka_metadata_cache_purge(rk, rd_false /*not observers*/); - for (i = 0; i < md->topic_cnt; i++) - rd_kafka_metadata_cache_insert(rk, &md->topics[i], now, + for (i = 0; i < mdi->metadata.topic_cnt; i++) + rd_kafka_metadata_cache_insert(rk, &mdi->metadata.topics[i], + &mdi->topics[i], now, ts_expires); /* Update expiry timer */ @@ -424,7 +440,7 @@ void rd_kafka_metadata_cache_update(rd_kafka_t *rk, rkmce->rkmce_ts_expires - now, rd_kafka_metadata_cache_evict_tmr_cb, rk); - if (md->topic_cnt > 0 || abs_update) + if (mdi->metadata.topic_cnt > 0 || abs_update) rd_kafka_metadata_cache_propagate_changes(rk); } @@ -499,6 +515,8 @@ int rd_kafka_metadata_cache_hint(rd_kafka_t *rk, RD_LIST_FOREACH(topic, topics, i) { rd_kafka_metadata_topic_t mtopic = {.topic = (char *)topic, .err = err}; + rd_kafka_metadata_topic_internal_t metadata_internal_topic = + RD_ZERO_INIT; /*const*/ struct rd_kafka_metadata_cache_entry *rkmce; /* !replace: Dont overwrite valid entries */ @@ -512,7 +530,8 @@ int rd_kafka_metadata_cache_hint(rd_kafka_t *rk, /* FALLTHRU */ } - rd_kafka_metadata_cache_insert(rk, &mtopic, now, ts_expires); + rd_kafka_metadata_cache_insert( + rk, &mtopic, &metadata_internal_topic, now, ts_expires); cnt++; if (dst) diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 17cb8acce2..854ab4c17f 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2085,11 +2085,10 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, void *opaque) { - rd_kafka_op_t *rko = opaque; /* Possibly NULL */ - struct rd_kafka_metadata *md = NULL; - const rd_list_t *topics = request->rkbuf_u.Metadata.topics; + rd_kafka_op_t *rko = opaque; /* Possibly NULL */ + rd_kafka_metadata_internal_t *mdi = NULL; + const rd_list_t 
*topics = request->rkbuf_u.Metadata.topics; int actions; - rd_kafka_broker_id_rack_pair_t *broker_rack_pair = NULL; rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY || thrd_is_current(rk->rk_thread)); @@ -2115,33 +2114,20 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk, rd_list_cnt(topics), request->rkbuf_u.Metadata.reason); - if (rko && rko->rko_replyq.q) - err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md, - &broker_rack_pair); - else - err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md, NULL); + err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &mdi); if (err) goto err; if (rko && rko->rko_replyq.q) { /* Reply to metadata requester, passing on the metadata. * Reuse requesting rko for the reply. */ - rko->rko_err = err; - rko->rko_u.metadata.md = md; - rko->rko_u.metadata.broker_rack_pair = broker_rack_pair; - if (broker_rack_pair) { - rd_assert(md); /* rd_kafka_parse_Metadata guarantees - that md will not be NULL if - broker_rack_pair isn't. */ - rko->rko_u.metadata.broker_rack_pair_cnt = - (size_t)md->broker_cnt; - } - + rko->rko_err = err; + rko->rko_u.metadata.md = &mdi->metadata; rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0); rko = NULL; } else { - if (md) - rd_free(md); + if (mdi) + rd_free(mdi); } goto done; diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index af7b6362d5..9330b43cce 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -50,11 +50,11 @@ const char *rd_kafka_topic_state_names[] = {"unknown", "exists", "notexists", "error"}; -static int rd_kafka_topic_metadata_update( - rd_kafka_topic_t *rkt, - const struct rd_kafka_metadata_topic *mdt, - const rd_kafka_partition_leader_epoch_t *leader_epochs, - rd_ts_t ts_age); +static int +rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, + const struct rd_kafka_metadata_topic *mdt, + const rd_kafka_metadata_topic_internal_t *mdit, + rd_ts_t ts_age); /** @@ -479,8 +479,10 @@ rd_kafka_topic_t *rd_kafka_topic_new0(rd_kafka_t *rk, if (existing) *existing = 1; - rd_kafka_topic_metadata_update(rkt, &rkmce->rkmce_mtopic, NULL, - rkmce->rkmce_ts_insert); + rd_kafka_topic_metadata_update( + rkt, &rkmce->rkmce_mtopic, + &rkmce->rkmce_metadata_internal_topic, + rkmce->rkmce_ts_insert); } if (do_lock) @@ -1238,9 +1240,7 @@ rd_bool_t rd_kafka_topic_set_error(rd_kafka_topic_t *rkt, * @brief Update a topic from metadata. * * @param mdt Topic metadata. - * @param leader_epochs Array of per-partition leader epochs, or NULL. - * The array size is identical to the partition count in - * \p mdt. + * @param mdit Topic internal metadata. * @param ts_age absolute age (timestamp) of metadata. * @returns 1 if the number of partitions changed, 0 if not, and -1 if the * topic is unknown. @@ -1248,11 +1248,11 @@ rd_bool_t rd_kafka_topic_set_error(rd_kafka_topic_t *rkt, * * @locks_required rd_kafka_*lock() MUST be held. */ -static int rd_kafka_topic_metadata_update( - rd_kafka_topic_t *rkt, - const struct rd_kafka_metadata_topic *mdt, - const rd_kafka_partition_leader_epoch_t *leader_epochs, - rd_ts_t ts_age) { +static int +rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, + const struct rd_kafka_metadata_topic *mdt, + const rd_kafka_metadata_topic_internal_t *mdit, + rd_ts_t ts_age) { rd_kafka_t *rk = rkt->rkt_rk; int upd = 0; int j; @@ -1323,8 +1323,7 @@ static int rd_kafka_topic_metadata_update( for (j = 0; j < mdt->partition_cnt; j++) { int r; rd_kafka_broker_t *leader; - int32_t leader_epoch = - leader_epochs ? 
leader_epochs[j].leader_epoch : -1; + int32_t leader_epoch = mdit->partitions[j].leader_epoch; rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA", " Topic %s partition %i Leader %" PRId32 @@ -1397,7 +1396,7 @@ static int rd_kafka_topic_metadata_update( int rd_kafka_topic_metadata_update2( rd_kafka_broker_t *rkb, const struct rd_kafka_metadata_topic *mdt, - const rd_kafka_partition_leader_epoch_t *leader_epochs) { + const rd_kafka_metadata_topic_internal_t *mdit) { rd_kafka_topic_t *rkt; int r; @@ -1408,7 +1407,7 @@ int rd_kafka_topic_metadata_update2( return -1; /* Ignore topics that we dont have locally. */ } - r = rd_kafka_topic_metadata_update(rkt, mdt, leader_epochs, rd_clock()); + r = rd_kafka_topic_metadata_update(rkt, mdt, mdit, rd_clock()); rd_kafka_wrunlock(rkb->rkb_rk); @@ -1886,9 +1885,12 @@ void rd_kafka_local_topics_to_list(rd_kafka_t *rk, void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt, int partition_cnt, int32_t leader_id) { - struct rd_kafka_metadata_topic mdt = {.topic = + rd_kafka_metadata_partition_internal_t *partitions = + rd_calloc(partition_cnt, sizeof(*partitions)); + struct rd_kafka_metadata_topic mdt = {.topic = (char *)rkt->rkt_topic->str, .partition_cnt = partition_cnt}; + rd_kafka_metadata_topic_internal_t mdit = {.partitions = partitions}; int i; mdt.partitions = rd_alloca(sizeof(*mdt.partitions) * partition_cnt); @@ -1900,7 +1902,8 @@ void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt, } rd_kafka_wrlock(rkt->rkt_rk); - rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, rd_true); - rd_kafka_topic_metadata_update(rkt, &mdt, NULL, rd_clock()); + rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, &mdit, rd_true); + rd_kafka_topic_metadata_update(rkt, &mdt, &mdit, rd_clock()); rd_kafka_wrunlock(rkt->rkt_rk); + rd_free(partitions); } diff --git a/src/rdkafka_topic.h b/src/rdkafka_topic.h index cbed9308a7..bacba6e79a 100644 --- a/src/rdkafka_topic.h +++ b/src/rdkafka_topic.h @@ -258,7 +258,7 @@ rd_kafka_topic_get_error(rd_kafka_topic_t *rkt) { int rd_kafka_topic_metadata_update2( rd_kafka_broker_t *rkb, const struct rd_kafka_metadata_topic *mdt, - const rd_kafka_partition_leader_epoch_t *leader_epochs); + const rd_kafka_metadata_topic_internal_t *mdit); void rd_kafka_topic_scan_all(rd_kafka_t *rk, rd_ts_t now); From b0b804f11d2807274c23cf36d68b8e845f9a447b Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 12 May 2023 12:11:06 +0200 Subject: [PATCH 2/6] Remove rd_kafka_broker_id_rack_pair. 
Replaced by rd_kafka_metadata_internal_t --- src/rdkafka_assignor.c | 37 ++----- src/rdkafka_assignor.h | 27 +---- src/rdkafka_cgrp.c | 30 +++--- src/rdkafka_int.h | 3 +- src/rdkafka_metadata.c | 13 +-- src/rdkafka_metadata.h | 6 +- src/rdkafka_op.c | 8 +- src/rdkafka_op.h | 7 +- src/rdkafka_range_assignor.c | 3 +- src/rdkafka_request.c | 10 +- src/rdkafka_roundrobin_assignor.c | 3 +- src/rdkafka_sticky_assignor.c | 167 ++++++++++++++---------------- 12 files changed, 125 insertions(+), 189 deletions(-) diff --git a/src/rdkafka_assignor.c b/src/rdkafka_assignor.c index f9cf99303f..c5ae374e9f 100644 --- a/src/rdkafka_assignor.c +++ b/src/rdkafka_assignor.c @@ -104,22 +104,6 @@ int rd_kafka_group_member_find_subscription(rd_kafka_t *rk, return 0; } -void rd_kafka_broker_rack_pair_destroy_cnt( - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t cnt) { - size_t i; - for (i = 0; i < cnt; i++) - RD_IF_FREE(broker_rack_pair[i].rack, rd_kafkap_str_destroy); - - rd_free(broker_rack_pair); -} - -int rd_kafka_broker_id_rack_pair_cmp(const void *_a, const void *_b) { - const rd_kafka_broker_id_rack_pair_t *a = _a; - const rd_kafka_broker_id_rack_pair_t *b = _b; - return RD_CMP(a->broker_id, b->broker_id); -} - rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new( const rd_list_t *topics, @@ -332,10 +316,9 @@ rd_kafka_resp_err_t rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, const rd_kafka_assignor_t *rkas, rd_kafka_metadata_t *metadata, + rd_kafka_metadata_internal_t *metadata_internal, rd_kafka_group_member_t *members, int member_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size) { rd_kafka_resp_err_t err; @@ -389,10 +372,10 @@ rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, /* Call assignors assign callback */ err = rkas->rkas_assign_cb( - rkcg->rkcg_rk, rkas, rkcg->rkcg_member_id->str, metadata, members, - member_cnt, (rd_kafka_assignor_topic_t **)eligible_topics.rl_elems, - eligible_topics.rl_cnt, broker_rack_pair, broker_rack_pair_cnt, - errstr, errstr_size, rkas->rkas_opaque); + rkcg->rkcg_rk, rkas, rkcg->rkcg_member_id->str, metadata, + metadata_internal, members, member_cnt, + (rd_kafka_assignor_topic_t **)eligible_topics.rl_elems, + eligible_topics.rl_cnt, errstr, errstr_size, rkas->rkas_opaque); if (err) { rd_kafka_dbg( @@ -511,12 +494,11 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( const struct rd_kafka_assignor_s *rkas, const char *member_id, const rd_kafka_metadata_t *metadata, + const rd_kafka_metadata_internal_t *metadata_internal, rd_kafka_group_member_t *members, size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque), @@ -986,10 +968,9 @@ static int ut_assignors(void) { } /* Run assignor */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, - &metadata, members, - tests[i].member_cnt, NULL, - 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run( + rk->rk_cgrp, rkas, &metadata, NULL, members, + tests[i].member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "Assignor case %s for %s failed: %s", tests[i].name, diff --git a/src/rdkafka_assignor.h b/src/rdkafka_assignor.h index b07bda7878..3df60c4554 100644 --- a/src/rdkafka_assignor.h +++ b/src/rdkafka_assignor.h @@ -80,23 +80,6 @@ int rd_kafka_group_member_find_subscription(rd_kafka_t *rk, const rd_kafka_group_member_t *rkgm, const char *topic); -/** - * Struct to hold information 
about the racks on which brokers are. - */ -typedef struct rd_kafka_broker_id_rack_pair { - int32_t broker_id; - rd_kafkap_str_t *rack; -} rd_kafka_broker_id_rack_pair; - -/** - * Destroys cnt broker_rack_pairs, includng the destruction of the rack. - */ -void rd_kafka_broker_rack_pair_destroy_cnt( - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t cnt); - -int rd_kafka_broker_id_rack_pair_cmp(const void *_a, const void *_b); - /** * Structure to hold metadata for a single topic and all its * subscribing members. @@ -126,12 +109,12 @@ typedef struct rd_kafka_assignor_s { const struct rd_kafka_assignor_s *rkas, const char *member_id, const rd_kafka_metadata_t *metadata, + /* Optional internal metadata structure */ + const rd_kafka_metadata_internal_t *metadata_internal, rd_kafka_group_member_t *members, size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque); @@ -168,12 +151,11 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( const struct rd_kafka_assignor_s *rkas, const char *member_id, const rd_kafka_metadata_t *metadata, + const rd_kafka_metadata_internal_t *metadata_internal, rd_kafka_group_member_t *members, size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque), @@ -217,10 +199,9 @@ rd_kafka_resp_err_t rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg, const rd_kafka_assignor_t *rkas, rd_kafka_metadata_t *metadata, + rd_kafka_metadata_internal_t *metadata_internal, rd_kafka_group_member_t *members, int member_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size); diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index f49af8d765..8ba1848d55 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -1659,15 +1659,12 @@ static void rd_kafka_cgrp_handle_SyncGroup(rd_kafka_t *rk, /** * @brief Run group assignment. 
*/ -static void -rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, - rd_kafka_assignor_t *rkas, - rd_kafka_resp_err_t err, - rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - int member_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt) { +static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, + rd_kafka_assignor_t *rkas, + rd_kafka_resp_err_t err, + rd_kafka_metadata_internal_t *metadata, + rd_kafka_group_member_t *members, + int member_cnt) { char errstr[512]; if (err) { @@ -1680,9 +1677,9 @@ rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, *errstr = '\0'; /* Run assignor */ - err = rd_kafka_assignor_run(rkcg, rkas, metadata, members, member_cnt, - broker_rack_pair, broker_rack_pair_cnt, - errstr, sizeof(errstr)); + err = + rd_kafka_assignor_run(rkcg, rkas, &metadata->metadata, metadata, + members, member_cnt, errstr, sizeof(errstr)); if (err) { if (!*errstr) @@ -1748,11 +1745,10 @@ rd_kafka_cgrp_assignor_handle_Metadata_op(rd_kafka_t *rk, return RD_KAFKA_OP_RES_HANDLED; } - rd_kafka_cgrp_assignor_run( - rkcg, rkcg->rkcg_assignor, rko->rko_err, rko->rko_u.metadata.md, - rkcg->rkcg_group_leader.members, rkcg->rkcg_group_leader.member_cnt, - rko->rko_u.metadata.broker_rack_pair, - rko->rko_u.metadata.broker_rack_pair_cnt); + rd_kafka_cgrp_assignor_run(rkcg, rkcg->rkcg_assignor, rko->rko_err, + rko->rko_u.metadata.mdi, + rkcg->rkcg_group_leader.members, + rkcg->rkcg_group_leader.member_cnt); return RD_KAFKA_OP_RES_HANDLED; } diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index fb0a0d04be..9eba279ca3 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -78,7 +78,8 @@ struct rd_kafka_topic_s; struct rd_kafka_msg_s; struct rd_kafka_broker_s; struct rd_kafka_toppar_s; - +typedef struct rd_kafka_metadata_internal_s rd_kafka_metadata_internal_t; +typedef struct rd_kafka_toppar_s rd_kafka_toppar_t; typedef struct rd_kafka_lwtopic_s rd_kafka_lwtopic_t; diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index 586c19a725..dd7b937511 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -177,8 +177,9 @@ rd_kafka_metadata(rd_kafka_t *rk, /* Reply: pass metadata pointer to application who now owns it*/ rd_kafka_assert(rk, rko->rko_u.metadata.md); - *metadatap = rko->rko_u.metadata.md; - rko->rko_u.metadata.md = NULL; + *metadatap = rko->rko_u.metadata.md; + rko->rko_u.metadata.md = NULL; + rko->rko_u.metadata.mdi = NULL; rd_kafka_op_destroy(rko); return RD_KAFKA_RESP_ERR_NO_ERROR; @@ -348,13 +349,9 @@ rd_bool_t rd_kafka_has_reliable_leader_epochs(rd_kafka_broker_t *rkb) { * * @param topics are the requested topics (may be NULL) * - * The metadata will be marshalled into 'struct rd_kafka_metadata*' structs. + * The metadata will be marshalled into 'rd_kafka_metadata_internal_t *'. * - * The marshalled metadata is returned in \p *mdp, (NULL on error). - * - * Information about the racks-per-broker is returned in \p *broker_rack_pair_p - * if it's not NULL. The count of racks-per-broker is equal to mdp->broker_cnt, - * and the pairs are sorted by broker id. + * The marshalled metadata is returned in \p *mdip, (NULL on error). * * @returns an error code on parse failure, else NO_ERRRO. * diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index 90809630ca..92ac939b4e 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -46,7 +46,8 @@ typedef struct rd_kafka_metadata_partition_internal_s { */ typedef struct rd_kafka_metadata_topic_internal_s { /** Internal metadata partition structs. 
- * same count as metadata.topics[i].partition_cnt. */ + * same count as metadata.topics[i].partition_cnt. + * Sorted by Partition Id. */ rd_kafka_metadata_partition_internal_t *partitions; } rd_kafka_metadata_topic_internal_t; @@ -65,7 +66,8 @@ typedef struct rd_kafka_metadata_internal_s { be kept the first field so the pointer can be cast to *rd_kafka_metadata_internal_t when needed */ - /* Internal metadata brokers. Same count as metadata.broker_cnt. */ + /* Internal metadata brokers. Same count as metadata.broker_cnt. + * Sorted by broker id. */ rd_kafka_metadata_broker_internal_t *brokers; /* Internal metadata topics. Same count as metadata.topic_cnt. */ rd_kafka_metadata_topic_internal_t *topics; diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index a7d3600464..b9ee83c253 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -373,13 +373,9 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { break; case RD_KAFKA_OP_METADATA: - if (rko->rko_u.metadata.broker_rack_pair) { - rd_kafka_broker_rack_pair_destroy_cnt( - rko->rko_u.metadata.broker_rack_pair, - rko->rko_u.metadata.broker_rack_pair_cnt); - } - RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy); + /* It's not needed to free metadata.mdi because they + are the in the same memory allocation. */ break; case RD_KAFKA_OP_LOG: diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 3ff69f526f..d4d0736baf 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -38,7 +38,6 @@ typedef struct rd_kafka_q_s rd_kafka_q_t; typedef struct rd_kafka_toppar_s rd_kafka_toppar_t; typedef struct rd_kafka_op_s rd_kafka_op_t; -typedef struct rd_kafka_broker_id_rack_pair rd_kafka_broker_id_rack_pair_t; /* One-off reply queue + reply version. * All APIs that take a rd_kafka_replyq_t makes a copy of the @@ -371,11 +370,7 @@ struct rd_kafka_op_s { /* RD_KAFKA_OP_METADATA */ struct { rd_kafka_metadata_t *md; - size_t broker_rack_pair_cnt; - rd_kafka_broker_id_rack_pair_t - *broker_rack_pair; /* mapping of broker id -> rack - string as seen in metadata, - sorted by broker id. */ + rd_kafka_metadata_internal_t *mdi; int force; /* force request regardless of outstanding * metadata requests. */ } metadata; diff --git a/src/rdkafka_range_assignor.c b/src/rdkafka_range_assignor.c index 43d86eecf9..8b8c595114 100644 --- a/src/rdkafka_range_assignor.c +++ b/src/rdkafka_range_assignor.c @@ -55,12 +55,11 @@ rd_kafka_resp_err_t rd_kafka_range_assignor_assign_cb( const rd_kafka_assignor_t *rkas, const char *member_id, const rd_kafka_metadata_t *metadata, + const rd_kafka_metadata_internal_t *metadata_internal, rd_kafka_group_member_t *members, size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque) { diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 854ab4c17f..06b2db0779 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2121,8 +2121,9 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk, if (rko && rko->rko_replyq.q) { /* Reply to metadata requester, passing on the metadata. * Reuse requesting rko for the reply. 
*/ - rko->rko_err = err; - rko->rko_u.metadata.md = &mdi->metadata; + rko->rko_err = err; + rko->rko_u.metadata.md = &mdi->metadata; + rko->rko_u.metadata.mdi = mdi; rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0); rko = NULL; } else { @@ -2153,8 +2154,9 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk, rd_kafka_actions2str(actions)); /* Respond back to caller on non-retriable errors */ if (rko && rko->rko_replyq.q) { - rko->rko_err = err; - rko->rko_u.metadata.md = NULL; + rko->rko_err = err; + rko->rko_u.metadata.md = NULL; + rko->rko_u.metadata.mdi = NULL; rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0); rko = NULL; } diff --git a/src/rdkafka_roundrobin_assignor.c b/src/rdkafka_roundrobin_assignor.c index 7ef6d9edd1..1834e8c4d6 100644 --- a/src/rdkafka_roundrobin_assignor.c +++ b/src/rdkafka_roundrobin_assignor.c @@ -54,12 +54,11 @@ rd_kafka_resp_err_t rd_kafka_roundrobin_assignor_assign_cb( const rd_kafka_assignor_t *rkas, const char *member_id, const rd_kafka_metadata_t *metadata, + const rd_kafka_metadata_internal_t *metadata_internal, rd_kafka_group_member_t *members, size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque) { diff --git a/src/rdkafka_sticky_assignor.c b/src/rdkafka_sticky_assignor.c index ce37e58ce9..67cf2e48ef 100644 --- a/src/rdkafka_sticky_assignor.c +++ b/src/rdkafka_sticky_assignor.c @@ -1581,12 +1581,11 @@ rd_kafka_resp_err_t rd_kafka_sticky_assignor_assign_cb( const rd_kafka_assignor_t *rkas, const char *member_id, const rd_kafka_metadata_t *metadata, + const rd_kafka_metadata_internal_t *metadata_internal, rd_kafka_group_member_t *members, size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque) { @@ -2210,8 +2209,8 @@ static int ut_testOneConsumerNoTopic(rd_kafka_t *rk, metadata = rd_kafka_metadata_new_topic_mock(NULL, 0); ut_init_member(&members[0], "consumer1", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2236,8 +2235,8 @@ static int ut_testOneConsumerNonexistentTopic(rd_kafka_t *rk, metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 0); ut_init_member(&members[0], "consumer1", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2263,8 +2262,8 @@ static int ut_testOneConsumerOneTopic(rd_kafka_t *rk, metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 3); ut_init_member(&members[0], "consumer1", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2297,8 +2296,8 @@ static int ut_testOnlyAssignsPartitionsFromSubscribedTopics( 
rd_kafka_metadata_new_topic_mockv(2, "topic1", 3, "topic2", 3); ut_init_member(&members[0], "consumer1", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2326,8 +2325,8 @@ static int ut_testOneConsumerMultipleTopics(rd_kafka_t *rk, rd_kafka_metadata_new_topic_mockv(2, "topic1", 1, "topic2", 2); ut_init_member(&members[0], "consumer1", "topic1", "topic2", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2355,8 +2354,8 @@ ut_testTwoConsumersOneTopicOnePartition(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", NULL); ut_init_member(&members[1], "consumer2", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2386,8 +2385,8 @@ ut_testTwoConsumersOneTopicTwoPartitions(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", NULL); ut_init_member(&members[1], "consumer2", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2420,8 +2419,8 @@ static int ut_testMultipleConsumersMixedTopicSubscriptions( ut_init_member(&members[1], "consumer2", "topic1", "topic2", NULL); ut_init_member(&members[2], "consumer3", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2454,8 +2453,8 @@ ut_testTwoConsumersTwoTopicsSixPartitions(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", "topic2", NULL); ut_init_member(&members[1], "consumer2", "topic1", "topic2", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2485,8 +2484,8 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 3); ut_init_member(&members[0], "consumer1", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, 1, - NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + 1, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyAssignment(&members[0], "topic1", 0, "topic1", 1, "topic1", 2, @@ -2498,8 +2497,8 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, /* Add consumer2 */ ut_init_member(&members[1], 
"consumer2", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2512,8 +2511,8 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, /* Remove consumer1 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[1], 1, - NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, + &members[1], 1, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyAssignment(&members[1], "topic1", 0, "topic1", 1, "topic1", 2, @@ -2571,8 +2570,8 @@ ut_testPoorRoundRobinAssignmentScenario(rd_kafka_t *rk, ut_init_member(&members[3], "consumer4", "topic1", "topic2", "topic3", "topic4", "topic5", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2606,8 +2605,8 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", "topic2", NULL); ut_init_member(&members[1], "consumer2", "topic1", "topic2", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2625,8 +2624,8 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, metadata = rd_kafka_metadata_new_topic_mockv(2, "topic1", 3, "topic2", 3); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2647,8 +2646,8 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, rd_kafka_metadata_destroy(metadata); metadata = rd_kafka_metadata_new_topic_mockv(1, "topic2", 3); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2707,9 +2706,8 @@ ut_testReassignmentAfterOneConsumerLeaves(rd_kafka_t *rk, members[i - 1].rkgm_subscription = subscription; } - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2723,9 +2721,8 @@ ut_testReassignmentAfterOneConsumerLeaves(rd_kafka_t *rk, sizeof(*members) * (member_cnt - 10)); member_cnt--; - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); 
verifyValidityAndBalance(members, member_cnt, metadata); @@ -2765,9 +2762,8 @@ ut_testReassignmentAfterOneConsumerAdded(rd_kafka_t *rk, } member_cnt--; /* Skip one consumer */ - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2778,9 +2774,8 @@ ut_testReassignmentAfterOneConsumerAdded(rd_kafka_t *rk, */ member_cnt++; - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2828,9 +2823,8 @@ static int ut_testSameSubscriptions(rd_kafka_t *rk, rd_kafka_topic_partition_list_copy(subscription); } - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2842,9 +2836,8 @@ static int ut_testSameSubscriptions(rd_kafka_t *rk, memmove(&members[5], &members[6], sizeof(*members) * (member_cnt - 6)); member_cnt--; - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2902,9 +2895,8 @@ static int ut_testLargeAssignmentWithMultipleConsumersLeaving( members[i].rkgm_subscription = subscription; } - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2919,9 +2911,8 @@ static int ut_testLargeAssignmentWithMultipleConsumersLeaving( member_cnt--; } - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2965,8 +2956,8 @@ static int ut_testNewSubscription(rd_kafka_t *rk, metadata->topics[j].topic, RD_KAFKA_PARTITION_UA); } - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2980,8 +2971,8 @@ static int ut_testNewSubscription(rd_kafka_t *rk, rd_kafka_topic_partition_list_add(members[0].rkgm_subscription, "topic1", RD_KAFKA_PARTITION_UA); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = 
rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3015,9 +3006,8 @@ static int ut_testMoveExistingAssignments(rd_kafka_t *rk, ut_init_member(&members[2], "consumer3", "topic1", NULL); ut_init_member(&members[3], "consumer4", "topic1", NULL); - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -3037,8 +3027,8 @@ static int ut_testMoveExistingAssignments(rd_kafka_t *rk, /* * Remove potential group leader consumer1 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[1], - member_cnt - 1, NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, + &members[1], member_cnt - 1, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3122,9 +3112,8 @@ static int ut_testStickiness(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { 0); - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); @@ -3156,8 +3145,8 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { ut_init_member(&members[2], "consumer3", "topic1", NULL); /* Just consumer1 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, 1, - NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + 1, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, 1, metadata); @@ -3166,8 +3155,8 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { "topic1", 3, "topic1", 4, "topic1", 5, NULL); /* consumer1 and consumer2 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, 2, - NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + 2, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, 2, metadata); @@ -3180,9 +3169,8 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { /* Run it twice, should be stable. 
*/ for (i = 0; i < 2; i++) { /* consumer1, consumer2, and consumer3 */ - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - 3, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, + members, 3, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, 3, metadata); @@ -3193,8 +3181,8 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { } /* Remove consumer1 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[1], 2, - NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, + &members[1], 2, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(&members[1], 2, metadata); @@ -3205,8 +3193,8 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { NULL); /* Remove consumer2 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[2], 1, - NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, + &members[2], 1, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(&members[2], 1, metadata); @@ -3235,8 +3223,8 @@ ut_testAssignmentUpdatedForDeletedTopic(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", "topic2", "topic3", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3267,8 +3255,8 @@ static int ut_testNoExceptionThrownWhenOnlySubscribedTopicDeleted( ut_init_member(&members[0], "consumer1", "topic", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3281,8 +3269,8 @@ static int ut_testNoExceptionThrownWhenOnlySubscribedTopicDeleted( rd_kafka_metadata_destroy(metadata); metadata = rd_kafka_metadata_new_topic_mock(NULL, 0); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3329,9 +3317,8 @@ ut_testConflictingPreviousAssignments(rd_kafka_t *rk, 1); - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); RD_UT_ASSERT(members[0].rkgm_assignment->cnt == 1 && From 5d117de70585af7c6412ef0f0fd2e847b9e9f4b7 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 12 May 2023 13:02:44 +0200 Subject: [PATCH 3/6] Style fix, documentation, remove internal accessors. 
--- src/rdkafka_aux.h | 6 ++++-- src/rdkafka_metadata.c | 48 ++---------------------------------------- 2 files changed, 6 insertions(+), 48 deletions(-) diff --git a/src/rdkafka_aux.h b/src/rdkafka_aux.h index c78f51f2cf..7d5339bd73 100644 --- a/src/rdkafka_aux.h +++ b/src/rdkafka_aux.h @@ -110,8 +110,10 @@ typedef struct rd_kafka_Node_s { char *rack_id; /*< (optional) Node rack id */ } rd_kafka_Node_t; -rd_kafka_Node_t * -rd_kafka_Node_new(int32_t id, const char *host, uint16_t port, const char *rack_id); +rd_kafka_Node_t *rd_kafka_Node_new(int32_t id, + const char *host, + uint16_t port, + const char *rack_id); rd_kafka_Node_t *rd_kafka_Node_copy(const rd_kafka_Node_t *src); diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index dd7b937511..556764f6ac 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -39,11 +39,7 @@ #include /** - * @brief TODO: write - * - * @param _a - * @param _b - * @return int + * @brief Id comparator for rd_kafka_metadata_broker_internal_t */ static int rd_kafka_metadata_broker_internal_cmp(const void *_a, const void *_b) { @@ -53,11 +49,7 @@ static int rd_kafka_metadata_broker_internal_cmp(const void *_a, } /** - * @brief TODO: write - * - * @param _a - * @param _b - * @return int + * @brief Id comparator for rd_kafka_metadata_partition_internal_t */ static int rd_kafka_metadata_partition_internal_cmp(const void *_a, const void *_b) { @@ -66,42 +58,6 @@ static int rd_kafka_metadata_partition_internal_cmp(const void *_a, return RD_CMP(a->id, b->id); } -/** - * @brief TODO: write - * - * @param metadata - * @return const rd_kafka_metadata_t* - */ -const rd_kafka_metadata_t * -rd_kafka_metadata_internal_metadata(rd_kafka_metadata_internal_t *mdi) { - return &mdi->metadata; -} - -/** - * @brief TODO: write - * - * @param metadata - * @param i - * @return const rd_kafka_metadata_topic_internal_t* - */ -const rd_kafka_metadata_topic_internal_t * -rd_kafka_metadata_internal_topic(rd_kafka_metadata_internal_t *metadata, - int i) { - return &metadata->topics[i]; -} - -/** - * @brief TODO: write - * - * @param metadata - * @param leader_epoch - */ -void rd_kafka_metadata_internal_partition_set_leader_epoch( - rd_kafka_metadata_partition_internal_t *metadata_partition, - int32_t leader_epoch) { - metadata_partition->leader_epoch = leader_epoch; -} - rd_kafka_resp_err_t rd_kafka_metadata(rd_kafka_t *rk, From 7f16994696279fe44ee63151c8f82e90971556e8 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 15 May 2023 14:05:23 +0200 Subject: [PATCH 4/6] Remove internal struct from assignor cb. Add internal accessor function for that. 
--- src/rdkafka_assignor.c | 24 +++---- src/rdkafka_assignor.h | 19 ++--- src/rdkafka_cgrp.c | 5 +- src/rdkafka_int.h | 2 +- src/rdkafka_metadata.c | 39 ++++++++-- src/rdkafka_metadata.h | 8 +++ src/rdkafka_range_assignor.c | 25 ++++--- src/rdkafka_roundrobin_assignor.c | 1 - src/rdkafka_sticky_assignor.c | 116 +++++++++++++++--------------- 9 files changed, 130 insertions(+), 109 deletions(-) diff --git a/src/rdkafka_assignor.c b/src/rdkafka_assignor.c index c5ae374e9f..4f8d35ac64 100644 --- a/src/rdkafka_assignor.c +++ b/src/rdkafka_assignor.c @@ -312,15 +312,13 @@ rd_kafka_member_subscriptions_map(rd_kafka_cgrp_t *rkcg, } -rd_kafka_resp_err_t -rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, - const rd_kafka_assignor_t *rkas, - rd_kafka_metadata_t *metadata, - rd_kafka_metadata_internal_t *metadata_internal, - rd_kafka_group_member_t *members, - int member_cnt, - char *errstr, - size_t errstr_size) { +rd_kafka_resp_err_t rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, + const rd_kafka_assignor_t *rkas, + rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + int member_cnt, + char *errstr, + size_t errstr_size) { rd_kafka_resp_err_t err; rd_ts_t ts_start = rd_clock(); int i; @@ -372,9 +370,8 @@ rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, /* Call assignors assign callback */ err = rkas->rkas_assign_cb( - rkcg->rkcg_rk, rkas, rkcg->rkcg_member_id->str, metadata, - metadata_internal, members, member_cnt, - (rd_kafka_assignor_topic_t **)eligible_topics.rl_elems, + rkcg->rkcg_rk, rkas, rkcg->rkcg_member_id->str, metadata, members, + member_cnt, (rd_kafka_assignor_topic_t **)eligible_topics.rl_elems, eligible_topics.rl_cnt, errstr, errstr_size, rkas->rkas_opaque); if (err) { @@ -494,7 +491,6 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( const struct rd_kafka_assignor_s *rkas, const char *member_id, const rd_kafka_metadata_t *metadata, - const rd_kafka_metadata_internal_t *metadata_internal, rd_kafka_group_member_t *members, size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, @@ -969,7 +965,7 @@ static int ut_assignors(void) { /* Run assignor */ err = rd_kafka_assignor_run( - rk->rk_cgrp, rkas, &metadata, NULL, members, + rk->rk_cgrp, rkas, &metadata, members, tests[i].member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "Assignor case %s for %s failed: %s", diff --git a/src/rdkafka_assignor.h b/src/rdkafka_assignor.h index 3df60c4554..12d5fc8313 100644 --- a/src/rdkafka_assignor.h +++ b/src/rdkafka_assignor.h @@ -109,8 +109,6 @@ typedef struct rd_kafka_assignor_s { const struct rd_kafka_assignor_s *rkas, const char *member_id, const rd_kafka_metadata_t *metadata, - /* Optional internal metadata structure */ - const rd_kafka_metadata_internal_t *metadata_internal, rd_kafka_group_member_t *members, size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, @@ -151,7 +149,6 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( const struct rd_kafka_assignor_s *rkas, const char *member_id, const rd_kafka_metadata_t *metadata, - const rd_kafka_metadata_internal_t *metadata_internal, rd_kafka_group_member_t *members, size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, @@ -195,15 +192,13 @@ void rd_kafka_assignor_update_subscription( const rd_kafka_topic_partition_list_t *subscription); -rd_kafka_resp_err_t -rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg, - const rd_kafka_assignor_t *rkas, - rd_kafka_metadata_t *metadata, - rd_kafka_metadata_internal_t *metadata_internal, - rd_kafka_group_member_t *members, - int member_cnt, - char *errstr, - size_t errstr_size); 
+rd_kafka_resp_err_t rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg, + const rd_kafka_assignor_t *rkas, + rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + int member_cnt, + char *errstr, + size_t errstr_size); rd_kafka_assignor_t *rd_kafka_assignor_find(rd_kafka_t *rk, const char *protocol); diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 8ba1848d55..d3314e4abb 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -1677,9 +1677,8 @@ static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, *errstr = '\0'; /* Run assignor */ - err = - rd_kafka_assignor_run(rkcg, rkas, &metadata->metadata, metadata, - members, member_cnt, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rkcg, rkas, &metadata->metadata, members, + member_cnt, errstr, sizeof(errstr)); if (err) { if (!*errstr) diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index 9eba279ca3..6da9ecd52b 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -353,7 +353,7 @@ struct rd_kafka_s { rd_kafka_metadata_internal_t *rk_full_metadata; /* Last full metadata. */ - rd_ts_t rk_ts_full_metadata; /* Timesstamp of .. */ + rd_ts_t rk_ts_full_metadata; /* Timestamp of .. */ struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */ char *rk_clusterid; /* ClusterId from metadata */ diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index 556764f6ac..be1e7f7629 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -242,6 +242,20 @@ rd_kafka_metadata_copy(const rd_kafka_metadata_internal_t *src_internal, } +/** + * @brief Returns the internal metadata type corresponding to the + * public one. + * + * @param md Public metadata type pointer. + * @return The internal metadata type. + */ +const rd_kafka_metadata_internal_t * +rd_kafka_metadata_get_internal(const rd_kafka_metadata_t *md) { + /* Implementation is just a cast because the public one + * is at the beginning of the internal one. */ + return (const rd_kafka_metadata_internal_t *)md; +} + /** * @brief Update topic state and information based on topic metadata. 
* @@ -1410,6 +1424,7 @@ void rd_kafka_metadata_fast_leader_query(rd_kafka_t *rk) { rd_kafka_metadata_t * rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics, size_t topic_cnt) { + rd_kafka_metadata_internal_t *mdi; rd_kafka_metadata_t *md; rd_tmpabuf_t tbuf; size_t topic_names_size = 0; @@ -1428,17 +1443,22 @@ rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics, * needed by the final metadata_t object */ rd_tmpabuf_new( &tbuf, - sizeof(*md) + (sizeof(*md->topics) * topic_cnt) + topic_names_size + - (64 /*topic name size..*/ * topic_cnt) + - (sizeof(*md->topics[0].partitions) * total_partition_cnt), + sizeof(*mdi) + (sizeof(*md->topics) * topic_cnt) + + topic_names_size + (64 /*topic name size..*/ * topic_cnt) + + (sizeof(*md->topics[0].partitions) * total_partition_cnt) + + (sizeof(*mdi->topics) * topic_cnt) + + (sizeof(*mdi->topics[0].partitions) * total_partition_cnt), 1 /*assert on fail*/); - md = rd_tmpabuf_alloc(&tbuf, sizeof(*md)); - memset(md, 0, sizeof(*md)); + mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi)); + memset(mdi, 0, sizeof(*mdi)); + md = &mdi->metadata; md->topic_cnt = (int)topic_cnt; md->topics = rd_tmpabuf_alloc(&tbuf, md->topic_cnt * sizeof(*md->topics)); + mdi->topics = + rd_tmpabuf_alloc(&tbuf, md->topic_cnt * sizeof(*mdi->topics)); for (i = 0; i < (size_t)md->topic_cnt; i++) { int j; @@ -1451,11 +1471,18 @@ rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics, md->topics[i].partitions = rd_tmpabuf_alloc( &tbuf, md->topics[i].partition_cnt * sizeof(*md->topics[i].partitions)); + mdi->topics[i].partitions = rd_tmpabuf_alloc( + &tbuf, md->topics[i].partition_cnt * + sizeof(*mdi->topics[i].partitions)); for (j = 0; j < md->topics[i].partition_cnt; j++) { memset(&md->topics[i].partitions[j], 0, sizeof(md->topics[i].partitions[j])); - md->topics[i].partitions[j].id = j; + memset(&mdi->topics[i].partitions[j], 0, + sizeof(mdi->topics[i].partitions[j])); + md->topics[i].partitions[j].id = j; + mdi->topics[i].partitions[j].id = j; + mdi->topics[i].partitions[j].leader_epoch = -1; } } diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index 92ac939b4e..7d19d23148 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -52,8 +52,13 @@ typedef struct rd_kafka_metadata_topic_internal_s { } rd_kafka_metadata_topic_internal_t; +/** + * @brief Metadata broker internal container + */ typedef struct rd_kafka_metadata_broker_internal_s { + /** Broker Id. */ int32_t id; + /** Rack Id (optional). 
*/ char *rack_id; } rd_kafka_metadata_broker_internal_t; @@ -84,6 +89,9 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_kafka_metadata_internal_t * rd_kafka_metadata_copy(const rd_kafka_metadata_internal_t *mdi, size_t size); +const rd_kafka_metadata_internal_t * +rd_kafka_metadata_get_internal(const rd_kafka_metadata_t *md); + size_t rd_kafka_metadata_topic_match(rd_kafka_t *rk, rd_list_t *tinfos, diff --git a/src/rdkafka_range_assignor.c b/src/rdkafka_range_assignor.c index 8b8c595114..c83f1f1a44 100644 --- a/src/rdkafka_range_assignor.c +++ b/src/rdkafka_range_assignor.c @@ -50,19 +50,18 @@ * C1: [t0p2, t1p2] */ -rd_kafka_resp_err_t rd_kafka_range_assignor_assign_cb( - rd_kafka_t *rk, - const rd_kafka_assignor_t *rkas, - const char *member_id, - const rd_kafka_metadata_t *metadata, - const rd_kafka_metadata_internal_t *metadata_internal, - rd_kafka_group_member_t *members, - size_t member_cnt, - rd_kafka_assignor_topic_t **eligible_topics, - size_t eligible_topic_cnt, - char *errstr, - size_t errstr_size, - void *opaque) { +rd_kafka_resp_err_t +rd_kafka_range_assignor_assign_cb(rd_kafka_t *rk, + const rd_kafka_assignor_t *rkas, + const char *member_id, + const rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + size_t member_cnt, + rd_kafka_assignor_topic_t **eligible_topics, + size_t eligible_topic_cnt, + char *errstr, + size_t errstr_size, + void *opaque) { unsigned int ti; int i; diff --git a/src/rdkafka_roundrobin_assignor.c b/src/rdkafka_roundrobin_assignor.c index 1834e8c4d6..6cb9193645 100644 --- a/src/rdkafka_roundrobin_assignor.c +++ b/src/rdkafka_roundrobin_assignor.c @@ -54,7 +54,6 @@ rd_kafka_resp_err_t rd_kafka_roundrobin_assignor_assign_cb( const rd_kafka_assignor_t *rkas, const char *member_id, const rd_kafka_metadata_t *metadata, - const rd_kafka_metadata_internal_t *metadata_internal, rd_kafka_group_member_t *members, size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, diff --git a/src/rdkafka_sticky_assignor.c b/src/rdkafka_sticky_assignor.c index 67cf2e48ef..922cf49711 100644 --- a/src/rdkafka_sticky_assignor.c +++ b/src/rdkafka_sticky_assignor.c @@ -1576,19 +1576,18 @@ static void assignToMembers(map_str_toppar_list_t *currentAssignment, * * This code is closely mimicking the AK Java AbstractStickyAssignor.assign(). 
*/ -rd_kafka_resp_err_t rd_kafka_sticky_assignor_assign_cb( - rd_kafka_t *rk, - const rd_kafka_assignor_t *rkas, - const char *member_id, - const rd_kafka_metadata_t *metadata, - const rd_kafka_metadata_internal_t *metadata_internal, - rd_kafka_group_member_t *members, - size_t member_cnt, - rd_kafka_assignor_topic_t **eligible_topics, - size_t eligible_topic_cnt, - char *errstr, - size_t errstr_size, - void *opaque) { +rd_kafka_resp_err_t +rd_kafka_sticky_assignor_assign_cb(rd_kafka_t *rk, + const rd_kafka_assignor_t *rkas, + const char *member_id, + const rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + size_t member_cnt, + rd_kafka_assignor_topic_t **eligible_topics, + size_t eligible_topic_cnt, + char *errstr, + size_t errstr_size, + void *opaque) { /* FIXME: Let the cgrp pass the actual eligible partition count */ size_t partition_cnt = member_cnt * 10; /* FIXME */ @@ -2209,7 +2208,7 @@ static int ut_testOneConsumerNoTopic(rd_kafka_t *rk, metadata = rd_kafka_metadata_new_topic_mock(NULL, 0); ut_init_member(&members[0], "consumer1", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2235,7 +2234,7 @@ static int ut_testOneConsumerNonexistentTopic(rd_kafka_t *rk, metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 0); ut_init_member(&members[0], "consumer1", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2262,7 +2261,7 @@ static int ut_testOneConsumerOneTopic(rd_kafka_t *rk, metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 3); ut_init_member(&members[0], "consumer1", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2296,7 +2295,7 @@ static int ut_testOnlyAssignsPartitionsFromSubscribedTopics( rd_kafka_metadata_new_topic_mockv(2, "topic1", 3, "topic2", 3); ut_init_member(&members[0], "consumer1", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2325,7 +2324,7 @@ static int ut_testOneConsumerMultipleTopics(rd_kafka_t *rk, rd_kafka_metadata_new_topic_mockv(2, "topic1", 1, "topic2", 2); ut_init_member(&members[0], "consumer1", "topic1", "topic2", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2354,7 +2353,7 @@ ut_testTwoConsumersOneTopicOnePartition(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", NULL); ut_init_member(&members[1], "consumer2", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run 
failed: %s", errstr); @@ -2385,7 +2384,7 @@ ut_testTwoConsumersOneTopicTwoPartitions(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", NULL); ut_init_member(&members[1], "consumer2", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2419,7 +2418,7 @@ static int ut_testMultipleConsumersMixedTopicSubscriptions( ut_init_member(&members[1], "consumer2", "topic1", "topic2", NULL); ut_init_member(&members[2], "consumer3", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2453,7 +2452,7 @@ ut_testTwoConsumersTwoTopicsSixPartitions(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", "topic2", NULL); ut_init_member(&members[1], "consumer2", "topic1", "topic2", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2484,8 +2483,8 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 3); ut_init_member(&members[0], "consumer1", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, - 1, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, 1, + errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyAssignment(&members[0], "topic1", 0, "topic1", 1, "topic1", 2, @@ -2497,7 +2496,7 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, /* Add consumer2 */ ut_init_member(&members[1], "consumer2", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2511,8 +2510,8 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, /* Remove consumer1 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, - &members[1], 1, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[1], 1, + errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyAssignment(&members[1], "topic1", 0, "topic1", 1, "topic1", 2, @@ -2570,7 +2569,7 @@ ut_testPoorRoundRobinAssignmentScenario(rd_kafka_t *rk, ut_init_member(&members[3], "consumer4", "topic1", "topic2", "topic3", "topic4", "topic5", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2605,7 +2604,7 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", "topic2", NULL); ut_init_member(&members[1], "consumer2", "topic1", "topic2", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), 
errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2624,7 +2623,7 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, metadata = rd_kafka_metadata_new_topic_mockv(2, "topic1", 3, "topic2", 3); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2646,7 +2645,7 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, rd_kafka_metadata_destroy(metadata); metadata = rd_kafka_metadata_new_topic_mockv(1, "topic2", 3); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2706,7 +2705,7 @@ ut_testReassignmentAfterOneConsumerLeaves(rd_kafka_t *rk, members[i - 1].rkgm_subscription = subscription; } - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2721,7 +2720,7 @@ ut_testReassignmentAfterOneConsumerLeaves(rd_kafka_t *rk, sizeof(*members) * (member_cnt - 10)); member_cnt--; - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2762,7 +2761,7 @@ ut_testReassignmentAfterOneConsumerAdded(rd_kafka_t *rk, } member_cnt--; /* Skip one consumer */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2774,7 +2773,7 @@ ut_testReassignmentAfterOneConsumerAdded(rd_kafka_t *rk, */ member_cnt++; - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2823,7 +2822,7 @@ static int ut_testSameSubscriptions(rd_kafka_t *rk, rd_kafka_topic_partition_list_copy(subscription); } - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2836,7 +2835,7 @@ static int ut_testSameSubscriptions(rd_kafka_t *rk, memmove(&members[5], &members[6], sizeof(*members) * (member_cnt - 6)); member_cnt--; - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2895,7 +2894,7 @@ static int ut_testLargeAssignmentWithMultipleConsumersLeaving( members[i].rkgm_subscription = subscription; } - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2911,7 +2910,7 @@ static int ut_testLargeAssignmentWithMultipleConsumersLeaving( member_cnt--; } - err = 
rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2956,7 +2955,7 @@ static int ut_testNewSubscription(rd_kafka_t *rk, metadata->topics[j].topic, RD_KAFKA_PARTITION_UA); } - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2971,7 +2970,7 @@ static int ut_testNewSubscription(rd_kafka_t *rk, rd_kafka_topic_partition_list_add(members[0].rkgm_subscription, "topic1", RD_KAFKA_PARTITION_UA); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3006,7 +3005,7 @@ static int ut_testMoveExistingAssignments(rd_kafka_t *rk, ut_init_member(&members[2], "consumer3", "topic1", NULL); ut_init_member(&members[3], "consumer4", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3027,9 +3026,8 @@ static int ut_testMoveExistingAssignments(rd_kafka_t *rk, /* * Remove potential group leader consumer1 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, - &members[1], member_cnt - 1, errstr, - sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[1], + member_cnt - 1, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(&members[1], member_cnt - 1, metadata); @@ -3112,7 +3110,7 @@ static int ut_testStickiness(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { 0); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3145,8 +3143,8 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { ut_init_member(&members[2], "consumer3", "topic1", NULL); /* Just consumer1 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, - 1, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, 1, + errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, 1, metadata); @@ -3155,8 +3153,8 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { "topic1", 3, "topic1", 4, "topic1", 5, NULL); /* consumer1 and consumer2 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, - 2, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, 2, + errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, 2, metadata); @@ -3169,7 +3167,7 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { /* Run it twice, should be stable. 
*/ for (i = 0; i < 2; i++) { /* consumer1, consumer2, and consumer3 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, 3, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3181,8 +3179,8 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { } /* Remove consumer1 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, - &members[1], 2, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[1], 2, + errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(&members[1], 2, metadata); @@ -3193,8 +3191,8 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { NULL); /* Remove consumer2 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, - &members[2], 1, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[2], 1, + errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(&members[2], 1, metadata); @@ -3223,7 +3221,7 @@ ut_testAssignmentUpdatedForDeletedTopic(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", "topic2", "topic3", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3255,7 +3253,7 @@ static int ut_testNoExceptionThrownWhenOnlySubscribedTopicDeleted( ut_init_member(&members[0], "consumer1", "topic", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3269,7 +3267,7 @@ static int ut_testNoExceptionThrownWhenOnlySubscribedTopicDeleted( rd_kafka_metadata_destroy(metadata); metadata = rd_kafka_metadata_new_topic_mock(NULL, 0); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3317,7 +3315,7 @@ ut_testConflictingPreviousAssignments(rd_kafka_t *rk, 1); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, NULL, members, + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); From 9a0c38e30e5387f809065a2d98984851161335e2 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 15 May 2023 14:33:54 +0200 Subject: [PATCH 5/6] Use a define instead of a function call for rd_kafka_metadata_get_internal --- src/rdkafka_metadata.c | 15 --------------- src/rdkafka_metadata.h | 9 ++++++--- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index be1e7f7629..cb363aa23b 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -241,21 +241,6 @@ rd_kafka_metadata_copy(const rd_kafka_metadata_internal_t *src_internal, return mdi; } - -/** - * @brief Returns the internal metadata type corresponding to the - * public one. - * - * @param md Public metadata type pointer. - * @return The internal metadata type. 
- */ -const rd_kafka_metadata_internal_t * -rd_kafka_metadata_get_internal(const rd_kafka_metadata_t *md) { - /* Implementation is just a cast because the public one - * is at the beginning of the internal one. */ - return (const rd_kafka_metadata_internal_t *)md; -} - /** * @brief Update topic state and information based on topic metadata. * diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index 7d19d23148..134f21643f 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -78,6 +78,12 @@ typedef struct rd_kafka_metadata_internal_s { rd_kafka_metadata_topic_internal_t *topics; } rd_kafka_metadata_internal_t; +/** + * @brief The internal metadata type corresponding to the + * public one. + */ +#define rd_kafka_metadata_get_internal(md) \ + (const rd_kafka_metadata_internal_t *)md rd_bool_t rd_kafka_has_reliable_leader_epochs(rd_kafka_broker_t *rkb); @@ -89,9 +95,6 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_kafka_metadata_internal_t * rd_kafka_metadata_copy(const rd_kafka_metadata_internal_t *mdi, size_t size); -const rd_kafka_metadata_internal_t * -rd_kafka_metadata_get_internal(const rd_kafka_metadata_t *md); - size_t rd_kafka_metadata_topic_match(rd_kafka_t *rk, rd_list_t *tinfos, From c01d75755cf72941cbc88d31c693c8b787264d63 Mon Sep 17 00:00:00 2001 From: Milind L Date: Mon, 15 May 2023 21:04:56 +0530 Subject: [PATCH 6/6] Make macro parenthesized --- src/rdkafka_metadata.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index 134f21643f..f4004f9d56 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -83,7 +83,7 @@ typedef struct rd_kafka_metadata_internal_s { * public one. */ #define rd_kafka_metadata_get_internal(md) \ - (const rd_kafka_metadata_internal_t *)md + ((const rd_kafka_metadata_internal_t *)md) rd_bool_t rd_kafka_has_reliable_leader_epochs(rd_kafka_broker_t *rkb);
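
A note on the rationale behind PATCH 5 and PATCH 6 (illustration only, not part of the upstream series): the accessor can be reduced to a plain cast because the public rd_kafka_metadata_t is the first member of rd_kafka_metadata_internal_t, and the extra parentheses added in PATCH 6 keep that cast correct when the macro result is used inside a larger expression. The sketch below uses hypothetical stand-in structs (pub_md_t, int_md_t), not the real librdkafka types, to show the difference; in C the -> operator binds tighter than a cast, so an unparenthesized cast macro ends up applying the cast to the wrong subexpression.

    /* Minimal sketch with simplified stand-in types; the real definitions
     * live in rdkafka_metadata.h. */
    #include <stdio.h>

    typedef struct pub_md_s {
            int topic_cnt;
    } pub_md_t;

    typedef struct int_md_s {
            pub_md_t metadata;  /* public struct is the first member */
            int internal_field; /* internal-only data */
    } int_md_t;

    /* Unparenthesized, as introduced in PATCH 5: the cast binds to
     * md->member, not to md. */
    #define get_internal_bad(md)  (const int_md_t *)md
    /* Parenthesized, as fixed in PATCH 6: the cast binds to md itself. */
    #define get_internal_good(md) ((const int_md_t *)md)

    int main(void) {
            int_md_t mdi = {.metadata = {.topic_cnt = 3}, .internal_field = 42};
            const pub_md_t *md = &mdi.metadata;

            /* Expands to ((const int_md_t *)md)->internal_field: prints 42. */
            printf("%d\n", get_internal_good(md)->internal_field);

            /* get_internal_bad(md)->internal_field would expand to
             * (const int_md_t *)md->internal_field, i.e. the cast is applied
             * to md->internal_field, which does not exist on pub_md_t, so the
             * expression fails to compile. */
            return 0;
    }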