Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change embedded MemberMetadata protocol [KIP-881] #4184

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions src/rdkafka_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ void rd_kafka_group_member_clear(rd_kafka_group_member_t *rkgm) {
if (rkgm->rkgm_member_metadata)
rd_kafkap_bytes_destroy(rkgm->rkgm_member_metadata);

if (rkgm->rack_id)
rd_kafkap_str_destroy(rkgm->rack_id);

memset(rkgm, 0, sizeof(*rkgm));
}

Expand Down Expand Up @@ -106,7 +109,9 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
const rd_list_t *topics,
const void *userdata,
size_t userdata_size,
const rd_kafka_topic_partition_list_t *owned_partitions) {
const rd_kafka_topic_partition_list_t *owned_partitions,
int generation,
const rd_kafkap_str_t *rack_id) {

rd_kafka_buf_t *rkbuf;
rd_kafkap_bytes_t *kbytes;
Expand All @@ -124,12 +129,14 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
* OwnedPartitions => [Topic Partitions] // added in v1
* Topic => string
* Partitions => [int32]
* GenerationId => int32 // added in v2
* RackId => string // added in v3
*/

rkbuf = rd_kafka_buf_new(1, 100 + (topic_cnt * 100) + userdata_size);

/* Version */
rd_kafka_buf_write_i16(rkbuf, 1);
rd_kafka_buf_write_i16(rkbuf, 3);
rd_kafka_buf_write_i32(rkbuf, topic_cnt);
RD_LIST_FOREACH(tinfo, topics, i)
rd_kafka_buf_write_str(rkbuf, tinfo->topic, -1);
Expand All @@ -152,6 +159,12 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
rd_false /*don't write epoch*/,
rd_false /*don't write metadata*/);

/* Following data is ignored by consumer version < 2 */
rd_kafka_buf_write_i32(rkbuf, generation);

/* Following data is ignored by consumer version < 3 */
rd_kafka_buf_write_kstr(rkbuf, rack_id);

/* Get binary buffer and allocate a new Kafka Bytes with a copy. */
rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
len = rd_slice_remains(&rkbuf->rkbuf_reader);
Expand All @@ -168,9 +181,13 @@ rd_kafkap_bytes_t *rd_kafka_assignor_get_metadata_with_empty_userdata(
const rd_kafka_assignor_t *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions) {
return rd_kafka_consumer_protocol_member_metadata_new(topics, NULL, 0,
owned_partitions);
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id) {
/* Generation was earlier populated inside userData, and older versions
* of clients still expect that. So, in case the userData is empty, we
* set the explicit generation field to the default value, -1 */
return rd_kafka_consumer_protocol_member_metadata_new(
topics, NULL, 0, owned_partitions, -1 /* generation */, rack_id);
}


Expand Down Expand Up @@ -483,7 +500,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_add(
const struct rd_kafka_assignor_s *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions),
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id),
void (*on_assignment_cb)(const struct rd_kafka_assignor_s *rkas,
void **assignor_state,
const rd_kafka_topic_partition_list_t *assignment,
Expand Down
15 changes: 11 additions & 4 deletions src/rdkafka_assignor.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ typedef struct rd_kafka_group_member_s {
rd_kafkap_bytes_t *rkgm_member_metadata;
/** Group generation id. */
int rkgm_generation;
/** Member rack id. */
rd_kafkap_str_t *rack_id;
} rd_kafka_group_member_t;


Expand Down Expand Up @@ -120,7 +122,8 @@ typedef struct rd_kafka_assignor_s {
const struct rd_kafka_assignor_s *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions);
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id);

