diff --git a/src/rdkafka.c b/src/rdkafka.c index 33147ccd4f..2a5e040b68 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -990,7 +990,7 @@ void rd_kafka_destroy_final(rd_kafka_t *rk) { mtx_destroy(&rk->rk_init_lock); if (rk->rk_full_metadata) - rd_kafka_metadata_destroy(rk->rk_full_metadata); + rd_kafka_metadata_destroy(&rk->rk_full_metadata->metadata); rd_kafkap_str_destroy(rk->rk_client_id); rd_kafkap_str_destroy(rk->rk_group_id); rd_kafkap_str_destroy(rk->rk_eos.transactional_id); diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 6aaec636d5..2226899477 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -6349,7 +6349,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, char *errstr, size_t errstr_size) { const int log_decode_errors = LOG_ERR; - int nodeid; + int32_t nodeid; uint16_t port; int16_t api_version; int32_t cnt; diff --git a/src/rdkafka_assignor.c b/src/rdkafka_assignor.c index f9cf99303f..4f8d35ac64 100644 --- a/src/rdkafka_assignor.c +++ b/src/rdkafka_assignor.c @@ -104,22 +104,6 @@ int rd_kafka_group_member_find_subscription(rd_kafka_t *rk, return 0; } -void rd_kafka_broker_rack_pair_destroy_cnt( - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t cnt) { - size_t i; - for (i = 0; i < cnt; i++) - RD_IF_FREE(broker_rack_pair[i].rack, rd_kafkap_str_destroy); - - rd_free(broker_rack_pair); -} - -int rd_kafka_broker_id_rack_pair_cmp(const void *_a, const void *_b) { - const rd_kafka_broker_id_rack_pair_t *a = _a; - const rd_kafka_broker_id_rack_pair_t *b = _b; - return RD_CMP(a->broker_id, b->broker_id); -} - rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new( const rd_list_t *topics, @@ -328,16 +312,13 @@ rd_kafka_member_subscriptions_map(rd_kafka_cgrp_t *rkcg, } -rd_kafka_resp_err_t -rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, - const rd_kafka_assignor_t *rkas, - rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - int member_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, - char *errstr, - size_t errstr_size) { +rd_kafka_resp_err_t rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, + const rd_kafka_assignor_t *rkas, + rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + int member_cnt, + char *errstr, + size_t errstr_size) { rd_kafka_resp_err_t err; rd_ts_t ts_start = rd_clock(); int i; @@ -391,8 +372,7 @@ rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, err = rkas->rkas_assign_cb( rkcg->rkcg_rk, rkas, rkcg->rkcg_member_id->str, metadata, members, member_cnt, (rd_kafka_assignor_topic_t **)eligible_topics.rl_elems, - eligible_topics.rl_cnt, broker_rack_pair, broker_rack_pair_cnt, - errstr, errstr_size, rkas->rkas_opaque); + eligible_topics.rl_cnt, errstr, errstr_size, rkas->rkas_opaque); if (err) { rd_kafka_dbg( @@ -515,8 +495,6 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque), @@ -986,10 +964,9 @@ static int ut_assignors(void) { } /* Run assignor */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, - &metadata, members, - tests[i].member_cnt, NULL, - 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run( + rk->rk_cgrp, rkas, &metadata, members, + tests[i].member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "Assignor case %s for %s failed: %s", tests[i].name, diff --git a/src/rdkafka_assignor.h b/src/rdkafka_assignor.h index b07bda7878..12d5fc8313 100644 --- a/src/rdkafka_assignor.h +++ b/src/rdkafka_assignor.h @@ -80,23 +80,6 @@ int rd_kafka_group_member_find_subscription(rd_kafka_t *rk, const rd_kafka_group_member_t *rkgm, const char *topic); -/** - * Struct to hold information about the racks on which brokers are. - */ -typedef struct rd_kafka_broker_id_rack_pair { - int32_t broker_id; - rd_kafkap_str_t *rack; -} rd_kafka_broker_id_rack_pair; - -/** - * Destroys cnt broker_rack_pairs, includng the destruction of the rack. - */ -void rd_kafka_broker_rack_pair_destroy_cnt( - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t cnt); - -int rd_kafka_broker_id_rack_pair_cmp(const void *_a, const void *_b); - /** * Structure to hold metadata for a single topic and all its * subscribing members. @@ -130,8 +113,6 @@ typedef struct rd_kafka_assignor_s { size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque); @@ -172,8 +153,6 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque), @@ -213,16 +192,13 @@ void rd_kafka_assignor_update_subscription( const rd_kafka_topic_partition_list_t *subscription); -rd_kafka_resp_err_t -rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg, - const rd_kafka_assignor_t *rkas, - rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - int member_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, - char *errstr, - size_t errstr_size); +rd_kafka_resp_err_t rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg, + const rd_kafka_assignor_t *rkas, + rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + int member_cnt, + char *errstr, + size_t errstr_size); rd_kafka_assignor_t *rd_kafka_assignor_find(rd_kafka_t *rk, const char *protocol); diff --git a/src/rdkafka_aux.c b/src/rdkafka_aux.c index 753f03d678..da565d1594 100644 --- a/src/rdkafka_aux.c +++ b/src/rdkafka_aux.c @@ -234,7 +234,7 @@ void rd_kafka_acl_result_free(void *ptr) { * @return A new allocated Node object. * Use rd_kafka_Node_destroy() to free when done. */ -rd_kafka_Node_t *rd_kafka_Node_new(int id, +rd_kafka_Node_t *rd_kafka_Node_new(int32_t id, const char *host, uint16_t port, const char *rack_id) { diff --git a/src/rdkafka_aux.h b/src/rdkafka_aux.h index ccf18e91e7..7d5339bd73 100644 --- a/src/rdkafka_aux.h +++ b/src/rdkafka_aux.h @@ -110,8 +110,10 @@ typedef struct rd_kafka_Node_s { char *rack_id; /*< (optional) Node rack id */ } rd_kafka_Node_t; -rd_kafka_Node_t * -rd_kafka_Node_new(int id, const char *host, uint16_t port, const char *rack_id); +rd_kafka_Node_t *rd_kafka_Node_new(int32_t id, + const char *host, + uint16_t port, + const char *rack_id); rd_kafka_Node_t *rd_kafka_Node_copy(const rd_kafka_Node_t *src); diff --git a/src/rdkafka_buf.h b/src/rdkafka_buf.h index b4f606317b..f8d9c2e23d 100644 --- a/src/rdkafka_buf.h +++ b/src/rdkafka_buf.h @@ -682,6 +682,10 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */ size_t _slen; \ char *_dst; \ rd_kafka_buf_read_str(rkbuf, &_kstr); \ + if (RD_KAFKAP_STR_IS_NULL(&_kstr)) { \ + dst = NULL; \ + break; \ + } \ _slen = RD_KAFKAP_STR_LEN(&_kstr); \ if (!(_dst = rd_tmpabuf_write(tmpabuf, _kstr.str, _slen + 1))) \ rd_kafka_buf_parse_fail( \ diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index f49af8d765..d3314e4abb 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -1659,15 +1659,12 @@ static void rd_kafka_cgrp_handle_SyncGroup(rd_kafka_t *rk, /** * @brief Run group assignment. */ -static void -rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, - rd_kafka_assignor_t *rkas, - rd_kafka_resp_err_t err, - rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - int member_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt) { +static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, + rd_kafka_assignor_t *rkas, + rd_kafka_resp_err_t err, + rd_kafka_metadata_internal_t *metadata, + rd_kafka_group_member_t *members, + int member_cnt) { char errstr[512]; if (err) { @@ -1680,9 +1677,8 @@ rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, *errstr = '\0'; /* Run assignor */ - err = rd_kafka_assignor_run(rkcg, rkas, metadata, members, member_cnt, - broker_rack_pair, broker_rack_pair_cnt, - errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rkcg, rkas, &metadata->metadata, members, + member_cnt, errstr, sizeof(errstr)); if (err) { if (!*errstr) @@ -1748,11 +1744,10 @@ rd_kafka_cgrp_assignor_handle_Metadata_op(rd_kafka_t *rk, return RD_KAFKA_OP_RES_HANDLED; } - rd_kafka_cgrp_assignor_run( - rkcg, rkcg->rkcg_assignor, rko->rko_err, rko->rko_u.metadata.md, - rkcg->rkcg_group_leader.members, rkcg->rkcg_group_leader.member_cnt, - rko->rko_u.metadata.broker_rack_pair, - rko->rko_u.metadata.broker_rack_pair_cnt); + rd_kafka_cgrp_assignor_run(rkcg, rkcg->rkcg_assignor, rko->rko_err, + rko->rko_u.metadata.mdi, + rkcg->rkcg_group_leader.members, + rkcg->rkcg_group_leader.member_cnt); return RD_KAFKA_OP_RES_HANDLED; } diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index 584ff3c965..6da9ecd52b 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -78,7 +78,8 @@ struct rd_kafka_topic_s; struct rd_kafka_msg_s; struct rd_kafka_broker_s; struct rd_kafka_toppar_s; - +typedef struct rd_kafka_metadata_internal_s rd_kafka_metadata_internal_t; +typedef struct rd_kafka_toppar_s rd_kafka_toppar_t; typedef struct rd_kafka_lwtopic_s rd_kafka_lwtopic_t; @@ -350,8 +351,9 @@ struct rd_kafka_s { rd_ts_t rk_ts_metadata; /* Timestamp of most recent * metadata. */ - struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */ - rd_ts_t rk_ts_full_metadata; /* Timesstamp of .. */ + rd_kafka_metadata_internal_t + *rk_full_metadata; /* Last full metadata. */ + rd_ts_t rk_ts_full_metadata; /* Timestamp of .. */ struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */ char *rk_clusterid; /* ClusterId from metadata */ diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index b9cc333b9d..cb363aa23b 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -38,6 +38,26 @@ #include #include +/** + * @brief Id comparator for rd_kafka_metadata_broker_internal_t + */ +static int rd_kafka_metadata_broker_internal_cmp(const void *_a, + const void *_b) { + const rd_kafka_metadata_broker_internal_t *a = _a; + const rd_kafka_metadata_broker_internal_t *b = _b; + return RD_CMP(a->id, b->id); +} + +/** + * @brief Id comparator for rd_kafka_metadata_partition_internal_t + */ +static int rd_kafka_metadata_partition_internal_cmp(const void *_a, + const void *_b) { + const rd_kafka_metadata_partition_internal_t *a = _a; + const rd_kafka_metadata_partition_internal_t *b = _b; + return RD_CMP(a->id, b->id); +} + rd_kafka_resp_err_t rd_kafka_metadata(rd_kafka_t *rk, @@ -113,8 +133,9 @@ rd_kafka_metadata(rd_kafka_t *rk, /* Reply: pass metadata pointer to application who now owns it*/ rd_kafka_assert(rk, rko->rko_u.metadata.md); - *metadatap = rko->rko_u.metadata.md; - rko->rko_u.metadata.md = NULL; + *metadatap = rko->rko_u.metadata.md; + rko->rko_u.metadata.md = NULL; + rko->rko_u.metadata.mdi = NULL; rd_kafka_op_destroy(rko); return RD_KAFKA_RESP_ERR_NO_ERROR; @@ -130,9 +151,12 @@ void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata) { /** * @returns a newly allocated copy of metadata \p src of size \p size */ -struct rd_kafka_metadata * -rd_kafka_metadata_copy(const struct rd_kafka_metadata *src, size_t size) { +rd_kafka_metadata_internal_t * +rd_kafka_metadata_copy(const rd_kafka_metadata_internal_t *src_internal, + size_t size) { struct rd_kafka_metadata *md; + rd_kafka_metadata_internal_t *mdi; + const struct rd_kafka_metadata *src = &src_internal->metadata; rd_tmpabuf_t tbuf; int i; @@ -143,23 +167,37 @@ rd_kafka_metadata_copy(const struct rd_kafka_metadata *src, size_t size) { * any pointer fields needs to be copied explicitly to update * the pointer address. */ rd_tmpabuf_new(&tbuf, size, 1 /*assert on fail*/); - md = rd_tmpabuf_write(&tbuf, src, sizeof(*md)); + mdi = rd_tmpabuf_write(&tbuf, src, sizeof(*mdi)); + md = &mdi->metadata; rd_tmpabuf_write_str(&tbuf, src->orig_broker_name); /* Copy Brokers */ md->brokers = rd_tmpabuf_write(&tbuf, src->brokers, - md->broker_cnt * sizeof(*md->brokers)); + src->broker_cnt * sizeof(*src->brokers)); + /* Copy internal Brokers */ + mdi->brokers = + rd_tmpabuf_write(&tbuf, src_internal->brokers, + src->broker_cnt * sizeof(*src_internal->brokers)); - for (i = 0; i < md->broker_cnt; i++) + for (i = 0; i < md->broker_cnt; i++) { md->brokers[i].host = rd_tmpabuf_write_str(&tbuf, src->brokers[i].host); + if (src_internal->brokers[i].rack_id) { + mdi->brokers[i].rack_id = rd_tmpabuf_write_str( + &tbuf, src_internal->brokers[i].rack_id); + } + } /* Copy TopicMetadata */ md->topics = rd_tmpabuf_write(&tbuf, src->topics, md->topic_cnt * sizeof(*md->topics)); + /* Copy internal TopicMetadata */ + mdi->topics = + rd_tmpabuf_write(&tbuf, src_internal->topics, + md->topic_cnt * sizeof(*src_internal->topics)); for (i = 0; i < md->topic_cnt; i++) { int j; @@ -173,6 +211,11 @@ rd_kafka_metadata_copy(const struct rd_kafka_metadata *src, size_t size) { rd_tmpabuf_write(&tbuf, src->topics[i].partitions, md->topics[i].partition_cnt * sizeof(*md->topics[i].partitions)); + /* Copy internal partitions */ + mdi->topics[i].partitions = rd_tmpabuf_write( + &tbuf, src_internal->topics[i].partitions, + md->topics[i].partition_cnt * + sizeof(*src_internal->topics[i].partitions)); for (j = 0; j < md->topics[i].partition_cnt; j++) { /* Copy replicas and ISRs */ @@ -195,27 +238,14 @@ rd_kafka_metadata_copy(const struct rd_kafka_metadata *src, size_t size) { /* Delibarely not destroying the tmpabuf since we return * its allocated memory. */ - return md; + return mdi; } - - -/** - * @brief Partition (id) comparator for partition_id_leader_epoch struct. - */ -static int rd_kafka_metadata_partition_leader_epoch_cmp(const void *_a, - const void *_b) { - const rd_kafka_partition_leader_epoch_t *a = _a, *b = _b; - return RD_CMP(a->partition_id, b->partition_id); -} - - - /** * @brief Update topic state and information based on topic metadata. * * @param mdt Topic metadata. - * @param leader_epochs Per-partition leader epoch array, or NULL if not known. + * @param mdit Topic internal metadata. * * @locality rdkafka main thread * @locks_acquired rd_kafka_wrlock(rk) @@ -223,7 +253,7 @@ static int rd_kafka_metadata_partition_leader_epoch_cmp(const void *_a, static void rd_kafka_parse_Metadata_update_topic( rd_kafka_broker_t *rkb, const rd_kafka_metadata_topic_t *mdt, - const rd_kafka_partition_leader_epoch_t *leader_epochs) { + const rd_kafka_metadata_topic_internal_t *mdit) { rd_rkb_dbg(rkb, METADATA, "METADATA", /* The indent below is intentional */ @@ -244,7 +274,7 @@ static void rd_kafka_parse_Metadata_update_topic( } else { /* Update local topic & partition state based * on metadata */ - rd_kafka_topic_metadata_update2(rkb, mdt, leader_epochs); + rd_kafka_topic_metadata_update2(rkb, mdt, mdit); } } @@ -274,13 +304,9 @@ rd_bool_t rd_kafka_has_reliable_leader_epochs(rd_kafka_broker_t *rkb) { * * @param topics are the requested topics (may be NULL) * - * The metadata will be marshalled into 'struct rd_kafka_metadata*' structs. - * - * The marshalled metadata is returned in \p *mdp, (NULL on error). + * The metadata will be marshalled into 'rd_kafka_metadata_internal_t *'. * - * Information about the racks-per-broker is returned in \p *broker_rack_pair_p - * if it's not NULL. The count of racks-per-broker is equal to mdp->broker_cnt, - * and the pairs are sorted by broker id. + * The marshalled metadata is returned in \p *mdip, (NULL on error). * * @returns an error code on parse failure, else NO_ERRRO. * @@ -290,12 +316,12 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_kafka_buf_t *request, rd_kafka_buf_t *rkbuf, - struct rd_kafka_metadata **mdp, - rd_kafka_broker_id_rack_pair_t **broker_rack_pair_p) { + rd_kafka_metadata_internal_t **mdip) { rd_kafka_t *rk = rkb->rkb_rk; int i, j, k; rd_tmpabuf_t tbuf; - struct rd_kafka_metadata *md = NULL; + rd_kafka_metadata_internal_t *mdi = NULL; + rd_kafka_metadata_t *md = NULL; size_t rkb_namelen; const int log_decode_errors = LOG_ERR; rd_list_t *missing_topics = NULL; @@ -303,6 +329,8 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_bool_t all_topics = request->rkbuf_u.Metadata.all_topics; rd_bool_t cgrp_update = request->rkbuf_u.Metadata.cgrp_update && rk->rk_cgrp; + rd_bool_t has_reliable_leader_epochs = + rd_kafka_has_reliable_leader_epochs(rkb); const char *reason = request->rkbuf_u.Metadata.reason ? request->rkbuf_u.Metadata.reason : "(no reason)"; @@ -312,12 +340,7 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; int broker_changes = 0; int cache_changes = 0; - /** This array is reused and resized as necessary to hold per-partition - * leader epochs (ApiVersion >= 7). */ - rd_kafka_partition_leader_epoch_t *leader_epochs = NULL; - /** Number of allocated elements in leader_epochs. */ - size_t leader_epochs_size = 0; - rd_ts_t ts_start = rd_clock(); + rd_ts_t ts_start = rd_clock(); /* Ignore metadata updates when terminating */ if (rd_kafka_terminating(rkb->rkb_rk)) { @@ -340,12 +363,13 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, sizeof(*md) + rkb_namelen + (rkbuf->rkbuf_totlen * 4), 0 /*dont assert on fail*/); - if (!(md = rd_tmpabuf_alloc(&tbuf, sizeof(*md)))) { + if (!(mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi)))) { rd_kafka_broker_unlock(rkb); err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; goto err; } + md = &mdi->metadata; md->orig_broker_id = rkb->rkb_nodeid; md->orig_broker_name = rd_tmpabuf_write(&tbuf, rkb->rkb_name, rkb_namelen); @@ -364,10 +388,11 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, "%d brokers: tmpabuf memory shortage", md->broker_cnt); - if (ApiVersion >= 1 && broker_rack_pair_p) { - *broker_rack_pair_p = rd_malloc( - sizeof(rd_kafka_broker_id_rack_pair_t) * md->broker_cnt); - } + if (!(mdi->brokers = rd_tmpabuf_alloc( + &tbuf, md->broker_cnt * sizeof(*mdi->brokers)))) + rd_kafka_buf_parse_fail( + rkbuf, "%d internal brokers: tmpabuf memory shortage", + md->broker_cnt); for (i = 0; i < md->broker_cnt; i++) { rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].id); @@ -375,15 +400,12 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, md->brokers[i].host); rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].port); + mdi->brokers[i].id = md->brokers[i].id; if (ApiVersion >= 1) { - rd_kafkap_str_t rack; - rd_kafka_buf_read_str(rkbuf, &rack); - if (broker_rack_pair_p) { - (*broker_rack_pair_p)[i].broker_id = - md->brokers[i].id; - (*broker_rack_pair_p)[i].rack = - rd_kafkap_str_copy(&rack); - } + rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf, + mdi->brokers[i].rack_id); + } else { + mdi->brokers[i].rack_id = NULL; } rd_kafka_buf_skip_tags(rkbuf); @@ -399,10 +421,8 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, RD_KAFKAP_STR_PR(&cluster_id), controller_id); } - if (broker_rack_pair_p) - qsort(*broker_rack_pair_p, md->broker_cnt, - sizeof(rd_kafka_broker_id_rack_pair_t), - rd_kafka_broker_id_rack_pair_cmp); + qsort(mdi->brokers, md->broker_cnt, sizeof(mdi->brokers[i]), + rd_kafka_metadata_broker_internal_cmp); /* Read TopicMetadata */ rd_kafka_buf_read_arraycnt(rkbuf, &md->topic_cnt, RD_KAFKAP_TOPICS_MAX); @@ -414,6 +434,12 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_kafka_buf_parse_fail( rkbuf, "%d topics: tmpabuf memory shortage", md->topic_cnt); + if (!(mdi->topics = rd_tmpabuf_alloc(&tbuf, md->topic_cnt * + sizeof(*mdi->topics)))) + rd_kafka_buf_parse_fail( + rkbuf, "%d internal topics: tmpabuf memory shortage", + md->topic_cnt); + for (i = 0; i < md->topic_cnt; i++) { rd_kafka_buf_read_i16a(rkbuf, md->topics[i].err); rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf, @@ -436,16 +462,15 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, md->topics[i].topic, md->topics[i].partition_cnt); - /* Resize reused leader_epochs array to fit this partition's - * leader epochs. */ - if (ApiVersion >= 7 && md->topics[i].partition_cnt > 0 && - (size_t)md->topics[i].partition_cnt > leader_epochs_size) { - leader_epochs_size = - RD_MAX(32, md->topics[i].partition_cnt); - leader_epochs = - rd_realloc(leader_epochs, sizeof(*leader_epochs) * - leader_epochs_size); - } + if (!(mdi->topics[i].partitions = rd_tmpabuf_alloc( + &tbuf, md->topics[i].partition_cnt * + sizeof(*mdi->topics[i].partitions)))) + rd_kafka_buf_parse_fail(rkbuf, + "%s: %d internal partitions: " + "tmpabuf memory shortage", + md->topics[i].topic, + md->topics[i].partition_cnt); + for (j = 0; j < md->topics[i].partition_cnt; j++) { rd_kafka_buf_read_i16a(rkbuf, @@ -454,11 +479,19 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, md->topics[i].partitions[j].id); rd_kafka_buf_read_i32a( rkbuf, md->topics[i].partitions[j].leader); + + mdi->topics[i].partitions[j].id = + md->topics[i].partitions[j].id; if (ApiVersion >= 7) { - leader_epochs[j].partition_id = - md->topics[i].partitions[j].id; rd_kafka_buf_read_i32( - rkbuf, &leader_epochs[j].leader_epoch); + rkbuf, + &mdi->topics[i].partitions[j].leader_epoch); + if (!has_reliable_leader_epochs) + mdi->topics[i] + .partitions[j] + .leader_epoch = -1; + } else { + mdi->topics[i].partitions[j].leader_epoch = -1; } /* Replicas */ @@ -552,37 +585,17 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, continue; } - if (leader_epochs_size > 0 && - !rd_kafka_has_reliable_leader_epochs(rkb)) { - /* Prior to Kafka version 2.4 (which coincides with - * Metadata version 9), the broker does not propagate - * leader epoch information accurately while a - * reassignment is in progress. Relying on a stale - * epoch can lead to FENCED_LEADER_EPOCH errors which - * can prevent consumption throughout the course of - * a reassignment. It is safer in this case to revert - * to the behavior in previous protocol versions - * which checks leader status only. */ - leader_epochs_size = 0; - rd_free(leader_epochs); - leader_epochs = NULL; - } - - /* Sort partitions by partition id */ qsort(md->topics[i].partitions, md->topics[i].partition_cnt, sizeof(*md->topics[i].partitions), rd_kafka_metadata_partition_id_cmp); - if (leader_epochs_size > 0) { - /* And sort leader_epochs by partition id */ - qsort(leader_epochs, md->topics[i].partition_cnt, - sizeof(*leader_epochs), - rd_kafka_metadata_partition_leader_epoch_cmp); - } + qsort(mdi->topics[i].partitions, md->topics[i].partition_cnt, + sizeof(*mdi->topics[i].partitions), + rd_kafka_metadata_partition_internal_cmp); /* Update topic state based on the topic metadata */ rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i], - leader_epochs); + &mdi->topics[i]); if (requested_topics) { @@ -596,7 +609,7 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, rd_kafka_wrlock(rk); rd_kafka_metadata_cache_topic_update( - rk, &md->topics[i], + rk, &md->topics[i], &mdi->topics[i], rd_false /*propagate later*/); cache_changes++; rd_kafka_wrunlock(rk); @@ -710,9 +723,9 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, if (rkb->rkb_rk->rk_full_metadata) rd_kafka_metadata_destroy( - rkb->rkb_rk->rk_full_metadata); + &rkb->rkb_rk->rk_full_metadata->metadata); rkb->rkb_rk->rk_full_metadata = - rd_kafka_metadata_copy(md, tbuf.of); + rd_kafka_metadata_copy(mdi, tbuf.of); rkb->rkb_rk->rk_ts_full_metadata = rkb->rkb_rk->rk_ts_metadata; rd_rkb_dbg(rkb, METADATA, "METADATA", "Caching full metadata with " @@ -758,16 +771,13 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, if (missing_topics) rd_list_destroy(missing_topics); - if (leader_epochs) - rd_free(leader_epochs); - /* This metadata request was triggered by someone wanting * the metadata information back as a reply, so send that reply now. * In this case we must not rd_free the metadata memory here, * the requestee will do. * The tbuf is explicitly not destroyed as we return its memory * to the caller. */ - *mdp = md; + *mdip = mdi; return RD_KAFKA_RESP_ERR_NO_ERROR; @@ -784,19 +794,6 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, if (missing_topics) rd_list_destroy(missing_topics); - - if (leader_epochs) - rd_free(leader_epochs); - - if (broker_rack_pair_p && *broker_rack_pair_p) { - /* md must be allocated if *broker_rack_pair_p != NULL - since md->brokers_cnt is used to allocate it */ - rd_assert(md); - rd_kafka_broker_rack_pair_destroy_cnt(*broker_rack_pair_p, - md->broker_cnt); - *broker_rack_pair_p = NULL; - } - rd_tmpabuf_destroy(&tbuf); return err; @@ -824,12 +821,15 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *errored) { int ti, i; size_t cnt = 0; - const struct rd_kafka_metadata *metadata; + rd_kafka_metadata_internal_t *mdi; + struct rd_kafka_metadata *metadata; rd_kafka_topic_partition_list_t *unmatched; rd_kafka_rdlock(rk); - metadata = rk->rk_full_metadata; - if (!metadata) { + mdi = rk->rk_full_metadata; + metadata = &mdi->metadata; + + if (!mdi) { rd_kafka_rdunlock(rk); return 0; } @@ -1409,6 +1409,7 @@ void rd_kafka_metadata_fast_leader_query(rd_kafka_t *rk) { rd_kafka_metadata_t * rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics, size_t topic_cnt) { + rd_kafka_metadata_internal_t *mdi; rd_kafka_metadata_t *md; rd_tmpabuf_t tbuf; size_t topic_names_size = 0; @@ -1427,17 +1428,22 @@ rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics, * needed by the final metadata_t object */ rd_tmpabuf_new( &tbuf, - sizeof(*md) + (sizeof(*md->topics) * topic_cnt) + topic_names_size + - (64 /*topic name size..*/ * topic_cnt) + - (sizeof(*md->topics[0].partitions) * total_partition_cnt), + sizeof(*mdi) + (sizeof(*md->topics) * topic_cnt) + + topic_names_size + (64 /*topic name size..*/ * topic_cnt) + + (sizeof(*md->topics[0].partitions) * total_partition_cnt) + + (sizeof(*mdi->topics) * topic_cnt) + + (sizeof(*mdi->topics[0].partitions) * total_partition_cnt), 1 /*assert on fail*/); - md = rd_tmpabuf_alloc(&tbuf, sizeof(*md)); - memset(md, 0, sizeof(*md)); + mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi)); + memset(mdi, 0, sizeof(*mdi)); + md = &mdi->metadata; md->topic_cnt = (int)topic_cnt; md->topics = rd_tmpabuf_alloc(&tbuf, md->topic_cnt * sizeof(*md->topics)); + mdi->topics = + rd_tmpabuf_alloc(&tbuf, md->topic_cnt * sizeof(*mdi->topics)); for (i = 0; i < (size_t)md->topic_cnt; i++) { int j; @@ -1450,11 +1456,18 @@ rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics, md->topics[i].partitions = rd_tmpabuf_alloc( &tbuf, md->topics[i].partition_cnt * sizeof(*md->topics[i].partitions)); + mdi->topics[i].partitions = rd_tmpabuf_alloc( + &tbuf, md->topics[i].partition_cnt * + sizeof(*mdi->topics[i].partitions)); for (j = 0; j < md->topics[i].partition_cnt; j++) { memset(&md->topics[i].partitions[j], 0, sizeof(md->topics[i].partitions[j])); - md->topics[i].partitions[j].id = j; + memset(&mdi->topics[i].partitions[j], 0, + sizeof(mdi->topics[i].partitions[j])); + md->topics[i].partitions[j].id = j; + mdi->topics[i].partitions[j].id = j; + mdi->topics[i].partitions[j].leader_epoch = -1; } } diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index 537a852354..f4004f9d56 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -31,17 +31,69 @@ #include "rdavl.h" +/** + * @brief Metadata partition internal container + */ +typedef struct rd_kafka_metadata_partition_internal_s { + /** Partition Id */ + int32_t id; + /** Partition leader epoch */ + int32_t leader_epoch; +} rd_kafka_metadata_partition_internal_t; + +/** + * @brief Metadata topic internal container + */ +typedef struct rd_kafka_metadata_topic_internal_s { + /** Internal metadata partition structs. + * same count as metadata.topics[i].partition_cnt. + * Sorted by Partition Id. */ + rd_kafka_metadata_partition_internal_t *partitions; +} rd_kafka_metadata_topic_internal_t; + + +/** + * @brief Metadata broker internal container + */ +typedef struct rd_kafka_metadata_broker_internal_s { + /** Broker Id. */ + int32_t id; + /** Rack Id (optional). */ + char *rack_id; +} rd_kafka_metadata_broker_internal_t; + +/** + * @brief Metadata internal container + */ +typedef struct rd_kafka_metadata_internal_s { + rd_kafka_metadata_t + metadata; /**< Public metadata struct. Must + be kept the first field so the pointer + can be cast to *rd_kafka_metadata_internal_t + when needed */ + /* Internal metadata brokers. Same count as metadata.broker_cnt. + * Sorted by broker id. */ + rd_kafka_metadata_broker_internal_t *brokers; + /* Internal metadata topics. Same count as metadata.topic_cnt. */ + rd_kafka_metadata_topic_internal_t *topics; +} rd_kafka_metadata_internal_t; + +/** + * @brief The internal metadata type corresponding to the + * public one. + */ +#define rd_kafka_metadata_get_internal(md) \ + ((const rd_kafka_metadata_internal_t *)md) + rd_bool_t rd_kafka_has_reliable_leader_epochs(rd_kafka_broker_t *rkb); -rd_kafka_resp_err_t -rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, - rd_kafka_buf_t *request, - rd_kafka_buf_t *rkbuf, - struct rd_kafka_metadata **mdp, - rd_kafka_broker_id_rack_pair_t **broker_rack_pair_p); +rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, + rd_kafka_buf_t *request, + rd_kafka_buf_t *rkbuf, + rd_kafka_metadata_internal_t **mdp); -struct rd_kafka_metadata * -rd_kafka_metadata_copy(const struct rd_kafka_metadata *md, size_t size); +rd_kafka_metadata_internal_t * +rd_kafka_metadata_copy(const rd_kafka_metadata_internal_t *mdi, size_t size); size_t rd_kafka_metadata_topic_match(rd_kafka_t *rk, @@ -102,7 +154,6 @@ rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics, size_t topic_cnt); rd_kafka_metadata_t *rd_kafka_metadata_new_topic_mockv(size_t topic_cnt, ...); - /** * @{ * @@ -117,6 +168,8 @@ struct rd_kafka_metadata_cache_entry { /** Last known leader epochs array (same size as the partition count), * or NULL if not known. */ rd_kafka_metadata_topic_t rkmce_mtopic; /* Cached topic metadata */ + /* Cached internal topic metadata */ + rd_kafka_metadata_topic_internal_t rkmce_metadata_internal_topic; /* rkmce_topics.partitions memory points here. */ }; @@ -159,11 +212,13 @@ struct rd_kafka_metadata_cache { void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk); int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts); -void rd_kafka_metadata_cache_topic_update(rd_kafka_t *rk, - const rd_kafka_metadata_topic_t *mdt, - rd_bool_t propagate); +void rd_kafka_metadata_cache_topic_update( + rd_kafka_t *rk, + const rd_kafka_metadata_topic_t *mdt, + const rd_kafka_metadata_topic_internal_t *mdit, + rd_bool_t propagate); void rd_kafka_metadata_cache_update(rd_kafka_t *rk, - const rd_kafka_metadata_t *md, + const rd_kafka_metadata_internal_t *mdi, int abs_update); void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk); struct rd_kafka_metadata_cache_entry * diff --git a/src/rdkafka_metadata_cache.c b/src/rdkafka_metadata_cache.c index 514d391a83..e82f890d6d 100644 --- a/src/rdkafka_metadata_cache.c +++ b/src/rdkafka_metadata_cache.c @@ -238,11 +238,12 @@ int rd_kafka_metadata_partition_id_cmp(const void *_a, const void *_b) { * * @locks_required rd_kafka_wrlock() */ -static struct rd_kafka_metadata_cache_entry * -rd_kafka_metadata_cache_insert(rd_kafka_t *rk, - const rd_kafka_metadata_topic_t *mtopic, - rd_ts_t now, - rd_ts_t ts_expires) { +static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert( + rd_kafka_t *rk, + const rd_kafka_metadata_topic_t *mtopic, + const rd_kafka_metadata_topic_internal_t *metadata_internal_topic, + rd_ts_t now, + rd_ts_t ts_expires) { struct rd_kafka_metadata_cache_entry *rkmce, *old; size_t topic_len; rd_tmpabuf_t tbuf; @@ -255,17 +256,21 @@ rd_kafka_metadata_cache_insert(rd_kafka_t *rk, * any pointer fields needs to be copied explicitly to update * the pointer address. */ topic_len = strlen(mtopic->topic) + 1; - rd_tmpabuf_new(&tbuf, - RD_ROUNDUP(sizeof(*rkmce), 8) + - RD_ROUNDUP(topic_len, 8) + - (mtopic->partition_cnt * - RD_ROUNDUP(sizeof(*mtopic->partitions), 8)), - 1 /*assert on fail*/); + rd_tmpabuf_new( + &tbuf, + RD_ROUNDUP(sizeof(*rkmce), 8) + RD_ROUNDUP(topic_len, 8) + + (mtopic->partition_cnt * + RD_ROUNDUP(sizeof(*mtopic->partitions), 8)) + + (mtopic->partition_cnt * + RD_ROUNDUP(sizeof(*metadata_internal_topic->partitions), 8)), + 1 /*assert on fail*/); rkmce = rd_tmpabuf_alloc(&tbuf, sizeof(*rkmce)); rkmce->rkmce_mtopic = *mtopic; + rkmce->rkmce_metadata_internal_topic = *metadata_internal_topic; + /* Copy topic name and update pointer */ rkmce->rkmce_mtopic.topic = rd_tmpabuf_write_str(&tbuf, mtopic->topic); @@ -274,6 +279,12 @@ rd_kafka_metadata_cache_insert(rd_kafka_t *rk, &tbuf, mtopic->partitions, mtopic->partition_cnt * sizeof(*mtopic->partitions)); + /* Copy partition array (internal) and update pointer */ + rkmce->rkmce_metadata_internal_topic.partitions = + rd_tmpabuf_write(&tbuf, metadata_internal_topic->partitions, + mtopic->partition_cnt * + sizeof(*metadata_internal_topic->partitions)); + /* Clear uncached fields. */ for (i = 0; i < mtopic->partition_cnt; i++) { rkmce->rkmce_mtopic.partitions[i].replicas = NULL; @@ -287,6 +298,8 @@ rd_kafka_metadata_cache_insert(rd_kafka_t *rk, sizeof(*rkmce->rkmce_mtopic.partitions), rd_kafka_metadata_partition_id_cmp); + /* partitions (internal) are already sorted. */ + TAILQ_INSERT_TAIL(&rk->rk_metadata_cache.rkmc_expiry, rkmce, rkmce_link); rk->rk_metadata_cache.rkmc_cnt++; @@ -365,9 +378,11 @@ void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk) { * * @locks rd_kafka_wrlock() */ -void rd_kafka_metadata_cache_topic_update(rd_kafka_t *rk, - const rd_kafka_metadata_topic_t *mdt, - rd_bool_t propagate) { +void rd_kafka_metadata_cache_topic_update( + rd_kafka_t *rk, + const rd_kafka_metadata_topic_t *mdt, + const rd_kafka_metadata_topic_internal_t *mdit, + rd_bool_t propagate) { rd_ts_t now = rd_clock(); rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000); int changed = 1; @@ -380,7 +395,7 @@ void rd_kafka_metadata_cache_topic_update(rd_kafka_t *rk, if (!mdt->err || mdt->err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED || mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) - rd_kafka_metadata_cache_insert(rk, mdt, now, ts_expires); + rd_kafka_metadata_cache_insert(rk, mdt, mdit, now, ts_expires); else changed = rd_kafka_metadata_cache_delete_by_name(rk, mdt->topic); @@ -398,23 +413,24 @@ void rd_kafka_metadata_cache_topic_update(rd_kafka_t *rk, * @locks rd_kafka_wrlock() */ void rd_kafka_metadata_cache_update(rd_kafka_t *rk, - const rd_kafka_metadata_t *md, + const rd_kafka_metadata_internal_t *mdi, int abs_update) { struct rd_kafka_metadata_cache_entry *rkmce; rd_ts_t now = rd_clock(); rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000); int i; - rd_kafka_dbg(rk, METADATA, "METADATA", - "%s of metadata cache with %d topic(s)", - abs_update ? "Absolute update" : "Update", md->topic_cnt); + rd_kafka_dbg( + rk, METADATA, "METADATA", "%s of metadata cache with %d topic(s)", + abs_update ? "Absolute update" : "Update", mdi->metadata.topic_cnt); if (abs_update) rd_kafka_metadata_cache_purge(rk, rd_false /*not observers*/); - for (i = 0; i < md->topic_cnt; i++) - rd_kafka_metadata_cache_insert(rk, &md->topics[i], now, + for (i = 0; i < mdi->metadata.topic_cnt; i++) + rd_kafka_metadata_cache_insert(rk, &mdi->metadata.topics[i], + &mdi->topics[i], now, ts_expires); /* Update expiry timer */ @@ -424,7 +440,7 @@ void rd_kafka_metadata_cache_update(rd_kafka_t *rk, rkmce->rkmce_ts_expires - now, rd_kafka_metadata_cache_evict_tmr_cb, rk); - if (md->topic_cnt > 0 || abs_update) + if (mdi->metadata.topic_cnt > 0 || abs_update) rd_kafka_metadata_cache_propagate_changes(rk); } @@ -499,6 +515,8 @@ int rd_kafka_metadata_cache_hint(rd_kafka_t *rk, RD_LIST_FOREACH(topic, topics, i) { rd_kafka_metadata_topic_t mtopic = {.topic = (char *)topic, .err = err}; + rd_kafka_metadata_topic_internal_t metadata_internal_topic = + RD_ZERO_INIT; /*const*/ struct rd_kafka_metadata_cache_entry *rkmce; /* !replace: Dont overwrite valid entries */ @@ -512,7 +530,8 @@ int rd_kafka_metadata_cache_hint(rd_kafka_t *rk, /* FALLTHRU */ } - rd_kafka_metadata_cache_insert(rk, &mtopic, now, ts_expires); + rd_kafka_metadata_cache_insert( + rk, &mtopic, &metadata_internal_topic, now, ts_expires); cnt++; if (dst) diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index a7d3600464..b9ee83c253 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -373,13 +373,9 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { break; case RD_KAFKA_OP_METADATA: - if (rko->rko_u.metadata.broker_rack_pair) { - rd_kafka_broker_rack_pair_destroy_cnt( - rko->rko_u.metadata.broker_rack_pair, - rko->rko_u.metadata.broker_rack_pair_cnt); - } - RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy); + /* It's not needed to free metadata.mdi because they + are the in the same memory allocation. */ break; case RD_KAFKA_OP_LOG: diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 3ff69f526f..d4d0736baf 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -38,7 +38,6 @@ typedef struct rd_kafka_q_s rd_kafka_q_t; typedef struct rd_kafka_toppar_s rd_kafka_toppar_t; typedef struct rd_kafka_op_s rd_kafka_op_t; -typedef struct rd_kafka_broker_id_rack_pair rd_kafka_broker_id_rack_pair_t; /* One-off reply queue + reply version. * All APIs that take a rd_kafka_replyq_t makes a copy of the @@ -371,11 +370,7 @@ struct rd_kafka_op_s { /* RD_KAFKA_OP_METADATA */ struct { rd_kafka_metadata_t *md; - size_t broker_rack_pair_cnt; - rd_kafka_broker_id_rack_pair_t - *broker_rack_pair; /* mapping of broker id -> rack - string as seen in metadata, - sorted by broker id. */ + rd_kafka_metadata_internal_t *mdi; int force; /* force request regardless of outstanding * metadata requests. */ } metadata; diff --git a/src/rdkafka_range_assignor.c b/src/rdkafka_range_assignor.c index 43d86eecf9..c83f1f1a44 100644 --- a/src/rdkafka_range_assignor.c +++ b/src/rdkafka_range_assignor.c @@ -50,20 +50,18 @@ * C1: [t0p2, t1p2] */ -rd_kafka_resp_err_t rd_kafka_range_assignor_assign_cb( - rd_kafka_t *rk, - const rd_kafka_assignor_t *rkas, - const char *member_id, - const rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - size_t member_cnt, - rd_kafka_assignor_topic_t **eligible_topics, - size_t eligible_topic_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, - char *errstr, - size_t errstr_size, - void *opaque) { +rd_kafka_resp_err_t +rd_kafka_range_assignor_assign_cb(rd_kafka_t *rk, + const rd_kafka_assignor_t *rkas, + const char *member_id, + const rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + size_t member_cnt, + rd_kafka_assignor_topic_t **eligible_topics, + size_t eligible_topic_cnt, + char *errstr, + size_t errstr_size, + void *opaque) { unsigned int ti; int i; diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 17cb8acce2..06b2db0779 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2085,11 +2085,10 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, void *opaque) { - rd_kafka_op_t *rko = opaque; /* Possibly NULL */ - struct rd_kafka_metadata *md = NULL; - const rd_list_t *topics = request->rkbuf_u.Metadata.topics; + rd_kafka_op_t *rko = opaque; /* Possibly NULL */ + rd_kafka_metadata_internal_t *mdi = NULL; + const rd_list_t *topics = request->rkbuf_u.Metadata.topics; int actions; - rd_kafka_broker_id_rack_pair_t *broker_rack_pair = NULL; rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY || thrd_is_current(rk->rk_thread)); @@ -2115,33 +2114,21 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk, rd_list_cnt(topics), request->rkbuf_u.Metadata.reason); - if (rko && rko->rko_replyq.q) - err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md, - &broker_rack_pair); - else - err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md, NULL); + err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &mdi); if (err) goto err; if (rko && rko->rko_replyq.q) { /* Reply to metadata requester, passing on the metadata. * Reuse requesting rko for the reply. */ - rko->rko_err = err; - rko->rko_u.metadata.md = md; - rko->rko_u.metadata.broker_rack_pair = broker_rack_pair; - if (broker_rack_pair) { - rd_assert(md); /* rd_kafka_parse_Metadata guarantees - that md will not be NULL if - broker_rack_pair isn't. */ - rko->rko_u.metadata.broker_rack_pair_cnt = - (size_t)md->broker_cnt; - } - + rko->rko_err = err; + rko->rko_u.metadata.md = &mdi->metadata; + rko->rko_u.metadata.mdi = mdi; rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0); rko = NULL; } else { - if (md) - rd_free(md); + if (mdi) + rd_free(mdi); } goto done; @@ -2167,8 +2154,9 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk, rd_kafka_actions2str(actions)); /* Respond back to caller on non-retriable errors */ if (rko && rko->rko_replyq.q) { - rko->rko_err = err; - rko->rko_u.metadata.md = NULL; + rko->rko_err = err; + rko->rko_u.metadata.md = NULL; + rko->rko_u.metadata.mdi = NULL; rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0); rko = NULL; } diff --git a/src/rdkafka_roundrobin_assignor.c b/src/rdkafka_roundrobin_assignor.c index 7ef6d9edd1..6cb9193645 100644 --- a/src/rdkafka_roundrobin_assignor.c +++ b/src/rdkafka_roundrobin_assignor.c @@ -58,8 +58,6 @@ rd_kafka_resp_err_t rd_kafka_roundrobin_assignor_assign_cb( size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque) { diff --git a/src/rdkafka_sticky_assignor.c b/src/rdkafka_sticky_assignor.c index ce37e58ce9..922cf49711 100644 --- a/src/rdkafka_sticky_assignor.c +++ b/src/rdkafka_sticky_assignor.c @@ -1576,20 +1576,18 @@ static void assignToMembers(map_str_toppar_list_t *currentAssignment, * * This code is closely mimicking the AK Java AbstractStickyAssignor.assign(). */ -rd_kafka_resp_err_t rd_kafka_sticky_assignor_assign_cb( - rd_kafka_t *rk, - const rd_kafka_assignor_t *rkas, - const char *member_id, - const rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - size_t member_cnt, - rd_kafka_assignor_topic_t **eligible_topics, - size_t eligible_topic_cnt, - rd_kafka_broker_id_rack_pair_t *broker_rack_pair, - size_t broker_rack_pair_cnt, - char *errstr, - size_t errstr_size, - void *opaque) { +rd_kafka_resp_err_t +rd_kafka_sticky_assignor_assign_cb(rd_kafka_t *rk, + const rd_kafka_assignor_t *rkas, + const char *member_id, + const rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + size_t member_cnt, + rd_kafka_assignor_topic_t **eligible_topics, + size_t eligible_topic_cnt, + char *errstr, + size_t errstr_size, + void *opaque) { /* FIXME: Let the cgrp pass the actual eligible partition count */ size_t partition_cnt = member_cnt * 10; /* FIXME */ @@ -2211,7 +2209,7 @@ static int ut_testOneConsumerNoTopic(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2237,7 +2235,7 @@ static int ut_testOneConsumerNonexistentTopic(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2264,7 +2262,7 @@ static int ut_testOneConsumerOneTopic(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2298,7 +2296,7 @@ static int ut_testOnlyAssignsPartitionsFromSubscribedTopics( ut_init_member(&members[0], "consumer1", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2327,7 +2325,7 @@ static int ut_testOneConsumerMultipleTopics(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", "topic2", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2356,7 +2354,7 @@ ut_testTwoConsumersOneTopicOnePartition(rd_kafka_t *rk, ut_init_member(&members[1], "consumer2", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2387,7 +2385,7 @@ ut_testTwoConsumersOneTopicTwoPartitions(rd_kafka_t *rk, ut_init_member(&members[1], "consumer2", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2421,7 +2419,7 @@ static int ut_testMultipleConsumersMixedTopicSubscriptions( ut_init_member(&members[2], "consumer3", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2455,7 +2453,7 @@ ut_testTwoConsumersTwoTopicsSixPartitions(rd_kafka_t *rk, ut_init_member(&members[1], "consumer2", "topic1", "topic2", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2486,7 +2484,7 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, 1, - NULL, 0, errstr, sizeof(errstr)); + errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyAssignment(&members[0], "topic1", 0, "topic1", 1, "topic1", 2, @@ -2499,7 +2497,7 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, ut_init_member(&members[1], "consumer2", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2513,7 +2511,7 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, /* Remove consumer1 */ err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[1], 1, - NULL, 0, errstr, sizeof(errstr)); + errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyAssignment(&members[1], "topic1", 0, "topic1", 1, "topic1", 2, @@ -2572,7 +2570,7 @@ ut_testPoorRoundRobinAssignmentScenario(rd_kafka_t *rk, "topic4", "topic5", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2607,7 +2605,7 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, ut_init_member(&members[1], "consumer2", "topic1", "topic2", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2626,7 +2624,7 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, rd_kafka_metadata_new_topic_mockv(2, "topic1", 3, "topic2", 3); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2648,7 +2646,7 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, metadata = rd_kafka_metadata_new_topic_mockv(1, "topic2", 3); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2707,9 +2705,8 @@ ut_testReassignmentAfterOneConsumerLeaves(rd_kafka_t *rk, members[i - 1].rkgm_subscription = subscription; } - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2723,9 +2720,8 @@ ut_testReassignmentAfterOneConsumerLeaves(rd_kafka_t *rk, sizeof(*members) * (member_cnt - 10)); member_cnt--; - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2765,9 +2761,8 @@ ut_testReassignmentAfterOneConsumerAdded(rd_kafka_t *rk, } member_cnt--; /* Skip one consumer */ - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2778,9 +2773,8 @@ ut_testReassignmentAfterOneConsumerAdded(rd_kafka_t *rk, */ member_cnt++; - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2828,9 +2822,8 @@ static int ut_testSameSubscriptions(rd_kafka_t *rk, rd_kafka_topic_partition_list_copy(subscription); } - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2842,9 +2835,8 @@ static int ut_testSameSubscriptions(rd_kafka_t *rk, memmove(&members[5], &members[6], sizeof(*members) * (member_cnt - 6)); member_cnt--; - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2902,9 +2894,8 @@ static int ut_testLargeAssignmentWithMultipleConsumersLeaving( members[i].rkgm_subscription = subscription; } - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2919,9 +2910,8 @@ static int ut_testLargeAssignmentWithMultipleConsumersLeaving( member_cnt--; } - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2966,7 +2956,7 @@ static int ut_testNewSubscription(rd_kafka_t *rk, } err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2981,7 +2971,7 @@ static int ut_testNewSubscription(rd_kafka_t *rk, "topic1", RD_KAFKA_PARTITION_UA); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3015,9 +3005,8 @@ static int ut_testMoveExistingAssignments(rd_kafka_t *rk, ut_init_member(&members[2], "consumer3", "topic1", NULL); ut_init_member(&members[3], "consumer4", "topic1", NULL); - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -3038,8 +3027,7 @@ static int ut_testMoveExistingAssignments(rd_kafka_t *rk, * Remove potential group leader consumer1 */ err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[1], - member_cnt - 1, NULL, 0, errstr, - sizeof(errstr)); + member_cnt - 1, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(&members[1], member_cnt - 1, metadata); @@ -3122,9 +3110,8 @@ static int ut_testStickiness(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { 0); - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); @@ -3157,7 +3144,7 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { /* Just consumer1 */ err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, 1, - NULL, 0, errstr, sizeof(errstr)); + errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, 1, metadata); @@ -3167,7 +3154,7 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { /* consumer1 and consumer2 */ err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, 2, - NULL, 0, errstr, sizeof(errstr)); + errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, 2, metadata); @@ -3180,9 +3167,8 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { /* Run it twice, should be stable. */ for (i = 0; i < 2; i++) { /* consumer1, consumer2, and consumer3 */ - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - 3, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, + members, 3, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, 3, metadata); @@ -3194,7 +3180,7 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { /* Remove consumer1 */ err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[1], 2, - NULL, 0, errstr, sizeof(errstr)); + errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(&members[1], 2, metadata); @@ -3206,7 +3192,7 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { /* Remove consumer2 */ err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[2], 1, - NULL, 0, errstr, sizeof(errstr)); + errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(&members[2], 1, metadata); @@ -3236,7 +3222,7 @@ ut_testAssignmentUpdatedForDeletedTopic(rd_kafka_t *rk, NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3268,7 +3254,7 @@ static int ut_testNoExceptionThrownWhenOnlySubscribedTopicDeleted( ut_init_member(&members[0], "consumer1", "topic", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3282,7 +3268,7 @@ static int ut_testNoExceptionThrownWhenOnlySubscribedTopicDeleted( metadata = rd_kafka_metadata_new_topic_mock(NULL, 0); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), NULL, 0, errstr, + RD_ARRAYSIZE(members), errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3329,9 +3315,8 @@ ut_testConflictingPreviousAssignments(rd_kafka_t *rk, 1); - err = - rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, NULL, 0, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); RD_UT_ASSERT(members[0].rkgm_assignment->cnt == 1 && diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index af7b6362d5..9330b43cce 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -50,11 +50,11 @@ const char *rd_kafka_topic_state_names[] = {"unknown", "exists", "notexists", "error"}; -static int rd_kafka_topic_metadata_update( - rd_kafka_topic_t *rkt, - const struct rd_kafka_metadata_topic *mdt, - const rd_kafka_partition_leader_epoch_t *leader_epochs, - rd_ts_t ts_age); +static int +rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, + const struct rd_kafka_metadata_topic *mdt, + const rd_kafka_metadata_topic_internal_t *mdit, + rd_ts_t ts_age); /** @@ -479,8 +479,10 @@ rd_kafka_topic_t *rd_kafka_topic_new0(rd_kafka_t *rk, if (existing) *existing = 1; - rd_kafka_topic_metadata_update(rkt, &rkmce->rkmce_mtopic, NULL, - rkmce->rkmce_ts_insert); + rd_kafka_topic_metadata_update( + rkt, &rkmce->rkmce_mtopic, + &rkmce->rkmce_metadata_internal_topic, + rkmce->rkmce_ts_insert); } if (do_lock) @@ -1238,9 +1240,7 @@ rd_bool_t rd_kafka_topic_set_error(rd_kafka_topic_t *rkt, * @brief Update a topic from metadata. * * @param mdt Topic metadata. - * @param leader_epochs Array of per-partition leader epochs, or NULL. - * The array size is identical to the partition count in - * \p mdt. + * @param mdit Topic internal metadata. * @param ts_age absolute age (timestamp) of metadata. * @returns 1 if the number of partitions changed, 0 if not, and -1 if the * topic is unknown. @@ -1248,11 +1248,11 @@ rd_bool_t rd_kafka_topic_set_error(rd_kafka_topic_t *rkt, * * @locks_required rd_kafka_*lock() MUST be held. */ -static int rd_kafka_topic_metadata_update( - rd_kafka_topic_t *rkt, - const struct rd_kafka_metadata_topic *mdt, - const rd_kafka_partition_leader_epoch_t *leader_epochs, - rd_ts_t ts_age) { +static int +rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, + const struct rd_kafka_metadata_topic *mdt, + const rd_kafka_metadata_topic_internal_t *mdit, + rd_ts_t ts_age) { rd_kafka_t *rk = rkt->rkt_rk; int upd = 0; int j; @@ -1323,8 +1323,7 @@ static int rd_kafka_topic_metadata_update( for (j = 0; j < mdt->partition_cnt; j++) { int r; rd_kafka_broker_t *leader; - int32_t leader_epoch = - leader_epochs ? leader_epochs[j].leader_epoch : -1; + int32_t leader_epoch = mdit->partitions[j].leader_epoch; rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA", " Topic %s partition %i Leader %" PRId32 @@ -1397,7 +1396,7 @@ static int rd_kafka_topic_metadata_update( int rd_kafka_topic_metadata_update2( rd_kafka_broker_t *rkb, const struct rd_kafka_metadata_topic *mdt, - const rd_kafka_partition_leader_epoch_t *leader_epochs) { + const rd_kafka_metadata_topic_internal_t *mdit) { rd_kafka_topic_t *rkt; int r; @@ -1408,7 +1407,7 @@ int rd_kafka_topic_metadata_update2( return -1; /* Ignore topics that we dont have locally. */ } - r = rd_kafka_topic_metadata_update(rkt, mdt, leader_epochs, rd_clock()); + r = rd_kafka_topic_metadata_update(rkt, mdt, mdit, rd_clock()); rd_kafka_wrunlock(rkb->rkb_rk); @@ -1886,9 +1885,12 @@ void rd_kafka_local_topics_to_list(rd_kafka_t *rk, void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt, int partition_cnt, int32_t leader_id) { - struct rd_kafka_metadata_topic mdt = {.topic = + rd_kafka_metadata_partition_internal_t *partitions = + rd_calloc(partition_cnt, sizeof(*partitions)); + struct rd_kafka_metadata_topic mdt = {.topic = (char *)rkt->rkt_topic->str, .partition_cnt = partition_cnt}; + rd_kafka_metadata_topic_internal_t mdit = {.partitions = partitions}; int i; mdt.partitions = rd_alloca(sizeof(*mdt.partitions) * partition_cnt); @@ -1900,7 +1902,8 @@ void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt, } rd_kafka_wrlock(rkt->rkt_rk); - rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, rd_true); - rd_kafka_topic_metadata_update(rkt, &mdt, NULL, rd_clock()); + rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, &mdit, rd_true); + rd_kafka_topic_metadata_update(rkt, &mdt, &mdit, rd_clock()); rd_kafka_wrunlock(rkt->rkt_rk); + rd_free(partitions); } diff --git a/src/rdkafka_topic.h b/src/rdkafka_topic.h index cbed9308a7..bacba6e79a 100644 --- a/src/rdkafka_topic.h +++ b/src/rdkafka_topic.h @@ -258,7 +258,7 @@ rd_kafka_topic_get_error(rd_kafka_topic_t *rkt) { int rd_kafka_topic_metadata_update2( rd_kafka_broker_t *rkb, const struct rd_kafka_metadata_topic *mdt, - const rd_kafka_partition_leader_epoch_t *leader_epochs); + const rd_kafka_metadata_topic_internal_t *mdit); void rd_kafka_topic_scan_all(rd_kafka_t *rk, rd_ts_t now);