Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add protocol/parsing changes [KIP-881] #4189

Merged
merged 6 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ void rd_kafka_destroy_final(rd_kafka_t *rk) {
mtx_destroy(&rk->rk_init_lock);

if (rk->rk_full_metadata)
rd_kafka_metadata_destroy(rk->rk_full_metadata);
rd_kafka_metadata_destroy(&rk->rk_full_metadata->metadata);
rd_kafkap_str_destroy(rk->rk_client_id);
rd_kafkap_str_destroy(rk->rk_group_id);
rd_kafkap_str_destroy(rk->rk_eos.transactional_id);
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -6349,7 +6349,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
char *errstr,
size_t errstr_size) {
const int log_decode_errors = LOG_ERR;
int nodeid;
int32_t nodeid;
uint16_t port;
int16_t api_version;
int32_t cnt;
Expand Down
30 changes: 24 additions & 6 deletions src/rdkafka_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ void rd_kafka_group_member_clear(rd_kafka_group_member_t *rkgm) {
if (rkgm->rkgm_member_metadata)
rd_kafkap_bytes_destroy(rkgm->rkgm_member_metadata);

if (rkgm->rkgm_rack_id)
rd_kafkap_str_destroy(rkgm->rkgm_rack_id);

memset(rkgm, 0, sizeof(*rkgm));
}

Expand Down Expand Up @@ -106,7 +109,9 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
const rd_list_t *topics,
const void *userdata,
size_t userdata_size,
const rd_kafka_topic_partition_list_t *owned_partitions) {
const rd_kafka_topic_partition_list_t *owned_partitions,
int generation,
const rd_kafkap_str_t *rack_id) {

rd_kafka_buf_t *rkbuf;
rd_kafkap_bytes_t *kbytes;
Expand All @@ -124,12 +129,14 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
* OwnedPartitions => [Topic Partitions] // added in v1
* Topic => string
* Partitions => [int32]
* GenerationId => int32 // added in v2
* RackId => string // added in v3
*/

rkbuf = rd_kafka_buf_new(1, 100 + (topic_cnt * 100) + userdata_size);

/* Version */
rd_kafka_buf_write_i16(rkbuf, 1);
rd_kafka_buf_write_i16(rkbuf, 3);
rd_kafka_buf_write_i32(rkbuf, topic_cnt);
RD_LIST_FOREACH(tinfo, topics, i)
rd_kafka_buf_write_str(rkbuf, tinfo->topic, -1);
Expand All @@ -154,6 +161,12 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
rd_false /*any offset*/, fields);
}

/* Following data is ignored by consumer version < 2 */
rd_kafka_buf_write_i32(rkbuf, generation);

/* Following data is ignored by consumer version < 3 */
rd_kafka_buf_write_kstr(rkbuf, rack_id);

/* Get binary buffer and allocate a new Kafka Bytes with a copy. */
rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
len = rd_slice_remains(&rkbuf->rkbuf_reader);
Expand All @@ -170,9 +183,13 @@ rd_kafkap_bytes_t *rd_kafka_assignor_get_metadata_with_empty_userdata(
const rd_kafka_assignor_t *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions) {
return rd_kafka_consumer_protocol_member_metadata_new(topics, NULL, 0,
owned_partitions);
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id) {
/* Generation was earlier populated inside userData, and older versions
* of clients still expect that. So, in case the userData is empty, we
* set the explicit generation field to the default value, -1 */
return rd_kafka_consumer_protocol_member_metadata_new(
topics, NULL, 0, owned_partitions, -1 /* generation */, rack_id);
}


