From c4cb7c9c2a16d0d3df0fd394166cf355339ac007 Mon Sep 17 00:00:00 2001 From: Milind L Date: Thu, 16 Feb 2023 12:41:00 +0530 Subject: [PATCH] Change parsing of Metadata to extract broker racks [KIP-881] --- src/rdkafka_assignor.c | 49 ++++++++--- src/rdkafka_assignor.h | 39 +++++++-- src/rdkafka_broker.c | 1 + src/rdkafka_cgrp.c | 49 ++++++----- src/rdkafka_metadata.c | 41 +++++++-- src/rdkafka_metadata.h | 10 ++- src/rdkafka_op.c | 6 ++ src/rdkafka_op.h | 6 ++ src/rdkafka_range_assignor.c | 26 +++--- src/rdkafka_request.c | 19 +++- src/rdkafka_roundrobin_assignor.c | 2 + src/rdkafka_sticky_assignor.c | 139 +++++++++++++++++------------- 12 files changed, 257 insertions(+), 130 deletions(-) diff --git a/src/rdkafka_assignor.c b/src/rdkafka_assignor.c index c9bad6968d..f9cf99303f 100644 --- a/src/rdkafka_assignor.c +++ b/src/rdkafka_assignor.c @@ -59,8 +59,8 @@ void rd_kafka_group_member_clear(rd_kafka_group_member_t *rkgm) { if (rkgm->rkgm_member_metadata) rd_kafkap_bytes_destroy(rkgm->rkgm_member_metadata); - if (rkgm->rack_id) - rd_kafkap_str_destroy(rkgm->rack_id); + if (rkgm->rkgm_rack_id) + rd_kafkap_str_destroy(rkgm->rkgm_rack_id); memset(rkgm, 0, sizeof(*rkgm)); } @@ -104,6 +104,22 @@ int rd_kafka_group_member_find_subscription(rd_kafka_t *rk, return 0; } +void rd_kafka_broker_rack_pair_destroy_cnt( + rd_kafka_broker_id_rack_pair_t *broker_rack_pair, + size_t cnt) { + size_t i; + for (i = 0; i < cnt; i++) + RD_IF_FREE(broker_rack_pair[i].rack, rd_kafkap_str_destroy); + + rd_free(broker_rack_pair); +} + +int rd_kafka_broker_id_rack_pair_cmp(const void *_a, const void *_b) { + const rd_kafka_broker_id_rack_pair_t *a = _a; + const rd_kafka_broker_id_rack_pair_t *b = _b; + return RD_CMP(a->broker_id, b->broker_id); +} + rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new( const rd_list_t *topics, @@ -312,13 +328,16 @@ rd_kafka_member_subscriptions_map(rd_kafka_cgrp_t *rkcg, } -rd_kafka_resp_err_t rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, - const rd_kafka_assignor_t *rkas, - rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - int member_cnt, - char *errstr, - size_t errstr_size) { +rd_kafka_resp_err_t +rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, + const rd_kafka_assignor_t *rkas, + rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + int member_cnt, + rd_kafka_broker_id_rack_pair_t *broker_rack_pair, + size_t broker_rack_pair_cnt, + char *errstr, + size_t errstr_size) { rd_kafka_resp_err_t err; rd_ts_t ts_start = rd_clock(); int i; @@ -372,7 +391,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, err = rkas->rkas_assign_cb( rkcg->rkcg_rk, rkas, rkcg->rkcg_member_id->str, metadata, members, member_cnt, (rd_kafka_assignor_topic_t **)eligible_topics.rl_elems, - eligible_topics.rl_cnt, errstr, errstr_size, rkas->rkas_opaque); + eligible_topics.rl_cnt, broker_rack_pair, broker_rack_pair_cnt, + errstr, errstr_size, rkas->rkas_opaque); if (err) { rd_kafka_dbg( @@ -495,6 +515,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, + rd_kafka_broker_id_rack_pair_t *broker_rack_pair, + size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque), @@ -964,9 +986,10 @@ static int ut_assignors(void) { } /* Run assignor */ - err = rd_kafka_assignor_run( - rk->rk_cgrp, rkas, &metadata, members, - tests[i].member_cnt, errstr, sizeof(errstr)); + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, + &metadata, members, + tests[i].member_cnt, NULL, + 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "Assignor case %s for %s failed: %s", tests[i].name, diff --git a/src/rdkafka_assignor.h b/src/rdkafka_assignor.h index c828a1cf6c..b07bda7878 100644 --- a/src/rdkafka_assignor.h +++ b/src/rdkafka_assignor.h @@ -70,7 +70,7 @@ typedef struct rd_kafka_group_member_s { /** Group generation id. */ int rkgm_generation; /** Member rack id. */ - rd_kafkap_str_t *rack_id; + rd_kafkap_str_t *rkgm_rack_id; } rd_kafka_group_member_t; @@ -80,6 +80,22 @@ int rd_kafka_group_member_find_subscription(rd_kafka_t *rk, const rd_kafka_group_member_t *rkgm, const char *topic); +/** + * Struct to hold information about the racks on which brokers are. + */ +typedef struct rd_kafka_broker_id_rack_pair { + int32_t broker_id; + rd_kafkap_str_t *rack; +} rd_kafka_broker_id_rack_pair; + +/** + * Destroys cnt broker_rack_pairs, includng the destruction of the rack. + */ +void rd_kafka_broker_rack_pair_destroy_cnt( + rd_kafka_broker_id_rack_pair_t *broker_rack_pair, + size_t cnt); + +int rd_kafka_broker_id_rack_pair_cmp(const void *_a, const void *_b); /** * Structure to hold metadata for a single topic and all its @@ -114,6 +130,8 @@ typedef struct rd_kafka_assignor_s { size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, + rd_kafka_broker_id_rack_pair_t *broker_rack_pair, + size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque); @@ -154,6 +172,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, + rd_kafka_broker_id_rack_pair_t *broker_rack_pair, + size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque), @@ -193,13 +213,16 @@ void rd_kafka_assignor_update_subscription( const rd_kafka_topic_partition_list_t *subscription); -rd_kafka_resp_err_t rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg, - const rd_kafka_assignor_t *rkas, - rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - int member_cnt, - char *errstr, - size_t errstr_size); +rd_kafka_resp_err_t +rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg, + const rd_kafka_assignor_t *rkas, + rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + int member_cnt, + rd_kafka_broker_id_rack_pair_t *broker_rack_pair, + size_t broker_rack_pair_cnt, + char *errstr, + size_t errstr_size); rd_kafka_assignor_t *rd_kafka_assignor_find(rd_kafka_t *rk, const char *protocol); diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index e8fc27b111..a89d05a69e 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -5282,6 +5282,7 @@ int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist) { * * @param rkbp if non-NULL, will be set to the broker object with * refcount increased, or NULL on error. + * @param rack if non-NULL, it will set the rack of the broker object. * * @locks none * @locality any diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index c9fe025ab6..c946fff780 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -1659,12 +1659,15 @@ static void rd_kafka_cgrp_handle_SyncGroup(rd_kafka_t *rk, /** * @brief Run group assignment. */ -static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, - rd_kafka_assignor_t *rkas, - rd_kafka_resp_err_t err, - rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - int member_cnt) { +static void +rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, + rd_kafka_assignor_t *rkas, + rd_kafka_resp_err_t err, + rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + int member_cnt, + rd_kafka_broker_id_rack_pair_t *broker_rack_pair, + size_t broker_rack_pair_cnt) { char errstr[512]; if (err) { @@ -1678,6 +1681,7 @@ static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, /* Run assignor */ err = rd_kafka_assignor_run(rkcg, rkas, metadata, members, member_cnt, + broker_rack_pair, broker_rack_pair_cnt, errstr, sizeof(errstr)); if (err) { @@ -1744,10 +1748,11 @@ rd_kafka_cgrp_assignor_handle_Metadata_op(rd_kafka_t *rk, return RD_KAFKA_OP_RES_HANDLED; } - rd_kafka_cgrp_assignor_run(rkcg, rkcg->rkcg_assignor, rko->rko_err, - rko->rko_u.metadata.md, - rkcg->rkcg_group_leader.members, - rkcg->rkcg_group_leader.member_cnt); + rd_kafka_cgrp_assignor_run( + rkcg, rkcg->rkcg_assignor, rko->rko_err, rko->rko_u.metadata.md, + rkcg->rkcg_group_leader.members, rkcg->rkcg_group_leader.member_cnt, + rko->rko_u.metadata.broker_rack_pair, + rko->rko_u.metadata.broker_rack_pair_cnt); return RD_KAFKA_OP_RES_HANDLED; } @@ -1820,7 +1825,7 @@ static int rd_kafka_group_MemberMetadata_consumer_read( if (Version >= 3) { rd_kafkap_str_t RackId = RD_KAFKAP_STR_INITIALIZER; rd_kafka_buf_read_str(rkbuf, &RackId); - rkgm->rack_id = rd_kafkap_str_copy(&RackId); + rkgm->rkgm_rack_id = rd_kafkap_str_copy(&RackId); } rd_kafka_buf_destroy(rkbuf); @@ -5967,15 +5972,14 @@ static int unittest_list_to_map(void) { } int unittest_member_metadata_serdes(void) { - const rd_list_t *topics = - rd_list_new(0, (void *)rd_kafka_topic_info_destroy); - const rd_kafka_topic_partition_list_t *owned_partitions = - rd_list_new(0, rd_kafka_topic_partition_destroy); - const rd_kafkap_str_t *rack_id = rd_kafkap_str_new("myrack", -1); - const void *userdata = NULL; - const size_t userdata_size = 0; - const int generation = 3; - const char topic_name[] = "mytopic"; + rd_list_t *topics = rd_list_new(0, (void *)rd_kafka_topic_info_destroy); + rd_kafka_topic_partition_list_t *owned_partitions = + rd_kafka_topic_partition_list_new(0); + rd_kafkap_str_t *rack_id = rd_kafkap_str_new("myrack", -1); + const void *userdata = NULL; + const int32_t userdata_size = 0; + const int generation = 3; + const char topic_name[] = "mytopic"; rd_kafka_group_member_t *rkgm; int version; @@ -6019,8 +6023,9 @@ int unittest_member_metadata_serdes(void) { RD_UT_ASSERT(generation == rkgm->rkgm_generation, "generation should be same"); if (version >= 3) - RD_UT_ASSERT(!rd_kafkap_str_cmp(rack_id, rkgm->rack_id), - "rack id should be same"); + RD_UT_ASSERT( + !rd_kafkap_str_cmp(rack_id, rkgm->rkgm_rack_id), + "rack id should be same"); rd_kafka_group_member_clear(rkgm); rd_kafkap_bytes_destroy(member_metadata); diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index 4e32e5d584..b9cc333b9d 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -277,15 +277,21 @@ rd_bool_t rd_kafka_has_reliable_leader_epochs(rd_kafka_broker_t *rkb) { * The metadata will be marshalled into 'struct rd_kafka_metadata*' structs. * * The marshalled metadata is returned in \p *mdp, (NULL on error). - + * + * Information about the racks-per-broker is returned in \p *broker_rack_pair_p + * if it's not NULL. The count of racks-per-broker is equal to mdp->broker_cnt, + * and the pairs are sorted by broker id. + * * @returns an error code on parse failure, else NO_ERRRO. * * @locality rdkafka main thread */ -rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, - rd_kafka_buf_t *request, - rd_kafka_buf_t *rkbuf, - struct rd_kafka_metadata **mdp) { +rd_kafka_resp_err_t +rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, + rd_kafka_buf_t *request, + rd_kafka_buf_t *rkbuf, + struct rd_kafka_metadata **mdp, + rd_kafka_broker_id_rack_pair_t **broker_rack_pair_p) { rd_kafka_t *rk = rkb->rkb_rk; int i, j, k; rd_tmpabuf_t tbuf; @@ -358,6 +364,11 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, "%d brokers: tmpabuf memory shortage", md->broker_cnt); + if (ApiVersion >= 1 && broker_rack_pair_p) { + *broker_rack_pair_p = rd_malloc( + sizeof(rd_kafka_broker_id_rack_pair_t) * md->broker_cnt); + } + for (i = 0; i < md->broker_cnt; i++) { rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].id); rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf, @@ -367,6 +378,12 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, if (ApiVersion >= 1) { rd_kafkap_str_t rack; rd_kafka_buf_read_str(rkbuf, &rack); + if (broker_rack_pair_p) { + (*broker_rack_pair_p)[i].broker_id = + md->brokers[i].id; + (*broker_rack_pair_p)[i].rack = + rd_kafkap_str_copy(&rack); + } } rd_kafka_buf_skip_tags(rkbuf); @@ -382,7 +399,10 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, RD_KAFKAP_STR_PR(&cluster_id), controller_id); } - + if (broker_rack_pair_p) + qsort(*broker_rack_pair_p, md->broker_cnt, + sizeof(rd_kafka_broker_id_rack_pair_t), + rd_kafka_broker_id_rack_pair_cmp); /* Read TopicMetadata */ rd_kafka_buf_read_arraycnt(rkbuf, &md->topic_cnt, RD_KAFKAP_TOPICS_MAX); @@ -768,6 +788,15 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, if (leader_epochs) rd_free(leader_epochs); + if (broker_rack_pair_p && *broker_rack_pair_p) { + /* md must be allocated if *broker_rack_pair_p != NULL + since md->brokers_cnt is used to allocate it */ + rd_assert(md); + rd_kafka_broker_rack_pair_destroy_cnt(*broker_rack_pair_p, + md->broker_cnt); + *broker_rack_pair_p = NULL; + } + rd_tmpabuf_destroy(&tbuf); return err; diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index 53a959b8ec..537a852354 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -33,10 +33,12 @@ rd_bool_t rd_kafka_has_reliable_leader_epochs(rd_kafka_broker_t *rkb); -rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, - rd_kafka_buf_t *request, - rd_kafka_buf_t *rkbuf, - struct rd_kafka_metadata **mdp); +rd_kafka_resp_err_t +rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb, + rd_kafka_buf_t *request, + rd_kafka_buf_t *rkbuf, + struct rd_kafka_metadata **mdp, + rd_kafka_broker_id_rack_pair_t **broker_rack_pair_p); struct rd_kafka_metadata * rd_kafka_metadata_copy(const struct rd_kafka_metadata *md, size_t size); diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 128b8bb404..a7d3600464 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -373,6 +373,12 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { break; case RD_KAFKA_OP_METADATA: + if (rko->rko_u.metadata.broker_rack_pair) { + rd_kafka_broker_rack_pair_destroy_cnt( + rko->rko_u.metadata.broker_rack_pair, + rko->rko_u.metadata.broker_rack_pair_cnt); + } + RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy); break; diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 57c07491a2..3ff69f526f 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -38,6 +38,7 @@ typedef struct rd_kafka_q_s rd_kafka_q_t; typedef struct rd_kafka_toppar_s rd_kafka_toppar_t; typedef struct rd_kafka_op_s rd_kafka_op_t; +typedef struct rd_kafka_broker_id_rack_pair rd_kafka_broker_id_rack_pair_t; /* One-off reply queue + reply version. * All APIs that take a rd_kafka_replyq_t makes a copy of the @@ -370,6 +371,11 @@ struct rd_kafka_op_s { /* RD_KAFKA_OP_METADATA */ struct { rd_kafka_metadata_t *md; + size_t broker_rack_pair_cnt; + rd_kafka_broker_id_rack_pair_t + *broker_rack_pair; /* mapping of broker id -> rack + string as seen in metadata, + sorted by broker id. */ int force; /* force request regardless of outstanding * metadata requests. */ } metadata; diff --git a/src/rdkafka_range_assignor.c b/src/rdkafka_range_assignor.c index c83f1f1a44..43d86eecf9 100644 --- a/src/rdkafka_range_assignor.c +++ b/src/rdkafka_range_assignor.c @@ -50,18 +50,20 @@ * C1: [t0p2, t1p2] */ -rd_kafka_resp_err_t -rd_kafka_range_assignor_assign_cb(rd_kafka_t *rk, - const rd_kafka_assignor_t *rkas, - const char *member_id, - const rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - size_t member_cnt, - rd_kafka_assignor_topic_t **eligible_topics, - size_t eligible_topic_cnt, - char *errstr, - size_t errstr_size, - void *opaque) { +rd_kafka_resp_err_t rd_kafka_range_assignor_assign_cb( + rd_kafka_t *rk, + const rd_kafka_assignor_t *rkas, + const char *member_id, + const rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + size_t member_cnt, + rd_kafka_assignor_topic_t **eligible_topics, + size_t eligible_topic_cnt, + rd_kafka_broker_id_rack_pair_t *broker_rack_pair, + size_t broker_rack_pair_cnt, + char *errstr, + size_t errstr_size, + void *opaque) { unsigned int ti; int i; diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index b95bce4eb2..17cb8acce2 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2089,6 +2089,7 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk, struct rd_kafka_metadata *md = NULL; const rd_list_t *topics = request->rkbuf_u.Metadata.topics; int actions; + rd_kafka_broker_id_rack_pair_t *broker_rack_pair = NULL; rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY || thrd_is_current(rk->rk_thread)); @@ -2114,15 +2115,27 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk, rd_list_cnt(topics), request->rkbuf_u.Metadata.reason); - err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md); + if (rko && rko->rko_replyq.q) + err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md, + &broker_rack_pair); + else + err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md, NULL); if (err) goto err; if (rko && rko->rko_replyq.q) { /* Reply to metadata requester, passing on the metadata. * Reuse requesting rko for the reply. */ - rko->rko_err = err; - rko->rko_u.metadata.md = md; + rko->rko_err = err; + rko->rko_u.metadata.md = md; + rko->rko_u.metadata.broker_rack_pair = broker_rack_pair; + if (broker_rack_pair) { + rd_assert(md); /* rd_kafka_parse_Metadata guarantees + that md will not be NULL if + broker_rack_pair isn't. */ + rko->rko_u.metadata.broker_rack_pair_cnt = + (size_t)md->broker_cnt; + } rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0); rko = NULL; diff --git a/src/rdkafka_roundrobin_assignor.c b/src/rdkafka_roundrobin_assignor.c index 6cb9193645..7ef6d9edd1 100644 --- a/src/rdkafka_roundrobin_assignor.c +++ b/src/rdkafka_roundrobin_assignor.c @@ -58,6 +58,8 @@ rd_kafka_resp_err_t rd_kafka_roundrobin_assignor_assign_cb( size_t member_cnt, rd_kafka_assignor_topic_t **eligible_topics, size_t eligible_topic_cnt, + rd_kafka_broker_id_rack_pair_t *broker_rack_pair, + size_t broker_rack_pair_cnt, char *errstr, size_t errstr_size, void *opaque) { diff --git a/src/rdkafka_sticky_assignor.c b/src/rdkafka_sticky_assignor.c index 922cf49711..ce37e58ce9 100644 --- a/src/rdkafka_sticky_assignor.c +++ b/src/rdkafka_sticky_assignor.c @@ -1576,18 +1576,20 @@ static void assignToMembers(map_str_toppar_list_t *currentAssignment, * * This code is closely mimicking the AK Java AbstractStickyAssignor.assign(). */ -rd_kafka_resp_err_t -rd_kafka_sticky_assignor_assign_cb(rd_kafka_t *rk, - const rd_kafka_assignor_t *rkas, - const char *member_id, - const rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - size_t member_cnt, - rd_kafka_assignor_topic_t **eligible_topics, - size_t eligible_topic_cnt, - char *errstr, - size_t errstr_size, - void *opaque) { +rd_kafka_resp_err_t rd_kafka_sticky_assignor_assign_cb( + rd_kafka_t *rk, + const rd_kafka_assignor_t *rkas, + const char *member_id, + const rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + size_t member_cnt, + rd_kafka_assignor_topic_t **eligible_topics, + size_t eligible_topic_cnt, + rd_kafka_broker_id_rack_pair_t *broker_rack_pair, + size_t broker_rack_pair_cnt, + char *errstr, + size_t errstr_size, + void *opaque) { /* FIXME: Let the cgrp pass the actual eligible partition count */ size_t partition_cnt = member_cnt * 10; /* FIXME */ @@ -2209,7 +2211,7 @@ static int ut_testOneConsumerNoTopic(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2235,7 +2237,7 @@ static int ut_testOneConsumerNonexistentTopic(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2262,7 +2264,7 @@ static int ut_testOneConsumerOneTopic(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2296,7 +2298,7 @@ static int ut_testOnlyAssignsPartitionsFromSubscribedTopics( ut_init_member(&members[0], "consumer1", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2325,7 +2327,7 @@ static int ut_testOneConsumerMultipleTopics(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", "topic2", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2354,7 +2356,7 @@ ut_testTwoConsumersOneTopicOnePartition(rd_kafka_t *rk, ut_init_member(&members[1], "consumer2", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2385,7 +2387,7 @@ ut_testTwoConsumersOneTopicTwoPartitions(rd_kafka_t *rk, ut_init_member(&members[1], "consumer2", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2419,7 +2421,7 @@ static int ut_testMultipleConsumersMixedTopicSubscriptions( ut_init_member(&members[2], "consumer3", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2453,7 +2455,7 @@ ut_testTwoConsumersTwoTopicsSixPartitions(rd_kafka_t *rk, ut_init_member(&members[1], "consumer2", "topic1", "topic2", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2484,7 +2486,7 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, ut_init_member(&members[0], "consumer1", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, 1, - errstr, sizeof(errstr)); + NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyAssignment(&members[0], "topic1", 0, "topic1", 1, "topic1", 2, @@ -2497,7 +2499,7 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, ut_init_member(&members[1], "consumer2", "topic1", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2511,7 +2513,7 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, /* Remove consumer1 */ err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[1], 1, - errstr, sizeof(errstr)); + NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyAssignment(&members[1], "topic1", 0, "topic1", 1, "topic1", 2, @@ -2570,7 +2572,7 @@ ut_testPoorRoundRobinAssignmentScenario(rd_kafka_t *rk, "topic4", "topic5", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2605,7 +2607,7 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, ut_init_member(&members[1], "consumer2", "topic1", "topic2", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2624,7 +2626,7 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, rd_kafka_metadata_new_topic_mockv(2, "topic1", 3, "topic2", 3); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2646,7 +2648,7 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, metadata = rd_kafka_metadata_new_topic_mockv(1, "topic2", 3); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2705,8 +2707,9 @@ ut_testReassignmentAfterOneConsumerLeaves(rd_kafka_t *rk, members[i - 1].rkgm_subscription = subscription; } - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, errstr, sizeof(errstr)); + err = + rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2720,8 +2723,9 @@ ut_testReassignmentAfterOneConsumerLeaves(rd_kafka_t *rk, sizeof(*members) * (member_cnt - 10)); member_cnt--; - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, errstr, sizeof(errstr)); + err = + rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2761,8 +2765,9 @@ ut_testReassignmentAfterOneConsumerAdded(rd_kafka_t *rk, } member_cnt--; /* Skip one consumer */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, errstr, sizeof(errstr)); + err = + rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2773,8 +2778,9 @@ ut_testReassignmentAfterOneConsumerAdded(rd_kafka_t *rk, */ member_cnt++; - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, errstr, sizeof(errstr)); + err = + rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2822,8 +2828,9 @@ static int ut_testSameSubscriptions(rd_kafka_t *rk, rd_kafka_topic_partition_list_copy(subscription); } - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, errstr, sizeof(errstr)); + err = + rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2835,8 +2842,9 @@ static int ut_testSameSubscriptions(rd_kafka_t *rk, memmove(&members[5], &members[6], sizeof(*members) * (member_cnt - 6)); member_cnt--; - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, errstr, sizeof(errstr)); + err = + rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2894,8 +2902,9 @@ static int ut_testLargeAssignmentWithMultipleConsumersLeaving( members[i].rkgm_subscription = subscription; } - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, errstr, sizeof(errstr)); + err = + rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2910,8 +2919,9 @@ static int ut_testLargeAssignmentWithMultipleConsumersLeaving( member_cnt--; } - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, errstr, sizeof(errstr)); + err = + rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -2956,7 +2966,7 @@ static int ut_testNewSubscription(rd_kafka_t *rk, } err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -2971,7 +2981,7 @@ static int ut_testNewSubscription(rd_kafka_t *rk, "topic1", RD_KAFKA_PARTITION_UA); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3005,8 +3015,9 @@ static int ut_testMoveExistingAssignments(rd_kafka_t *rk, ut_init_member(&members[2], "consumer3", "topic1", NULL); ut_init_member(&members[3], "consumer4", "topic1", NULL); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, errstr, sizeof(errstr)); + err = + rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, member_cnt, metadata); @@ -3027,7 +3038,8 @@ static int ut_testMoveExistingAssignments(rd_kafka_t *rk, * Remove potential group leader consumer1 */ err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[1], - member_cnt - 1, errstr, sizeof(errstr)); + member_cnt - 1, NULL, 0, errstr, + sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(&members[1], member_cnt - 1, metadata); @@ -3110,8 +3122,9 @@ static int ut_testStickiness(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { 0); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, errstr, sizeof(errstr)); + err = + rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); @@ -3144,7 +3157,7 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { /* Just consumer1 */ err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, 1, - errstr, sizeof(errstr)); + NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, 1, metadata); @@ -3154,7 +3167,7 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { /* consumer1 and consumer2 */ err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, 2, - errstr, sizeof(errstr)); + NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, 2, metadata); @@ -3167,8 +3180,9 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { /* Run it twice, should be stable. */ for (i = 0; i < 2; i++) { /* consumer1, consumer2, and consumer3 */ - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, - members, 3, errstr, sizeof(errstr)); + err = + rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + 3, NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(members, 3, metadata); @@ -3180,7 +3194,7 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { /* Remove consumer1 */ err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[1], 2, - errstr, sizeof(errstr)); + NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(&members[1], 2, metadata); @@ -3192,7 +3206,7 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { /* Remove consumer2 */ err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, &members[2], 1, - errstr, sizeof(errstr)); + NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); verifyValidityAndBalance(&members[2], 1, metadata); @@ -3222,7 +3236,7 @@ ut_testAssignmentUpdatedForDeletedTopic(rd_kafka_t *rk, NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3254,7 +3268,7 @@ static int ut_testNoExceptionThrownWhenOnlySubscribedTopicDeleted( ut_init_member(&members[0], "consumer1", "topic", NULL); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3268,7 +3282,7 @@ static int ut_testNoExceptionThrownWhenOnlySubscribedTopicDeleted( metadata = rd_kafka_metadata_new_topic_mock(NULL, 0); err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - RD_ARRAYSIZE(members), errstr, + RD_ARRAYSIZE(members), NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); @@ -3315,8 +3329,9 @@ ut_testConflictingPreviousAssignments(rd_kafka_t *rk, 1); - err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, - member_cnt, errstr, sizeof(errstr)); + err = + rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, members, + member_cnt, NULL, 0, errstr, sizeof(errstr)); RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); RD_UT_ASSERT(members[0].rkgm_assignment->cnt == 1 &&