Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metadata refactor to add internal fields #4279

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ void rd_kafka_destroy_final(rd_kafka_t *rk) {
mtx_destroy(&rk->rk_init_lock);

if (rk->rk_full_metadata)
emasab marked this conversation as resolved.
Show resolved Hide resolved
rd_kafka_metadata_destroy(rk->rk_full_metadata);
rd_kafka_metadata_destroy(&rk->rk_full_metadata->metadata);
rd_kafkap_str_destroy(rk->rk_client_id);
rd_kafkap_str_destroy(rk->rk_group_id);
rd_kafkap_str_destroy(rk->rk_eos.transactional_id);
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -6349,7 +6349,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
char *errstr,
size_t errstr_size) {
const int log_decode_errors = LOG_ERR;
int nodeid;
int32_t nodeid;
uint16_t port;
int16_t api_version;
int32_t cnt;
Expand Down
45 changes: 11 additions & 34 deletions src/rdkafka_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,22 +104,6 @@ int rd_kafka_group_member_find_subscription(rd_kafka_t *rk,
return 0;
}

void rd_kafka_broker_rack_pair_destroy_cnt(
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t cnt) {
size_t i;
for (i = 0; i < cnt; i++)
RD_IF_FREE(broker_rack_pair[i].rack, rd_kafkap_str_destroy);

rd_free(broker_rack_pair);
}

int rd_kafka_broker_id_rack_pair_cmp(const void *_a, const void *_b) {
const rd_kafka_broker_id_rack_pair_t *a = _a;
const rd_kafka_broker_id_rack_pair_t *b = _b;
return RD_CMP(a->broker_id, b->broker_id);
}


rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
const rd_list_t *topics,
Expand Down Expand Up @@ -328,16 +312,13 @@ rd_kafka_member_subscriptions_map(rd_kafka_cgrp_t *rkcg,
}


rd_kafka_resp_err_t
rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg,
const rd_kafka_assignor_t *rkas,
rd_kafka_metadata_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt,
char *errstr,
size_t errstr_size) {
rd_kafka_resp_err_t rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg,
const rd_kafka_assignor_t *rkas,
rd_kafka_metadata_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt,
char *errstr,
size_t errstr_size) {
rd_kafka_resp_err_t err;
rd_ts_t ts_start = rd_clock();
int i;
Expand Down Expand Up @@ -391,8 +372,7 @@ rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg,
err = rkas->rkas_assign_cb(
rkcg->rkcg_rk, rkas, rkcg->rkcg_member_id->str, metadata, members,
member_cnt, (rd_kafka_assignor_topic_t **)eligible_topics.rl_elems,
eligible_topics.rl_cnt, broker_rack_pair, broker_rack_pair_cnt,
errstr, errstr_size, rkas->rkas_opaque);
eligible_topics.rl_cnt, errstr, errstr_size, rkas->rkas_opaque);