Expand Down Expand Up @@ -485,7 +502,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_add(
const struct rd_kafka_assignor_s *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions),
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id),
void (*on_assignment_cb)(const struct rd_kafka_assignor_s *rkas,
void **assignor_state,
const rd_kafka_topic_partition_list_t *assignment,
Expand Down
16 changes: 11 additions & 5 deletions src/rdkafka_assignor.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ typedef struct rd_kafka_group_member_s {
rd_kafkap_bytes_t *rkgm_member_metadata;
/** Group generation id. */
int rkgm_generation;
/** Member rack id. */
rd_kafkap_str_t *rkgm_rack_id;
} rd_kafka_group_member_t;


Expand All @@ -78,7 +80,6 @@ int rd_kafka_group_member_find_subscription(rd_kafka_t *rk,
const rd_kafka_group_member_t *rkgm,
const char *topic);


/**
* Structure to hold metadata for a single topic and all its
* subscribing members.
Expand Down Expand Up @@ -120,7 +121,8 @@ typedef struct rd_kafka_assignor_s {
const struct rd_kafka_assignor_s *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions);
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id);

void (*rkas_on_assignment_cb)(
const struct rd_kafka_assignor_s *rkas,
Expand Down Expand Up @@ -158,7 +160,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_add(
const struct rd_kafka_assignor_s *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions),
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id),
void (*on_assignment_cb)(const struct rd_kafka_assignor_s *rkas,
void **assignor_state,
const rd_kafka_topic_partition_list_t *assignment,
Expand All @@ -172,13 +175,16 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
const rd_list_t *topics,
const void *userdata,
size_t userdata_size,
const rd_kafka_topic_partition_list_t *owned_partitions);
const rd_kafka_topic_partition_list_t *owned_partitions,
int generation,
const rd_kafkap_str_t *rack_id);

rd_kafkap_bytes_t *rd_kafka_assignor_get_metadata_with_empty_userdata(
const rd_kafka_assignor_t *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions);
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id);


void rd_kafka_assignor_update_subscription(
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_aux.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ void rd_kafka_acl_result_free(void *ptr) {
* @return A new allocated Node object.
* Use rd_kafka_Node_destroy() to free when done.
*/
rd_kafka_Node_t *rd_kafka_Node_new(int id,
rd_kafka_Node_t *rd_kafka_Node_new(int32_t id,
const char *host,
uint16_t port,
const char *rack_id) {
Expand Down
6 changes: 4 additions & 2 deletions src/rdkafka_aux.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ typedef struct rd_kafka_Node_s {
char *rack_id; /*< (optional) Node rack id */
} rd_kafka_Node_t;

rd_kafka_Node_t *
rd_kafka_Node_new(int id, const char *host, uint16_t port, const char *rack_id);
rd_kafka_Node_t *rd_kafka_Node_new(int32_t id,
const char *host,
uint16_t port,
const char *rack_id);

rd_kafka_Node_t *rd_kafka_Node_copy(const rd_kafka_Node_t *src);

Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,10 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
size_t _slen; \
char *_dst; \
rd_kafka_buf_read_str(rkbuf, &_kstr); \
if (RD_KAFKAP_STR_IS_NULL(&_kstr)) { \
dst = NULL; \
break; \
} \
_slen = RD_KAFKAP_STR_LEN(&_kstr); \
if (!(_dst = rd_tmpabuf_write(tmpabuf, _kstr.str, _slen + 1))) \
rd_kafka_buf_parse_fail( \
Expand Down
106 changes: 95 additions & 11 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1662,7 +1662,7 @@ static void rd_kafka_cgrp_handle_SyncGroup(rd_kafka_t *rk,
static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg,
rd_kafka_assignor_t *rkas,
rd_kafka_resp_err_t err,
rd_kafka_metadata_t *metadata,
rd_kafka_metadata_internal_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt) {
char errstr[512];
Expand All @@ -1677,8 +1677,8 @@ static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg,
*errstr = '\0';

/* Run assignor */
err = rd_kafka_assignor_run(rkcg, rkas, metadata, members, member_cnt,
errstr, sizeof(errstr));
err = rd_kafka_assignor_run(rkcg, rkas, &metadata->metadata, members,
member_cnt, errstr, sizeof(errstr));

if (err) {
if (!*errstr)
Expand Down Expand Up @@ -1745,7 +1745,7 @@ rd_kafka_cgrp_assignor_handle_Metadata_op(rd_kafka_t *rk,
}

rd_kafka_cgrp_assignor_run(rkcg, rkcg->rkcg_assignor, rko->rko_err,
rko->rko_u.metadata.md,
rko->rko_u.metadata.mdi,
rkcg->rkcg_group_leader.members,
rkcg->rkcg_group_leader.member_cnt);

Expand Down Expand Up @@ -1777,9 +1777,12 @@ static int rd_kafka_group_MemberMetadata_consumer_read(
rkbuf = rd_kafka_buf_new_shadow(
MemberMetadata->data, RD_KAFKAP_BYTES_LEN(MemberMetadata), NULL);

/* Protocol parser needs a broker handle to log errors on. */
rkbuf->rkbuf_rkb = rkb;
rd_kafka_broker_keep(rkb);
/* Protocol parser needs a broker handle to log errors on.
* If none is provided, don't log errors (mainly for unit tests). */
if (rkb) {
rkbuf->rkbuf_rkb = rkb;
rd_kafka_broker_keep(rkb);
}

rd_kafka_buf_read_i16(rkbuf, &Version);
rd_kafka_buf_read_i32(rkbuf, &subscription_cnt);
Expand Down Expand Up @@ -1810,6 +1813,16 @@ static int rd_kafka_group_MemberMetadata_consumer_read(
rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields)))
goto err;

if (Version >= 2) {
rd_kafka_buf_read_i32(rkbuf, &rkgm->rkgm_generation);
}

if (Version >= 3) {
rd_kafkap_str_t RackId = RD_KAFKAP_STR_INITIALIZER;
rd_kafka_buf_read_str(rkbuf, &RackId);
rkgm->rkgm_rack_id = rd_kafkap_str_copy(&RackId);
}

rd_kafka_buf_destroy(rkbuf);

return 0;
Expand All @@ -1818,10 +1831,11 @@ static int rd_kafka_group_MemberMetadata_consumer_read(
err = rkbuf->rkbuf_err;

err:
rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
"Failed to parse MemberMetadata for \"%.*s\": %s",
RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
rd_kafka_err2str(err));
if (rkb)
rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
"Failed to parse MemberMetadata for \"%.*s\": %s",
RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
rd_kafka_err2str(err));
if (rkgm->rkgm_subscription) {
rd_kafka_topic_partition_list_destroy(rkgm->rkgm_subscription);
rkgm->rkgm_subscription = NULL;
Expand Down Expand Up @@ -5952,6 +5966,75 @@ static int unittest_list_to_map(void) {
RD_UT_PASS();
}

/**
 * @brief Unit test: round-trip (serialize then parse) of the
 *        ConsumerProtocol MemberMetadata introduced/extended by KIP-881.
 *
 * Serializes member metadata with rd_kafka_consumer_protocol_member_metadata_new()
 * and parses it back with rd_kafka_group_MemberMetadata_consumer_read(),
 * then asserts that the subscription, userdata, owned partitions (v1+),
 * generation (v2+) and rack id (v3+) survive the round trip.
 *
 * @returns the number of failures (0 on success, via RD_UT_PASS()).
 */
int unittest_member_metadata_serdes(void) {
rd_list_t *topics = rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
rd_kafka_topic_partition_list_t *owned_partitions =
rd_kafka_topic_partition_list_new(0);
rd_kafkap_str_t *rack_id = rd_kafkap_str_new("myrack", -1);
const void *userdata = NULL;
const int32_t userdata_size = 0;
const int generation = 3;
const char topic_name[] = "mytopic";
rd_kafka_group_member_t *rkgm;
int version;

rd_list_add(topics, rd_kafka_topic_info_new(topic_name, 3));
rd_kafka_topic_partition_list_add(owned_partitions, topic_name, 0);
/* Zero-initialized so rd_kafka_group_member_clear() is safe to call
 * on the first loop iteration. */
rkgm = rd_calloc(1, sizeof(*rkgm));

/* Note that the version variable doesn't actually change the Version
 * field in the serialized message. It only runs the tests with/without
 * additional fields added in that particular version. */
for (version = 0; version <= 3; version++) {
rd_kafkap_bytes_t *member_metadata;

/* Serialize.
 * Fields above the tested version are passed as their "absent"
 * values (NULL / -1) so the parsed result can be compared
 * per-version below. */
member_metadata =
rd_kafka_consumer_protocol_member_metadata_new(
topics, userdata, userdata_size,
version >= 1 ? owned_partitions : NULL,
version >= 2 ? generation : -1,
version >= 3 ? rack_id : NULL);

/* Deserialize.
 * NULL broker handle: the parser skips error logging in that
 * case (unit-test mode). */
rd_kafka_group_MemberMetadata_consumer_read(NULL, rkgm,
member_metadata);

/* Compare results. */
RD_UT_ASSERT(rkgm->rkgm_subscription->cnt ==
rd_list_cnt(topics),
"subscription size should be correct");
RD_UT_ASSERT(!strcmp(topic_name,
rkgm->rkgm_subscription->elems[0].topic),
"subscriptions should be correct");
RD_UT_ASSERT(rkgm->rkgm_userdata->len == userdata_size,
"userdata should have the size 0");
if (version >= 1)
RD_UT_ASSERT(!rd_kafka_topic_partition_list_cmp(
rkgm->rkgm_owned, owned_partitions,
rd_kafka_topic_partition_cmp),
"owned partitions should be same");
if (version >= 2)
RD_UT_ASSERT(generation == rkgm->rkgm_generation,
"generation should be same");
if (version >= 3)
RD_UT_ASSERT(
!rd_kafkap_str_cmp(rack_id, rkgm->rkgm_rack_id),
"rack id should be same");

/* Clear (not free) rkgm so it can be reused next iteration. */
rd_kafka_group_member_clear(rkgm);
rd_kafkap_bytes_destroy(member_metadata);
}

/* Clean up. */
rd_list_destroy(topics);
rd_kafka_topic_partition_list_destroy(owned_partitions);
rd_kafkap_str_destroy(rack_id);
rd_free(rkgm);

RD_UT_PASS();
}


/**
* @brief Consumer group unit tests
Expand All @@ -5964,6 +6047,7 @@ int unittest_cgrp(void) {
fails += unittest_set_subtract();
fails += unittest_map_to_list();
fails += unittest_list_to_map();
fails += unittest_member_metadata_serdes();

return fails;
}
8 changes: 5 additions & 3 deletions src/rdkafka_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ struct rd_kafka_topic_s;
struct rd_kafka_msg_s;
struct rd_kafka_broker_s;
struct rd_kafka_toppar_s;

typedef struct rd_kafka_metadata_internal_s rd_kafka_metadata_internal_t;
typedef struct rd_kafka_toppar_s rd_kafka_toppar_t;
typedef struct rd_kafka_lwtopic_s rd_kafka_lwtopic_t;


Expand Down Expand Up @@ -350,8 +351,9 @@ struct rd_kafka_s {
rd_ts_t rk_ts_metadata; /* Timestamp of most recent
* metadata. */

struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
rd_ts_t rk_ts_full_metadata; /* Timesstamp of .. */
rd_kafka_metadata_internal_t
*rk_full_metadata; /* Last full metadata. */
rd_ts_t rk_ts_full_metadata; /* Timestamp of .. */
struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */

char *rk_clusterid; /* ClusterId from metadata */
Expand Down
Loading