Skip to content

Commit

Permalink
Change parsing of Metadata to extract broker racks [KIP-881]
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Apr 13, 2023
1 parent d5fda99 commit c4cb7c9
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 130 deletions.
49 changes: 36 additions & 13 deletions src/rdkafka_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ void rd_kafka_group_member_clear(rd_kafka_group_member_t *rkgm) {
if (rkgm->rkgm_member_metadata)
rd_kafkap_bytes_destroy(rkgm->rkgm_member_metadata);

if (rkgm->rack_id)
rd_kafkap_str_destroy(rkgm->rack_id);
if (rkgm->rkgm_rack_id)
rd_kafkap_str_destroy(rkgm->rkgm_rack_id);

memset(rkgm, 0, sizeof(*rkgm));
}
Expand Down Expand Up @@ -104,6 +104,22 @@ int rd_kafka_group_member_find_subscription(rd_kafka_t *rk,
return 0;
}

/**
 * @brief Destroy an array of \p cnt broker id/rack pairs.
 *
 * The rack string of each element is destroyed with
 * rd_kafkap_str_destroy() (via RD_IF_FREE), after which the array
 * itself is released with rd_free().
 *
 * @param broker_rack_pair Array of pairs to destroy; freed by this call.
 * @param cnt Number of elements in \p broker_rack_pair.
 */
void rd_kafka_broker_rack_pair_destroy_cnt(
    rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
    size_t cnt) {
        size_t idx = 0;

        /* Walk the elements front to back, releasing each rack string. */
        while (idx < cnt) {
                RD_IF_FREE(broker_rack_pair[idx].rack, rd_kafkap_str_destroy);
                idx++;
        }

        /* The elements are done with, release the array itself. */
        rd_free(broker_rack_pair);
}

/**
 * @brief Comparator for rd_kafka_broker_id_rack_pair_t elements,
 *        ordering them by broker id only (the rack is not compared).
 *
 * Has the signature expected by qsort(), and is passed to it when the
 * pairs are sorted in rd_kafka_parse_Metadata().
 *
 * @param _a Pointer to the first rd_kafka_broker_id_rack_pair_t.
 * @param _b Pointer to the second rd_kafka_broker_id_rack_pair_t.
 *
 * @returns the result of RD_CMP() applied to the two broker ids.
 */
int rd_kafka_broker_id_rack_pair_cmp(const void *_a, const void *_b) {
        const rd_kafka_broker_id_rack_pair_t *lhs = _a;
        const rd_kafka_broker_id_rack_pair_t *rhs = _b;
        int order;

        order = RD_CMP(lhs->broker_id, rhs->broker_id);

        return order;
}


rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
const rd_list_t *topics,
Expand Down Expand Up @@ -312,13 +328,16 @@ rd_kafka_member_subscriptions_map(rd_kafka_cgrp_t *rkcg,
}


rd_kafka_resp_err_t rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg,
const rd_kafka_assignor_t *rkas,
rd_kafka_metadata_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt,
char *errstr,
size_t errstr_size) {
rd_kafka_resp_err_t
rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg,
const rd_kafka_assignor_t *rkas,
rd_kafka_metadata_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt,
char *errstr,
size_t errstr_size) {
rd_kafka_resp_err_t err;
rd_ts_t ts_start = rd_clock();
int i;
Expand Down Expand Up @@ -372,7 +391,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg,
err = rkas->rkas_assign_cb(
rkcg->rkcg_rk, rkas, rkcg->rkcg_member_id->str, metadata, members,
member_cnt, (rd_kafka_assignor_topic_t **)eligible_topics.rl_elems,
eligible_topics.rl_cnt, errstr, errstr_size, rkas->rkas_opaque);
eligible_topics.rl_cnt, broker_rack_pair, broker_rack_pair_cnt,
errstr, errstr_size, rkas->rkas_opaque);