if (err) {
rd_kafka_dbg(
Expand Down Expand Up @@ -515,8 +495,6 @@ rd_kafka_resp_err_t rd_kafka_assignor_add(
size_t member_cnt,
rd_kafka_assignor_topic_t **eligible_topics,
size_t eligible_topic_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt,
char *errstr,
size_t errstr_size,
void *opaque),
Expand Down Expand Up @@ -986,10 +964,9 @@ static int ut_assignors(void) {
}

/* Run assignor */
err = rd_kafka_assignor_run(rk->rk_cgrp, rkas,
&metadata, members,
tests[i].member_cnt, NULL,
0, errstr, sizeof(errstr));
err = rd_kafka_assignor_run(
rk->rk_cgrp, rkas, &metadata, members,
tests[i].member_cnt, errstr, sizeof(errstr));

RD_UT_ASSERT(!err, "Assignor case %s for %s failed: %s",
tests[i].name,
Expand Down
38 changes: 7 additions & 31 deletions src/rdkafka_assignor.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,6 @@ int rd_kafka_group_member_find_subscription(rd_kafka_t *rk,
const rd_kafka_group_member_t *rkgm,
const char *topic);

/**
* Struct to hold information about the racks on which brokers are.
*/
typedef struct rd_kafka_broker_id_rack_pair {
int32_t broker_id;
rd_kafkap_str_t *rack;
} rd_kafka_broker_id_rack_pair;

/**
* Destroys cnt broker_rack_pairs, includng the destruction of the rack.
*/
void rd_kafka_broker_rack_pair_destroy_cnt(
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t cnt);

int rd_kafka_broker_id_rack_pair_cmp(const void *_a, const void *_b);

/**
* Structure to hold metadata for a single topic and all its
* subscribing members.
Expand Down Expand Up @@ -130,8 +113,6 @@ typedef struct rd_kafka_assignor_s {
size_t member_cnt,
rd_kafka_assignor_topic_t **eligible_topics,
size_t eligible_topic_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt,
char *errstr,
size_t errstr_size,
void *opaque);
Expand Down Expand Up @@ -172,8 +153,6 @@ rd_kafka_resp_err_t rd_kafka_assignor_add(
size_t member_cnt,
rd_kafka_assignor_topic_t **eligible_topics,
size_t eligible_topic_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt,
char *errstr,
size_t errstr_size,
void *opaque),
Expand Down Expand Up @@ -213,16 +192,13 @@ void rd_kafka_assignor_update_subscription(
const rd_kafka_topic_partition_list_t *subscription);


rd_kafka_resp_err_t
rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg,
const rd_kafka_assignor_t *rkas,
rd_kafka_metadata_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt,
char *errstr,
size_t errstr_size);
rd_kafka_resp_err_t rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg,
const rd_kafka_assignor_t *rkas,
rd_kafka_metadata_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt,
char *errstr,
size_t errstr_size);

rd_kafka_assignor_t *rd_kafka_assignor_find(rd_kafka_t *rk,
const char *protocol);
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_aux.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ void rd_kafka_acl_result_free(void *ptr) {
* @return A new allocated Node object.
* Use rd_kafka_Node_destroy() to free when done.
*/
rd_kafka_Node_t *rd_kafka_Node_new(int id,
rd_kafka_Node_t *rd_kafka_Node_new(int32_t id,
const char *host,
uint16_t port,
const char *rack_id) {
Expand Down
6 changes: 4 additions & 2 deletions src/rdkafka_aux.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ typedef struct rd_kafka_Node_s {
char *rack_id; /*< (optional) Node rack id */
} rd_kafka_Node_t;

rd_kafka_Node_t *
rd_kafka_Node_new(int id, const char *host, uint16_t port, const char *rack_id);
rd_kafka_Node_t *rd_kafka_Node_new(int32_t id,
const char *host,
uint16_t port,
const char *rack_id);

rd_kafka_Node_t *rd_kafka_Node_copy(const rd_kafka_Node_t *src);

Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,10 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
size_t _slen; \
char *_dst; \
rd_kafka_buf_read_str(rkbuf, &_kstr); \
if (RD_KAFKAP_STR_IS_NULL(&_kstr)) { \
dst = NULL; \
break; \
} \
_slen = RD_KAFKAP_STR_LEN(&_kstr); \
if (!(_dst = rd_tmpabuf_write(tmpabuf, _kstr.str, _slen + 1))) \
rd_kafka_buf_parse_fail( \
Expand Down
29 changes: 12 additions & 17 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1659,15 +1659,12 @@ static void rd_kafka_cgrp_handle_SyncGroup(rd_kafka_t *rk,
/**
* @brief Run group assignment.
*/
static void
rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg,
rd_kafka_assignor_t *rkas,
rd_kafka_resp_err_t err,
rd_kafka_metadata_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt,
rd_kafka_broker_id_rack_pair_t *broker_rack_pair,
size_t broker_rack_pair_cnt) {
static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg,
rd_kafka_assignor_t *rkas,
rd_kafka_resp_err_t err,
rd_kafka_metadata_internal_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt) {
char errstr[512];

if (err) {
Expand All @@ -1680,9 +1677,8 @@ rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg,
*errstr = '\0';

/* Run assignor */
err = rd_kafka_assignor_run(rkcg, rkas, metadata, members, member_cnt,
broker_rack_pair, broker_rack_pair_cnt,
errstr, sizeof(errstr));
err = rd_kafka_assignor_run(rkcg, rkas, &metadata->metadata, members,
member_cnt, errstr, sizeof(errstr));

if (err) {
if (!*errstr)
Expand Down Expand Up @@ -1748,11 +1744,10 @@ rd_kafka_cgrp_assignor_handle_Metadata_op(rd_kafka_t *rk,
return RD_KAFKA_OP_RES_HANDLED;
}

rd_kafka_cgrp_assignor_run(
rkcg, rkcg->rkcg_assignor, rko->rko_err, rko->rko_u.metadata.md,
rkcg->rkcg_group_leader.members, rkcg->rkcg_group_leader.member_cnt,
rko->rko_u.metadata.broker_rack_pair,
rko->rko_u.metadata.broker_rack_pair_cnt);
rd_kafka_cgrp_assignor_run(rkcg, rkcg->rkcg_assignor, rko->rko_err,
rko->rko_u.metadata.mdi,
rkcg->rkcg_group_leader.members,
rkcg->rkcg_group_leader.member_cnt);

return RD_KAFKA_OP_RES_HANDLED;
}
Expand Down
8 changes: 5 additions & 3 deletions src/rdkafka_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ struct rd_kafka_topic_s;
struct rd_kafka_msg_s;
struct rd_kafka_broker_s;
struct rd_kafka_toppar_s;

typedef struct rd_kafka_metadata_internal_s rd_kafka_metadata_internal_t;
typedef struct rd_kafka_toppar_s rd_kafka_toppar_t;
typedef struct rd_kafka_lwtopic_s rd_kafka_lwtopic_t;


Expand Down Expand Up @@ -350,8 +351,9 @@ struct rd_kafka_s {
rd_ts_t rk_ts_metadata; /* Timestamp of most recent
* metadata. */

struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
rd_ts_t rk_ts_full_metadata; /* Timesstamp of .. */
rd_kafka_metadata_internal_t
*rk_full_metadata; /* Last full metadata. */
rd_ts_t rk_ts_full_metadata; /* Timestamp of .. */
struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */

char *rk_clusterid; /* ClusterId from metadata */
Expand Down
Loading