diff --git a/src/rdkafka_assignor.c b/src/rdkafka_assignor.c index dfd1c775f3..1639f0c036 100644 --- a/src/rdkafka_assignor.c +++ b/src/rdkafka_assignor.c @@ -59,6 +59,9 @@ void rd_kafka_group_member_clear(rd_kafka_group_member_t *rkgm) { if (rkgm->rkgm_member_metadata) rd_kafkap_bytes_destroy(rkgm->rkgm_member_metadata); + if (rkgm->rack_id) + rd_kafkap_str_destroy(rkgm->rack_id); + memset(rkgm, 0, sizeof(*rkgm)); } @@ -106,7 +109,9 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new( const rd_list_t *topics, const void *userdata, size_t userdata_size, - const rd_kafka_topic_partition_list_t *owned_partitions) { + const rd_kafka_topic_partition_list_t *owned_partitions, + int generation, + const rd_kafkap_str_t *rack_id) { rd_kafka_buf_t *rkbuf; rd_kafkap_bytes_t *kbytes; @@ -124,12 +129,14 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new( * OwnedPartitions => [Topic Partitions] // added in v1 * Topic => string * Partitions => [int32] + * GenerationId => int32 // added in v2 + * RackId => string // added in v3 */ rkbuf = rd_kafka_buf_new(1, 100 + (topic_cnt * 100) + userdata_size); /* Version */ - rd_kafka_buf_write_i16(rkbuf, 1); + rd_kafka_buf_write_i16(rkbuf, 3); rd_kafka_buf_write_i32(rkbuf, topic_cnt); RD_LIST_FOREACH(tinfo, topics, i) rd_kafka_buf_write_str(rkbuf, tinfo->topic, -1); @@ -152,6 +159,12 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new( rd_false /*don't write epoch*/, rd_false /*don't write metadata*/); + /* Following data is ignored by consumer version < 2 */ + rd_kafka_buf_write_i32(rkbuf, generation); + + /* Following data is ignored by consumer version < 3 */ + rd_kafka_buf_write_kstr(rkbuf, rack_id); + /* Get binary buffer and allocate a new Kafka Bytes with a copy. 
*/ rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf); len = rd_slice_remains(&rkbuf->rkbuf_reader); @@ -168,9 +181,13 @@ rd_kafkap_bytes_t *rd_kafka_assignor_get_metadata_with_empty_userdata( const rd_kafka_assignor_t *rkas, void *assignor_state, const rd_list_t *topics, - const rd_kafka_topic_partition_list_t *owned_partitions) { - return rd_kafka_consumer_protocol_member_metadata_new(topics, NULL, 0, - owned_partitions); + const rd_kafka_topic_partition_list_t *owned_partitions, + const rd_kafkap_str_t *rack_id) { + /* Generation was earlier populated inside userData, and older versions + * of clients still expect that. So, in case the userData is empty, we + * set the explicit generation field to the default value, -1 */ + return rd_kafka_consumer_protocol_member_metadata_new( + topics, NULL, 0, owned_partitions, -1 /* generation */, rack_id); } @@ -483,7 +500,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( const struct rd_kafka_assignor_s *rkas, void *assignor_state, const rd_list_t *topics, - const rd_kafka_topic_partition_list_t *owned_partitions), + const rd_kafka_topic_partition_list_t *owned_partitions, + const rd_kafkap_str_t *rack_id), void (*on_assignment_cb)(const struct rd_kafka_assignor_s *rkas, void **assignor_state, const rd_kafka_topic_partition_list_t *assignment, diff --git a/src/rdkafka_assignor.h b/src/rdkafka_assignor.h index b90e7dc980..c828a1cf6c 100644 --- a/src/rdkafka_assignor.h +++ b/src/rdkafka_assignor.h @@ -69,6 +69,8 @@ typedef struct rd_kafka_group_member_s { rd_kafkap_bytes_t *rkgm_member_metadata; /** Group generation id. */ int rkgm_generation; + /** Member rack id. 
*/ + rd_kafkap_str_t *rack_id; } rd_kafka_group_member_t; @@ -120,7 +122,8 @@ typedef struct rd_kafka_assignor_s { const struct rd_kafka_assignor_s *rkas, void *assignor_state, const rd_list_t *topics, - const rd_kafka_topic_partition_list_t *owned_partitions); + const rd_kafka_topic_partition_list_t *owned_partitions, + const rd_kafkap_str_t *rack_id); void (*rkas_on_assignment_cb)( const struct rd_kafka_assignor_s *rkas, @@ -158,7 +161,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( const struct rd_kafka_assignor_s *rkas, void *assignor_state, const rd_list_t *topics, - const rd_kafka_topic_partition_list_t *owned_partitions), + const rd_kafka_topic_partition_list_t *owned_partitions, + const rd_kafkap_str_t *rack_id), void (*on_assignment_cb)(const struct rd_kafka_assignor_s *rkas, void **assignor_state, const rd_kafka_topic_partition_list_t *assignment, @@ -172,13 +176,16 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new( const rd_list_t *topics, const void *userdata, size_t userdata_size, - const rd_kafka_topic_partition_list_t *owned_partitions); + const rd_kafka_topic_partition_list_t *owned_partitions, + int generation, + const rd_kafkap_str_t *rack_id); rd_kafkap_bytes_t *rd_kafka_assignor_get_metadata_with_empty_userdata( const rd_kafka_assignor_t *rkas, void *assignor_state, const rd_list_t *topics, - const rd_kafka_topic_partition_list_t *owned_partitions); + const rd_kafka_topic_partition_list_t *owned_partitions, + const rd_kafkap_str_t *rack_id); void rd_kafka_assignor_update_subscription( diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index dc7ed6c0e9..ce4818cb63 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -1774,9 +1774,12 @@ static int rd_kafka_group_MemberMetadata_consumer_read( rkbuf = rd_kafka_buf_new_shadow( MemberMetadata->data, RD_KAFKAP_BYTES_LEN(MemberMetadata), NULL); - /* Protocol parser needs a broker handle to log errors on. 
*/ - rkbuf->rkbuf_rkb = rkb; - rd_kafka_broker_keep(rkb); + /* Protocol parser needs a broker handle to log errors on. + * If none is provided, don't log errors (mainly for unit tests). */ + if (rkb) { + rkbuf->rkbuf_rkb = rkb; + rd_kafka_broker_keep(rkb); + } rd_kafka_buf_read_i16(rkbuf, &Version); rd_kafka_buf_read_i32(rkbuf, &subscription_cnt); @@ -1804,6 +1807,16 @@ static int rd_kafka_group_MemberMetadata_consumer_read( rkbuf, 0, rd_false, rd_false))) goto err; + if (Version >= 2) { + rd_kafka_buf_read_i32(rkbuf, &rkgm->rkgm_generation); + } + + if (Version >= 3) { + rd_kafkap_str_t RackId = RD_KAFKAP_STR_INITIALIZER; + rd_kafka_buf_read_str(rkbuf, &RackId); + rkgm->rack_id = rd_kafkap_str_copy(&RackId); + } + rd_kafka_buf_destroy(rkbuf); return 0; @@ -1812,10 +1825,11 @@ static int rd_kafka_group_MemberMetadata_consumer_read( err = rkbuf->rkbuf_err; err: - rd_rkb_dbg(rkb, CGRP, "MEMBERMETA", - "Failed to parse MemberMetadata for \"%.*s\": %s", - RD_KAFKAP_STR_PR(rkgm->rkgm_member_id), - rd_kafka_err2str(err)); + if (rkb) + rd_rkb_dbg(rkb, CGRP, "MEMBERMETA", + "Failed to parse MemberMetadata for \"%.*s\": %s", + RD_KAFKAP_STR_PR(rkgm->rkgm_member_id), + rd_kafka_err2str(err)); if (rkgm->rkgm_subscription) { rd_kafka_topic_partition_list_destroy(rkgm->rkgm_subscription); rkgm->rkgm_subscription = NULL; @@ -5892,6 +5906,75 @@ static int unittest_list_to_map(void) { RD_UT_PASS(); } +static int unittest_member_metadata_serdes(void) { + rd_list_t *topics = + rd_list_new(0, (void *)rd_kafka_topic_info_destroy); + rd_kafka_topic_partition_list_t *owned_partitions = + rd_kafka_topic_partition_list_new(0); + rd_kafkap_str_t *rack_id = rd_kafkap_str_new("myrack", -1); + const void *userdata = NULL; + const size_t userdata_size = 0; + const int generation = 3; + const char topic_name[] = "mytopic"; + rd_kafka_group_member_t *rkgm; + int version; + + rd_list_add(topics, rd_kafka_topic_info_new(topic_name, 3)); + 
rd_kafka_topic_partition_list_add(owned_partitions, topic_name, 0); + rkgm = rd_calloc(1, sizeof(*rkgm)); + + /* Note that the version variable doesn't actually change the Version + * field in the serialized message. It only runs the tests with/without + * additional fields added in that particular version. */ + for (version = 0; version <= 3; version++) { + rd_kafkap_bytes_t *member_metadata; + + /* Serialize. */ + member_metadata = + rd_kafka_consumer_protocol_member_metadata_new( + topics, userdata, userdata_size, + version >= 1 ? owned_partitions : NULL, + version >= 2 ? generation : -1, + version >= 3 ? rack_id : NULL); + + /* Deserialize. */ + rd_kafka_group_MemberMetadata_consumer_read(NULL, rkgm, + member_metadata); + + /* Compare results. */ + RD_UT_ASSERT(rkgm->rkgm_subscription->cnt == + rd_list_cnt(topics), + "subscription size should be correct"); + RD_UT_ASSERT(!strcmp(topic_name, + rkgm->rkgm_subscription->elems[0].topic), + "subscriptions should be correct"); + RD_UT_ASSERT(rkgm->rkgm_userdata->len == userdata_size, + "userdata should have the size 0"); + if (version >= 1) + RD_UT_ASSERT(!rd_kafka_topic_partition_list_cmp( + rkgm->rkgm_owned, owned_partitions, + rd_kafka_topic_partition_cmp), + "owned partitions should be same"); + if (version >= 2) + RD_UT_ASSERT(generation == rkgm->rkgm_generation, + "generation should be same"); + if (version >= 3) + RD_UT_ASSERT(!rd_kafkap_str_cmp(rack_id, rkgm->rack_id), + "rack id should be same"); + + rd_kafka_group_member_clear(rkgm); + rd_kafkap_bytes_destroy(member_metadata); + } + + /* Clean up. 
*/ + rd_list_destroy(topics); + rd_kafka_topic_partition_list_destroy(owned_partitions); + rd_kafkap_str_destroy(rack_id); + rd_free(rkgm); + + RD_UT_PASS(); +} + /** * @brief Consumer group unit tests @@ -5904,6 +5987,7 @@ int unittest_cgrp(void) { fails += unittest_set_subtract(); fails += unittest_map_to_list(); fails += unittest_list_to_map(); + fails += unittest_member_metadata_serdes(); return fails; } diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index c86a5e27cc..b632770482 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -1582,7 +1582,8 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb, rd_kafka_buf_write_kstr(rkbuf, rkas->rkas_protocol_name); member_metadata = rkas->rkas_get_metadata_cb( rkas, rk->rk_cgrp->rkcg_assignor_state, topics, - rk->rk_cgrp->rkcg_group_assignment); + rk->rk_cgrp->rkcg_group_assignment, + rk->rk_conf.client_rack); rd_kafka_buf_write_kbytes(rkbuf, member_metadata); rd_kafkap_bytes_destroy(member_metadata); } diff --git a/src/rdkafka_sticky_assignor.c b/src/rdkafka_sticky_assignor.c index 3f5d91cf00..405d1363fa 100644 --- a/src/rdkafka_sticky_assignor.c +++ b/src/rdkafka_sticky_assignor.c @@ -1837,7 +1837,8 @@ static rd_kafkap_bytes_t *rd_kafka_sticky_assignor_get_metadata( const rd_kafka_assignor_t *rkas, void *assignor_state, const rd_list_t *topics, - const rd_kafka_topic_partition_list_t *owned_partitions) { + const rd_kafka_topic_partition_list_t *owned_partitions, + const rd_kafkap_str_t *rack_id) { rd_kafka_sticky_assignor_state_t *state; rd_kafka_buf_t *rkbuf; rd_kafkap_bytes_t *metadata; @@ -1855,9 +1856,11 @@ static rd_kafkap_bytes_t *rd_kafka_sticky_assignor_get_metadata( * If there is no previous assignment, UserData is NULL. 
*/ + if (!assignor_state) { return rd_kafka_consumer_protocol_member_metadata_new( - topics, NULL, 0, owned_partitions); + topics, NULL, 0, owned_partitions, -1 /* generation */, + rack_id); } state = (rd_kafka_sticky_assignor_state_t *)assignor_state; @@ -1878,7 +1881,8 @@ static rd_kafkap_bytes_t *rd_kafka_sticky_assignor_get_metadata( rd_kafka_buf_destroy(rkbuf); metadata = rd_kafka_consumer_protocol_member_metadata_new( - topics, kbytes->data, kbytes->len, owned_partitions); + topics, kbytes->data, kbytes->len, owned_partitions, + state->generation_id, rack_id); rd_kafkap_bytes_destroy(kbytes);