void (*rkas_on_assignment_cb)(
const struct rd_kafka_assignor_s *rkas,
Expand Down Expand Up @@ -158,7 +161,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_add(
const struct rd_kafka_assignor_s *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions),
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id),
void (*on_assignment_cb)(const struct rd_kafka_assignor_s *rkas,
void **assignor_state,
const rd_kafka_topic_partition_list_t *assignment,
Expand All @@ -172,13 +176,16 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
const rd_list_t *topics,
const void *userdata,
size_t userdata_size,
const rd_kafka_topic_partition_list_t *owned_partitions);
const rd_kafka_topic_partition_list_t *owned_partitions,
int generation,
const rd_kafkap_str_t *rack_id);

rd_kafkap_bytes_t *rd_kafka_assignor_get_metadata_with_empty_userdata(
const rd_kafka_assignor_t *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions);
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id);


void rd_kafka_assignor_update_subscription(
Expand Down
98 changes: 91 additions & 7 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1774,9 +1774,12 @@ static int rd_kafka_group_MemberMetadata_consumer_read(
rkbuf = rd_kafka_buf_new_shadow(
MemberMetadata->data, RD_KAFKAP_BYTES_LEN(MemberMetadata), NULL);

/* Protocol parser needs a broker handle to log errors on. */
rkbuf->rkbuf_rkb = rkb;
rd_kafka_broker_keep(rkb);
/* Protocol parser needs a broker handle to log errors on.
* If none is provided, don't log errors (mainly for unit tests). */
if (rkb) {
rkbuf->rkbuf_rkb = rkb;
rd_kafka_broker_keep(rkb);
}

rd_kafka_buf_read_i16(rkbuf, &Version);
rd_kafka_buf_read_i32(rkbuf, &subscription_cnt);
Expand Down Expand Up @@ -1804,6 +1807,16 @@ static int rd_kafka_group_MemberMetadata_consumer_read(
rkbuf, 0, rd_false, rd_false)))
goto err;

if (Version >= 2) {
rd_kafka_buf_read_i32(rkbuf, &rkgm->rkgm_generation);
}

if (Version >= 3) {
rd_kafkap_str_t RackId = RD_KAFKAP_STR_INITIALIZER;
rd_kafka_buf_read_str(rkbuf, &RackId);
rkgm->rack_id = rd_kafkap_str_copy(&RackId);
}

rd_kafka_buf_destroy(rkbuf);