if (err) {
rd_kafka_dbg(
Expand Down Expand Up @@ -495,6 +515,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_add(
size_t member_cnt,
rd_kafka_assignor_topic_t **eligible_topics,
size_t eligible_topic_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt,
char *errstr,
size_t errstr_size,
void *opaque),
Expand Down Expand Up @@ -964,9 +986,10 @@ static int ut_assignors(void) {
}

/* Run assignor */
err = rd_kafka_assignor_run(
rk->rk_cgrp, rkas, &metadata, members,
tests[i].member_cnt, errstr, sizeof(errstr));
err = rd_kafka_assignor_run(rk->rk_cgrp, rkas,
&metadata, members,
tests[i].member_cnt, NULL,
0, errstr, sizeof(errstr));

RD_UT_ASSERT(!err, "Assignor case %s for %s failed: %s",
tests[i].name,
Expand Down
39 changes: 31 additions & 8 deletions src/rdkafka_assignor.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ typedef struct rd_kafka_group_member_s {
/** Group generation id. */
int rkgm_generation;
/** Member rack id. */
rd_kafkap_str_t *rack_id;
rd_kafkap_str_t *rkgm_rack_id;
} rd_kafka_group_member_t;


Expand All @@ -80,6 +80,22 @@ int rd_kafka_group_member_find_subscription(rd_kafka_t *rk,
const rd_kafka_group_member_t *rkgm,
const char *topic);

/**
 * Struct to hold information about the racks on which brokers are:
 * one element associates a single broker id with that broker's rack.
 *
 * Arrays of these pairs are filled in by rd_kafka_parse_Metadata() and
 * released with rd_kafka_broker_rack_pair_destroy_cnt().
 *
 * NOTE(review): the typedef name declared here has no `_t` suffix; the
 * `rd_kafka_broker_id_rack_pair_t` name used by the function prototypes
 * is declared against this struct tag in rdkafka_op.h.
 */
typedef struct rd_kafka_broker_id_rack_pair {
/** Broker id, copied from the broker entry of the parsed metadata. */
int32_t broker_id;
/** Rack of the broker: a copy made with rd_kafkap_str_copy() that is
 *  destroyed with rd_kafkap_str_destroy() when the pair is destroyed. */
rd_kafkap_str_t *rack;
} rd_kafka_broker_id_rack_pair;

/**
* Destroys cnt broker_rack_pairs, including the destruction of the rack.
*/
void rd_kafka_broker_rack_pair_destroy_cnt(
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t cnt);

int rd_kafka_broker_id_rack_pair_cmp(const void *_a, const void *_b);

/**
* Structure to hold metadata for a single topic and all its
Expand Down Expand Up @@ -114,6 +130,8 @@ typedef struct rd_kafka_assignor_s {
size_t member_cnt,
rd_kafka_assignor_topic_t **eligible_topics,
size_t eligible_topic_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt,
char *errstr,
size_t errstr_size,
void *opaque);
Expand Down Expand Up @@ -154,6 +172,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_add(
size_t member_cnt,
rd_kafka_assignor_topic_t **eligible_topics,
size_t eligible_topic_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt,
char *errstr,
size_t errstr_size,
void *opaque),
Expand Down Expand Up @@ -193,13 +213,16 @@ void rd_kafka_assignor_update_subscription(
const rd_kafka_topic_partition_list_t *subscription);


rd_kafka_resp_err_t rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg,
const rd_kafka_assignor_t *rkas,
rd_kafka_metadata_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt,
char *errstr,
size_t errstr_size);
rd_kafka_resp_err_t
rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg,
const rd_kafka_assignor_t *rkas,
rd_kafka_metadata_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt,
char *errstr,
size_t errstr_size);

rd_kafka_assignor_t *rd_kafka_assignor_find(rd_kafka_t *rk,
const char *protocol);
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -5282,6 +5282,7 @@ int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist) {
*
* @param rkbp if non-NULL, will be set to the broker object with
* refcount increased, or NULL on error.
* @param rack if non-NULL, it will set the rack of the broker object.
*
* @locks none
* @locality any
Expand Down
49 changes: 27 additions & 22 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1659,12 +1659,15 @@ static void rd_kafka_cgrp_handle_SyncGroup(rd_kafka_t *rk,
/**
* @brief Run group assignment.
*/
static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg,
rd_kafka_assignor_t *rkas,
rd_kafka_resp_err_t err,
rd_kafka_metadata_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt) {
static void
rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg,
rd_kafka_assignor_t *rkas,
rd_kafka_resp_err_t err,
rd_kafka_metadata_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt) {
char errstr[512];

if (err) {
Expand All @@ -1678,6 +1681,7 @@ static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg,

/* Run assignor */
err = rd_kafka_assignor_run(rkcg, rkas, metadata, members, member_cnt,
broker_rack_pair, broker_rack_pair_cnt,
errstr, sizeof(errstr));

if (err) {
Expand Down Expand Up @@ -1744,10 +1748,11 @@ rd_kafka_cgrp_assignor_handle_Metadata_op(rd_kafka_t *rk,
return RD_KAFKA_OP_RES_HANDLED;
}

rd_kafka_cgrp_assignor_run(rkcg, rkcg->rkcg_assignor, rko->rko_err,
rko->rko_u.metadata.md,
rkcg->rkcg_group_leader.members,
rkcg->rkcg_group_leader.member_cnt);
rd_kafka_cgrp_assignor_run(
rkcg, rkcg->rkcg_assignor, rko->rko_err, rko->rko_u.metadata.md,
rkcg->rkcg_group_leader.members, rkcg->rkcg_group_leader.member_cnt,
rko->rko_u.metadata.broker_rack_pair,
rko->rko_u.metadata.broker_rack_pair_cnt);

return RD_KAFKA_OP_RES_HANDLED;
}
Expand Down Expand Up @@ -1820,7 +1825,7 @@ static int rd_kafka_group_MemberMetadata_consumer_read(
if (Version >= 3) {
rd_kafkap_str_t RackId = RD_KAFKAP_STR_INITIALIZER;
rd_kafka_buf_read_str(rkbuf, &RackId);
rkgm->rack_id = rd_kafkap_str_copy(&RackId);
rkgm->rkgm_rack_id = rd_kafkap_str_copy(&RackId);
}

rd_kafka_buf_destroy(rkbuf);
Expand Down Expand Up @@ -5967,15 +5972,14 @@ static int unittest_list_to_map(void) {
}

int unittest_member_metadata_serdes(void) {
const rd_list_t *topics =
rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
const rd_kafka_topic_partition_list_t *owned_partitions =
rd_list_new(0, rd_kafka_topic_partition_destroy);
const rd_kafkap_str_t *rack_id = rd_kafkap_str_new("myrack", -1);
const void *userdata = NULL;
const size_t userdata_size = 0;
const int generation = 3;
const char topic_name[] = "mytopic";
rd_list_t *topics = rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
rd_kafka_topic_partition_list_t *owned_partitions =
rd_kafka_topic_partition_list_new(0);
rd_kafkap_str_t *rack_id = rd_kafkap_str_new("myrack", -1);
const void *userdata = NULL;
const int32_t userdata_size = 0;
const int generation = 3;
const char topic_name[] = "mytopic";
rd_kafka_group_member_t *rkgm;
int version;

Expand Down Expand Up @@ -6019,8 +6023,9 @@ int unittest_member_metadata_serdes(void) {
RD_UT_ASSERT(generation == rkgm->rkgm_generation,
"generation should be same");
if (version >= 3)
RD_UT_ASSERT(!rd_kafkap_str_cmp(rack_id, rkgm->rack_id),
"rack id should be same");
RD_UT_ASSERT(
!rd_kafkap_str_cmp(rack_id, rkgm->rkgm_rack_id),
"rack id should be same");

rd_kafka_group_member_clear(rkgm);
rd_kafkap_bytes_destroy(member_metadata);
Expand Down
41 changes: 35 additions & 6 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,15 +277,21 @@ rd_bool_t rd_kafka_has_reliable_leader_epochs(rd_kafka_broker_t *rkb) {
* The metadata will be marshalled into 'struct rd_kafka_metadata*' structs.
*
* The marshalled metadata is returned in \p *mdp, (NULL on error).
*
* Information about the racks-per-broker is returned in \p *broker_rack_pair_p
* if \p broker_rack_pair_p is not NULL. The number of pairs is equal to
* (*mdp)->broker_cnt, and the pairs are sorted by broker id.
*
* @returns an error code on parse failure, else NO_ERROR.
*
* @locality rdkafka main thread
*/
rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *request,
rd_kafka_buf_t *rkbuf,
struct rd_kafka_metadata **mdp) {
rd_kafka_resp_err_t
rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *request,
rd_kafka_buf_t *rkbuf,
struct rd_kafka_metadata **mdp,
rd_kafka_broker_id_rack_pair_t **broker_rack_pair_p) {
rd_kafka_t *rk = rkb->rkb_rk;
int i, j, k;
rd_tmpabuf_t tbuf;
Expand Down Expand Up @@ -358,6 +364,11 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
"%d brokers: tmpabuf memory shortage",
md->broker_cnt);

if (ApiVersion >= 1 && broker_rack_pair_p) {
*broker_rack_pair_p = rd_malloc(
sizeof(rd_kafka_broker_id_rack_pair_t) * md->broker_cnt);
}

for (i = 0; i < md->broker_cnt; i++) {
rd_kafka_buf_read_i32a(rkbuf, md->brokers[i].id);
rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf,
Expand All @@ -367,6 +378,12 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
if (ApiVersion >= 1) {
rd_kafkap_str_t rack;
rd_kafka_buf_read_str(rkbuf, &rack);
if (broker_rack_pair_p) {
(*broker_rack_pair_p)[i].broker_id =
md->brokers[i].id;
(*broker_rack_pair_p)[i].rack =
rd_kafkap_str_copy(&rack);
}
}

rd_kafka_buf_skip_tags(rkbuf);
Expand All @@ -382,7 +399,10 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
RD_KAFKAP_STR_PR(&cluster_id), controller_id);
}


if (broker_rack_pair_p)
qsort(*broker_rack_pair_p, md->broker_cnt,
sizeof(rd_kafka_broker_id_rack_pair_t),
rd_kafka_broker_id_rack_pair_cmp);

/* Read TopicMetadata */
rd_kafka_buf_read_arraycnt(rkbuf, &md->topic_cnt, RD_KAFKAP_TOPICS_MAX);
Expand Down Expand Up @@ -768,6 +788,15 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
if (leader_epochs)
rd_free(leader_epochs);

if (broker_rack_pair_p && *broker_rack_pair_p) {
/* md must be allocated if *broker_rack_pair_p != NULL
since md->broker_cnt is used to allocate it */
rd_assert(md);
rd_kafka_broker_rack_pair_destroy_cnt(*broker_rack_pair_p,
md->broker_cnt);
*broker_rack_pair_p = NULL;
}

rd_tmpabuf_destroy(&tbuf);

return err;
Expand Down
10 changes: 6 additions & 4 deletions src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@

rd_bool_t rd_kafka_has_reliable_leader_epochs(rd_kafka_broker_t *rkb);

rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *request,
rd_kafka_buf_t *rkbuf,
struct rd_kafka_metadata **mdp);
rd_kafka_resp_err_t
rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *request,
rd_kafka_buf_t *rkbuf,
struct rd_kafka_metadata **mdp,
rd_kafka_broker_id_rack_pair_t **broker_rack_pair_p);

struct rd_kafka_metadata *
rd_kafka_metadata_copy(const struct rd_kafka_metadata *md, size_t size);
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_op.c
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,12 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) {
break;

case RD_KAFKA_OP_METADATA:
if (rko->rko_u.metadata.broker_rack_pair) {
rd_kafka_broker_rack_pair_destroy_cnt(
rko->rko_u.metadata.broker_rack_pair,
rko->rko_u.metadata.broker_rack_pair_cnt);
}

RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy);
break;

Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
typedef struct rd_kafka_q_s rd_kafka_q_t;
typedef struct rd_kafka_toppar_s rd_kafka_toppar_t;
typedef struct rd_kafka_op_s rd_kafka_op_t;
typedef struct rd_kafka_broker_id_rack_pair rd_kafka_broker_id_rack_pair_t;

/* One-off reply queue + reply version.
* All APIs that take a rd_kafka_replyq_t makes a copy of the
Expand Down Expand Up @@ -370,6 +371,11 @@ struct rd_kafka_op_s {
/* RD_KAFKA_OP_METADATA */
struct {
rd_kafka_metadata_t *md;
size_t broker_rack_pair_cnt;
rd_kafka_broker_id_rack_pair_t
*broker_rack_pair; /* mapping of broker id -> rack
string as seen in metadata,
sorted by broker id. */
int force; /* force request regardless of outstanding
* metadata requests. */
} metadata;
Expand Down
Loading

0 comments on commit c4cb7c9

Please sign in to comment.