Skip to content

Commit

Permalink
Add protocol/parsing changes [KIP-881] (#4189)
Browse files Browse the repository at this point in the history
* Change embedded MemberMetadata protocol [KIP-881]

* Change parsing of Metadata to extract broker racks [KIP-881]

* Metadata refactor to add internal fields (#4279)

* Metadata and leader epoch refactor.
store private metadata into a struct that contains
the public one.


Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>
  • Loading branch information
milindl and emasab authored May 16, 2023
1 parent 8c8f8b9 commit bc933a0
Show file tree
Hide file tree
Showing 18 changed files with 444 additions and 198 deletions.
2 changes: 1 addition & 1 deletion src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ void rd_kafka_destroy_final(rd_kafka_t *rk) {
mtx_destroy(&rk->rk_init_lock);

if (rk->rk_full_metadata)
rd_kafka_metadata_destroy(rk->rk_full_metadata);
rd_kafka_metadata_destroy(&rk->rk_full_metadata->metadata);
rd_kafkap_str_destroy(rk->rk_client_id);
rd_kafkap_str_destroy(rk->rk_group_id);
rd_kafkap_str_destroy(rk->rk_eos.transactional_id);
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -6349,7 +6349,7 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
char *errstr,
size_t errstr_size) {
const int log_decode_errors = LOG_ERR;
int nodeid;
int32_t nodeid;
uint16_t port;
int16_t api_version;
int32_t cnt;
Expand Down
30 changes: 24 additions & 6 deletions src/rdkafka_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ void rd_kafka_group_member_clear(rd_kafka_group_member_t *rkgm) {
if (rkgm->rkgm_member_metadata)
rd_kafkap_bytes_destroy(rkgm->rkgm_member_metadata);

if (rkgm->rkgm_rack_id)
rd_kafkap_str_destroy(rkgm->rkgm_rack_id);

memset(rkgm, 0, sizeof(*rkgm));
}

Expand Down Expand Up @@ -106,7 +109,9 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
const rd_list_t *topics,
const void *userdata,
size_t userdata_size,
const rd_kafka_topic_partition_list_t *owned_partitions) {
const rd_kafka_topic_partition_list_t *owned_partitions,
int generation,
const rd_kafkap_str_t *rack_id) {

rd_kafka_buf_t *rkbuf;
rd_kafkap_bytes_t *kbytes;
Expand All @@ -124,12 +129,14 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
* OwnedPartitions => [Topic Partitions] // added in v1
* Topic => string
* Partitions => [int32]
* GenerationId => int32 // added in v2
* RackId => string // added in v3
*/

rkbuf = rd_kafka_buf_new(1, 100 + (topic_cnt * 100) + userdata_size);

/* Version */
rd_kafka_buf_write_i16(rkbuf, 1);
rd_kafka_buf_write_i16(rkbuf, 3);
rd_kafka_buf_write_i32(rkbuf, topic_cnt);
RD_LIST_FOREACH(tinfo, topics, i)
rd_kafka_buf_write_str(rkbuf, tinfo->topic, -1);
Expand All @@ -154,6 +161,12 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
rd_false /*any offset*/, fields);
}

/* Following data is ignored by consumer version < 2 */
rd_kafka_buf_write_i32(rkbuf, generation);

/* Following data is ignored by consumer version < 3 */
rd_kafka_buf_write_kstr(rkbuf, rack_id);

/* Get binary buffer and allocate a new Kafka Bytes with a copy. */
rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
len = rd_slice_remains(&rkbuf->rkbuf_reader);
Expand All @@ -170,9 +183,13 @@ rd_kafkap_bytes_t *rd_kafka_assignor_get_metadata_with_empty_userdata(
const rd_kafka_assignor_t *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions) {
return rd_kafka_consumer_protocol_member_metadata_new(topics, NULL, 0,
owned_partitions);
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id) {
/* Generation was earlier populated inside userData, and older versions
* of clients still expect that. So, in case the userData is empty, we
* set the explicit generation field to the default value, -1 */
return rd_kafka_consumer_protocol_member_metadata_new(
topics, NULL, 0, owned_partitions, -1 /* generation */, rack_id);
}


Expand Down Expand Up @@ -485,7 +502,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_add(
const struct rd_kafka_assignor_s *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions),
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id),
void (*on_assignment_cb)(const struct rd_kafka_assignor_s *rkas,
void **assignor_state,
const rd_kafka_topic_partition_list_t *assignment,
Expand Down
16 changes: 11 additions & 5 deletions src/rdkafka_assignor.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ typedef struct rd_kafka_group_member_s {
rd_kafkap_bytes_t *rkgm_member_metadata;
/** Group generation id. */
int rkgm_generation;
/** Member rack id. */
rd_kafkap_str_t *rkgm_rack_id;
} rd_kafka_group_member_t;


Expand All @@ -78,7 +80,6 @@ int rd_kafka_group_member_find_subscription(rd_kafka_t *rk,
const rd_kafka_group_member_t *rkgm,
const char *topic);


/**
* Structure to hold metadata for a single topic and all its
* subscribing members.
Expand Down Expand Up @@ -120,7 +121,8 @@ typedef struct rd_kafka_assignor_s {
const struct rd_kafka_assignor_s *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions);
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id);

void (*rkas_on_assignment_cb)(
const struct rd_kafka_assignor_s *rkas,
Expand Down Expand Up @@ -158,7 +160,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_add(
const struct rd_kafka_assignor_s *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions),
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id),
void (*on_assignment_cb)(const struct rd_kafka_assignor_s *rkas,
void **assignor_state,
const rd_kafka_topic_partition_list_t *assignment,
Expand All @@ -172,13 +175,16 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
const rd_list_t *topics,
const void *userdata,
size_t userdata_size,
const rd_kafka_topic_partition_list_t *owned_partitions);
const rd_kafka_topic_partition_list_t *owned_partitions,
int generation,
const rd_kafkap_str_t *rack_id);

rd_kafkap_bytes_t *rd_kafka_assignor_get_metadata_with_empty_userdata(
const rd_kafka_assignor_t *rkas,
void *assignor_state,
const rd_list_t *topics,
const rd_kafka_topic_partition_list_t *owned_partitions);
const rd_kafka_topic_partition_list_t *owned_partitions,
const rd_kafkap_str_t *rack_id);


void rd_kafka_assignor_update_subscription(
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_aux.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ void rd_kafka_acl_result_free(void *ptr) {
* @return A new allocated Node object.
* Use rd_kafka_Node_destroy() to free when done.
*/
rd_kafka_Node_t *rd_kafka_Node_new(int id,
rd_kafka_Node_t *rd_kafka_Node_new(int32_t id,
const char *host,
uint16_t port,
const char *rack_id) {
Expand Down
6 changes: 4 additions & 2 deletions src/rdkafka_aux.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ typedef struct rd_kafka_Node_s {
char *rack_id; /*< (optional) Node rack id */
} rd_kafka_Node_t;

rd_kafka_Node_t *
rd_kafka_Node_new(int id, const char *host, uint16_t port, const char *rack_id);
rd_kafka_Node_t *rd_kafka_Node_new(int32_t id,
const char *host,
uint16_t port,
const char *rack_id);

rd_kafka_Node_t *rd_kafka_Node_copy(const rd_kafka_Node_t *src);

Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,10 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
size_t _slen; \
char *_dst; \
rd_kafka_buf_read_str(rkbuf, &_kstr); \
if (RD_KAFKAP_STR_IS_NULL(&_kstr)) { \
dst = NULL; \
break; \
} \
_slen = RD_KAFKAP_STR_LEN(&_kstr); \
if (!(_dst = rd_tmpabuf_write(tmpabuf, _kstr.str, _slen + 1))) \
rd_kafka_buf_parse_fail( \
Expand Down
106 changes: 95 additions & 11 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1662,7 +1662,7 @@ static void rd_kafka_cgrp_handle_SyncGroup(rd_kafka_t *rk,
static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg,
rd_kafka_assignor_t *rkas,
rd_kafka_resp_err_t err,
rd_kafka_metadata_t *metadata,
rd_kafka_metadata_internal_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt) {
char errstr[512];
Expand All @@ -1677,8 +1677,8 @@ static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg,
*errstr = '\0';

/* Run assignor */
err = rd_kafka_assignor_run(rkcg, rkas, metadata, members, member_cnt,
errstr, sizeof(errstr));
err = rd_kafka_assignor_run(rkcg, rkas, &metadata->metadata, members,
member_cnt, errstr, sizeof(errstr));

if (err) {
if (!*errstr)
Expand Down Expand Up @@ -1745,7 +1745,7 @@ rd_kafka_cgrp_assignor_handle_Metadata_op(rd_kafka_t *rk,
}

rd_kafka_cgrp_assignor_run(rkcg, rkcg->rkcg_assignor, rko->rko_err,
rko->rko_u.metadata.md,
rko->rko_u.metadata.mdi,
rkcg->rkcg_group_leader.members,
rkcg->rkcg_group_leader.member_cnt);

Expand Down Expand Up @@ -1777,9 +1777,12 @@ static int rd_kafka_group_MemberMetadata_consumer_read(
rkbuf = rd_kafka_buf_new_shadow(
MemberMetadata->data, RD_KAFKAP_BYTES_LEN(MemberMetadata), NULL);

/* Protocol parser needs a broker handle to log errors on. */
rkbuf->rkbuf_rkb = rkb;
rd_kafka_broker_keep(rkb);
/* Protocol parser needs a broker handle to log errors on.
* If none is provided, don't log errors (mainly for unit tests). */
if (rkb) {
rkbuf->rkbuf_rkb = rkb;
rd_kafka_broker_keep(rkb);
}

rd_kafka_buf_read_i16(rkbuf, &Version);
rd_kafka_buf_read_i32(rkbuf, &subscription_cnt);
Expand Down Expand Up @@ -1810,6 +1813,16 @@ static int rd_kafka_group_MemberMetadata_consumer_read(
rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields)))
goto err;

if (Version >= 2) {
rd_kafka_buf_read_i32(rkbuf, &rkgm->rkgm_generation);
}

if (Version >= 3) {
rd_kafkap_str_t RackId = RD_KAFKAP_STR_INITIALIZER;
rd_kafka_buf_read_str(rkbuf, &RackId);
rkgm->rkgm_rack_id = rd_kafkap_str_copy(&RackId);
}

rd_kafka_buf_destroy(rkbuf);

return 0;
Expand All @@ -1818,10 +1831,11 @@ static int rd_kafka_group_MemberMetadata_consumer_read(
err = rkbuf->rkbuf_err;

err:
rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
"Failed to parse MemberMetadata for \"%.*s\": %s",
RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
rd_kafka_err2str(err));
if (rkb)
rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
"Failed to parse MemberMetadata for \"%.*s\": %s",
RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
rd_kafka_err2str(err));
if (rkgm->rkgm_subscription) {
rd_kafka_topic_partition_list_destroy(rkgm->rkgm_subscription);
rkgm->rkgm_subscription = NULL;
Expand Down Expand Up @@ -5952,6 +5966,75 @@ static int unittest_list_to_map(void) {
RD_UT_PASS();
}

int unittest_member_metadata_serdes(void) {
rd_list_t *topics = rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
rd_kafka_topic_partition_list_t *owned_partitions =
rd_kafka_topic_partition_list_new(0);
rd_kafkap_str_t *rack_id = rd_kafkap_str_new("myrack", -1);
const void *userdata = NULL;
const int32_t userdata_size = 0;
const int generation = 3;
const char topic_name[] = "mytopic";
rd_kafka_group_member_t *rkgm;
int version;

rd_list_add(topics, rd_kafka_topic_info_new(topic_name, 3));
rd_kafka_topic_partition_list_add(owned_partitions, topic_name, 0);
rkgm = rd_calloc(1, sizeof(*rkgm));

/* Note that the version variable doesn't actually change the Version
* field in the serialized message. It only runs the tests with/without
* additional fields added in that particular version. */
for (version = 0; version <= 3; version++) {
rd_kafkap_bytes_t *member_metadata;

/* Serialize. */
member_metadata =
rd_kafka_consumer_protocol_member_metadata_new(
topics, userdata, userdata_size,
version >= 1 ? owned_partitions : NULL,
version >= 2 ? generation : -1,
version >= 3 ? rack_id : NULL);

/* Deserialize. */
rd_kafka_group_MemberMetadata_consumer_read(NULL, rkgm,
member_metadata);

/* Compare results. */
RD_UT_ASSERT(rkgm->rkgm_subscription->cnt ==
rd_list_cnt(topics),
"subscription size should be correct");
RD_UT_ASSERT(!strcmp(topic_name,
rkgm->rkgm_subscription->elems[0].topic),
"subscriptions should be correct");
RD_UT_ASSERT(rkgm->rkgm_userdata->len == userdata_size,
"userdata should have the size 0");
if (version >= 1)
RD_UT_ASSERT(!rd_kafka_topic_partition_list_cmp(
rkgm->rkgm_owned, owned_partitions,
rd_kafka_topic_partition_cmp),
"owned partitions should be same");
if (version >= 2)
RD_UT_ASSERT(generation == rkgm->rkgm_generation,
"generation should be same");
if (version >= 3)
RD_UT_ASSERT(
!rd_kafkap_str_cmp(rack_id, rkgm->rkgm_rack_id),
"rack id should be same");

rd_kafka_group_member_clear(rkgm);
rd_kafkap_bytes_destroy(member_metadata);
}

/* Clean up. */
rd_list_destroy(topics);
rd_kafka_topic_partition_list_destroy(owned_partitions);
rd_kafkap_str_destroy(rack_id);
rd_free(rkgm);

RD_UT_PASS();
}


/**
* @brief Consumer group unit tests
Expand All @@ -5964,6 +6047,7 @@ int unittest_cgrp(void) {
fails += unittest_set_subtract();
fails += unittest_map_to_list();
fails += unittest_list_to_map();
fails += unittest_member_metadata_serdes();

return fails;
}
8 changes: 5 additions & 3 deletions src/rdkafka_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ struct rd_kafka_topic_s;
struct rd_kafka_msg_s;
struct rd_kafka_broker_s;
struct rd_kafka_toppar_s;

typedef struct rd_kafka_metadata_internal_s rd_kafka_metadata_internal_t;
typedef struct rd_kafka_toppar_s rd_kafka_toppar_t;
typedef struct rd_kafka_lwtopic_s rd_kafka_lwtopic_t;


Expand Down Expand Up @@ -350,8 +351,9 @@ struct rd_kafka_s {
rd_ts_t rk_ts_metadata; /* Timestamp of most recent
* metadata. */

struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
rd_ts_t rk_ts_full_metadata; /* Timesstamp of .. */
rd_kafka_metadata_internal_t
*rk_full_metadata; /* Last full metadata. */
rd_ts_t rk_ts_full_metadata; /* Timestamp of .. */
struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */

char *rk_clusterid; /* ClusterId from metadata */
Expand Down
Loading

0 comments on commit bc933a0

Please sign in to comment.