return 0;
Expand All @@ -1812,10 +1825,11 @@ static int rd_kafka_group_MemberMetadata_consumer_read(
err = rkbuf->rkbuf_err;

err:
rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
"Failed to parse MemberMetadata for \"%.*s\": %s",
RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
rd_kafka_err2str(err));
if (rkb)
rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
"Failed to parse MemberMetadata for \"%.*s\": %s",
RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
rd_kafka_err2str(err));
if (rkgm->rkgm_subscription) {
rd_kafka_topic_partition_list_destroy(rkgm->rkgm_subscription);
rkgm->rkgm_subscription = NULL;
Expand Down Expand Up @@ -5892,6 +5906,75 @@ static int unittest_list_to_map(void) {
RD_UT_PASS();
}

int unittest_member_metadata_serdes(void) {
const rd_list_t *topics =
rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
const rd_kafka_topic_partition_list_t *owned_partitions =
rd_list_new(0, rd_kafka_topic_partition_destroy);
const rd_kafkap_str_t *rack_id = rd_kafkap_str_new("myrack", -1);
const void *userdata = NULL;
const size_t userdata_size = 0;
const int generation = 3;
const char topic_name[] = "mytopic";
rd_kafka_group_member_t *rkgm;
int version;

rd_list_add(topics, rd_kafka_topic_info_new(topic_name, 3));
rd_kafka_topic_partition_list_add(owned_partitions, topic_name, 0);
rkgm = rd_calloc(1, sizeof(*rkgm));

/* Note that the version variable doesn't actually change the Version
* field in the serialized message. It only runs the tests with/without
* additional fields added in that particular version. */
for (version = 0; version <= 3; version++) {
rd_kafkap_bytes_t *member_metadata;

/* Serialize. */
member_metadata =
rd_kafka_consumer_protocol_member_metadata_new(
topics, userdata, userdata_size,
version >= 1 ? owned_partitions : NULL,
version >= 2 ? generation : -1,
version >= 3 ? rack_id : NULL);

/* Deserialize. */
rd_kafka_group_MemberMetadata_consumer_read(NULL, rkgm,
member_metadata);

/* Compare results. */
RD_UT_ASSERT(rkgm->rkgm_subscription->cnt ==
rd_list_cnt(topics),
"subscription size should be correct");
RD_UT_ASSERT(!strcmp(topic_name,
rkgm->rkgm_subscription->elems[0].topic),
"subscriptions should be correct");
RD_UT_ASSERT(rkgm->rkgm_userdata->len == userdata_size,
"userdata should have the size 0");
if (version >= 1)
RD_UT_ASSERT(!rd_kafka_topic_partition_list_cmp(
rkgm->rkgm_owned, owned_partitions,
rd_kafka_topic_partition_cmp),
"owned partitions should be same");
if (version >= 2)
RD_UT_ASSERT(generation == rkgm->rkgm_generation,
"generation should be same");
if (version >= 3)
RD_UT_ASSERT(!rd_kafkap_str_cmp(rack_id, rkgm->rack_id),
"rack id should be same");

rd_kafka_group_member_clear(rkgm);
rd_kafkap_bytes_destroy(member_metadata);
}

/* Clean up. */
rd_list_destroy(topics);
rd_kafka_topic_partition_list_destroy(owned_partitions);
rd_kafkap_str_destroy(rack_id);
rd_free(rkgm);

RD_UT_PASS();
}


/**
* @brief Consumer group unit tests
Expand All @@ -5904,6 +5987,7 @@ int unittest_cgrp(void) {
fails += unittest_set_subtract();
fails += unittest_map_to_list();
fails += unittest_list_to_map();
fails += unittest_member_metadata_serdes();

return fails;
}
3 changes: 2 additions & 1 deletion src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1582,7 +1582,8 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb,
rd_kafka_buf_write_kstr(rkbuf, rkas->rkas_protocol_name);
member_metadata = rkas->rkas_get_metadata_cb(
rkas, rk->rk_cgrp->rkcg_assignor_state, topics,
rk->rk_cgrp->rkcg_group_assignment);
rk->rk_cgrp->rkcg_group_assignment,
rk->rk_conf.client_rack);
rd_kafka_buf_write_kbytes(rkbuf, member_metadata);
rd_kafkap_bytes_destroy(member_metadata);
}
Expand Down
10 changes: 7 additions & 3 deletions src/rdkafka_sticky_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1837,7 +1837,8 @@ static rd_kafkap_bytes_t *rd_kafka_sticky_assignor_get_metadata(
const rd_kafka_assignor_t *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions) {
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id) {
rd_kafka_sticky_assignor_state_t *state;
rd_kafka_buf_t *rkbuf;
rd_kafkap_bytes_t *metadata;
Expand All @@ -1855,9 +1856,11 @@ static rd_kafkap_bytes_t *rd_kafka_sticky_assignor_get_metadata(
* If there is no previous assignment, UserData is NULL.
*/


if (!assignor_state) {
return rd_kafka_consumer_protocol_member_metadata_new(
topics, NULL, 0, owned_partitions);
topics, NULL, 0, owned_partitions, -1 /* generation */,
rack_id);
}

state = (rd_kafka_sticky_assignor_state_t *)assignor_state;
Expand All @@ -1878,7 +1881,8 @@ static rd_kafkap_bytes_t *rd_kafka_sticky_assignor_get_metadata(
rd_kafka_buf_destroy(rkbuf);

metadata = rd_kafka_consumer_protocol_member_metadata_new(
topics, kbytes->data, kbytes->len, owned_partitions);
topics, kbytes->data, kbytes->len, owned_partitions,
state->generation_id, rack_id);

rd_kafkap_bytes_destroy(kbytes);

Expand Down