diff --git a/examples/describe_cluster.c b/examples/describe_cluster.c index b26ff0edcf..3580600576 100644 --- a/examples/describe_cluster.c +++ b/examples/describe_cluster.c @@ -141,34 +141,31 @@ int64_t parse_int(const char *what, const char *str) { */ static int print_cluster_info(const rd_kafka_DescribeCluster_result_t *clusterdesc) { - int j, acl_operation; - const rd_kafka_ClusterDescription_t *desc; - int controller_id, node_cnt, cluster_authorized_operations_cnt; - const char *cluster_id; - - desc = rd_kafka_DescribeCluster_result_description(clusterdesc); - - controller_id = rd_kafka_ClusterDescription_controller_id(desc); - node_cnt = rd_kafka_ClusterDescription_node_count(desc); - cluster_authorized_operations_cnt = - rd_kafka_ClusterDescription_cluster_authorized_operation_count( - desc); - cluster_id = rd_kafka_ClusterDescription_cluster_id(desc); + size_t j; + size_t node_cnt; + size_t authorized_operations_cnt; + const char *cluster_id = + rd_kafka_DescribeCluster_result_cluster_id(clusterdesc); + const rd_kafka_Node_t **nodes = + rd_kafka_DescribeCluster_result_nodes(clusterdesc, &node_cnt); + const rd_kafka_AclOperation_t *authorized_operations = + rd_kafka_DescribeCluster_result_authorized_operations( + clusterdesc, &authorized_operations_cnt); + const rd_kafka_Node_t *controller = + rd_kafka_DescribeCluster_result_controller(clusterdesc); printf( - "Cluster id: %s\t Controller id: %d\t ACL operations count " + "Cluster id: %s\t Controller id: %d\t authorized operations count " "allowed: %d\n", - cluster_id, controller_id, cluster_authorized_operations_cnt); - for (j = 0; j < cluster_authorized_operations_cnt; j++) { - acl_operation = - rd_kafka_ClusterDescription_authorized_operation(desc, j); + cluster_id, controller ? rd_kafka_Node_id(controller) : -1, (int)authorized_operations_cnt); + + for (j = 0; j < authorized_operations_cnt; j++) { printf("\t%s operation is allowed\n", - rd_kafka_AclOperation_name(acl_operation)); + rd_kafka_AclOperation_name(authorized_operations[j])); } for (j = 0; j < node_cnt; j++) { - const rd_kafka_Node_t *node = NULL; - node = rd_kafka_ClusterDescription_node(desc, j); + const rd_kafka_Node_t *node = nodes[j]; printf("Node [id: %" PRId32 ", host: %s" ", port: %" PRIu16 "]\n", diff --git a/examples/describe_consumer_groups.c b/examples/describe_consumer_groups.c index feb8966782..2d93f6ebd0 100644 --- a/examples/describe_consumer_groups.c +++ b/examples/describe_consumer_groups.c @@ -180,7 +180,10 @@ print_group_member_info(const rd_kafka_MemberDescription_t *member) { * @brief Print group information. */ static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) { - int j, member_cnt, authorized_operation_count, acl_operation; + int member_cnt; + size_t j; + size_t authorized_operations_cnt; + const rd_kafka_AclOperation_t *authorized_operations; const rd_kafka_error_t *error; char coordinator_desc[512]; const rd_kafka_Node_t *coordinator = NULL; @@ -190,8 +193,9 @@ static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) { rd_kafka_ConsumerGroupDescription_partition_assignor(group); rd_kafka_consumer_group_state_t state = rd_kafka_ConsumerGroupDescription_state(group); - authorized_operation_count = - rd_kafka_ConsumerGroupDescription_authorized_operation_count(group); + authorized_operations = + rd_kafka_ConsumerGroupDescription_authorized_operations( + group, &authorized_operations_cnt); member_cnt = rd_kafka_ConsumerGroupDescription_member_count(group); error = rd_kafka_ConsumerGroupDescription_error(group); coordinator = rd_kafka_ConsumerGroupDescription_coordinator(group); @@ -212,18 +216,15 @@ static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) { group_id, partition_assignor, rd_kafka_consumer_group_state_name(state), coordinator_desc, member_cnt); - for (j = 0; j < authorized_operation_count; j++) { - acl_operation = - rd_kafka_ConsumerGroupDescription_authorized_operation( - group, j); + for (j = 0; j < authorized_operations_cnt; j++) { printf("%s operation is allowed\n", - rd_kafka_AclOperation_name(acl_operation)); + rd_kafka_AclOperation_name(authorized_operations[j])); } if (error) printf(" error[%" PRId32 "]: %s", rd_kafka_error_code(error), rd_kafka_error_string(error)); printf("\n"); - for (j = 0; j < member_cnt; j++) { + for (j = 0; j < (size_t)member_cnt; j++) { const rd_kafka_MemberDescription_t *member = rd_kafka_ConsumerGroupDescription_member(group, j); print_group_member_info(member); diff --git a/src/rdkafka.h b/src/rdkafka.h index c9b97603b9..0759201835 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -8205,12 +8205,6 @@ rd_kafka_TopicDescription_error(const rd_kafka_TopicDescription_t *topicdesc); * @{ */ - -/** - * @brief DescribeCluster result type. - */ -typedef struct rd_kafka_ClusterDescription_s rd_kafka_ClusterDescription_t; - /** * @brief Describes the cluster. * @@ -8229,96 +8223,59 @@ void rd_kafka_DescribeCluster(rd_kafka_t *rk, rd_kafka_queue_t *rkqu); /** - * @brief Get the DescribeCluster result. + * @brief Gets the broker nodes for the \p result cluster. * - * @param result Result to get cluster result from. + * @param result The result of DescribeCluster. + * @param cntp is updated with the count of broker nodes. * + * @return An array of broker nodes. * @remark The lifetime of the returned memory is the same * as the lifetime of the \p result object. */ RD_EXPORT -const rd_kafka_ClusterDescription_t * -rd_kafka_DescribeCluster_result_description( - const rd_kafka_DescribeCluster_result_t *result); - - -/** - * @brief Gets the node count for the \p clusterdesc cluster. - * - * @param clusterdesc The cluster description. - * - * @return The node count. - */ -RD_EXPORT -const int rd_kafka_ClusterDescription_node_count( - const rd_kafka_ClusterDescription_t *clusterdesc); - -/** - * @brief Gets the node for the \p clusterdesc cluster at \p idx position. - * - * @param clusterdesc The cluster description. - * @param idx the index at which to return the node. - * - * @return The node at idx position. - * - * @remark The lifetime of the returned memory is the same - * as the lifetime of the \p clusterdesc object. - */ -RD_EXPORT -const rd_kafka_Node_t *rd_kafka_ClusterDescription_node( - const rd_kafka_ClusterDescription_t *clusterdesc, - int idx); +const rd_kafka_Node_t **rd_kafka_DescribeCluster_result_nodes( + const rd_kafka_DescribeTopics_result_t *result, + size_t *cntp); /** - * @brief Gets the cluster authorized acl operations for the \p clusterdesc - * cluster. + * @brief Gets the authorized ACL operations for the \p result cluster. * - * @param clusterdesc The cluster description. + * @param result The result of DescribeCluster. + * @param cntp is updated with authorized ACL operations count. * * @return The cluster authorized operations. + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p result object. */ RD_EXPORT -const int rd_kafka_ClusterDescription_cluster_authorized_operation_count( - const rd_kafka_ClusterDescription_t *clusterdesc); - -/** - * @brief Gets operation at index \p idx of cluster authorized operations for - * the \p clusterdesc cluster. - * - * @param clusterdesc The cluster description. - * @param idx The index for which element is needed. - * - * @return Authorized operation at given index. - */ -RD_EXPORT -const rd_kafka_AclOperation_t rd_kafka_ClusterDescription_authorized_operation( - const rd_kafka_ClusterDescription_t *clusterdesc, - size_t idx); +const rd_kafka_AclOperation_t * +rd_kafka_DescribeCluster_result_authorized_operations( + const rd_kafka_DescribeTopics_result_t *result, + size_t *cntp); /** - * @brief Gets the cluster current controller id for the \p clusterdesc cluster. + * @brief Gets the current controller for the \p result cluster. * - * @param clusterdesc The cluster description. + * @param result The result of DescribeCluster. * - * @return The cluster current controller id. + * @return The cluster current controller. */ RD_EXPORT -const int rd_kafka_ClusterDescription_controller_id( - const rd_kafka_ClusterDescription_t *clusterdesc); +const rd_kafka_Node_t* rd_kafka_DescribeCluster_result_controller( + const rd_kafka_DescribeTopics_result_t *result); /** - * @brief Gets the cluster current cluster id for the \p clusterdesc cluster. - * - * @param clusterdesc The cluster description. + * @brief Gets the cluster id for the \p result cluster. * - * @return The cluster current cluster id (char*). + * @param result The result of DescribeCluster. * + * @return The cluster id. * @remark The lifetime of the returned memory is the same - * as the lifetime of the \p clusterdesc object. + * as the lifetime of the \p result object. */ RD_EXPORT -const char *rd_kafka_ClusterDescription_cluster_id( - const rd_kafka_ClusterDescription_t *clusterdesc); +const char *rd_kafka_DescribeCluster_result_cluster_id( + const rd_kafka_DescribeTopics_result_t *result); /**@}*/ @@ -8548,30 +8505,21 @@ const char *rd_kafka_ConsumerGroupDescription_partition_assignor( const rd_kafka_ConsumerGroupDescription_t *grpdesc); /** - * @brief Gets count of authorized operations for the \p grpdesc group. + * @brief Gets the authorized ACL operations for the \p grpdesc group. * * @param grpdesc The group description. + * @param cntp is updated with authorized ACL operations count. * - * @return count of Authorized operations allowed, 0 if authorized operations - * list is NULL or empty. - */ -RD_EXPORT -size_t rd_kafka_ConsumerGroupDescription_authorized_operation_count( - const rd_kafka_ConsumerGroupDescription_t *grpdesc); - -/** - * @brief Gets operation at index \p idx of authorized operations for the - * \p grpdesc group. - * - * @param grpdesc The group description. - * @param idx The index for which element is needed. + * @return The group authorized operations. * - * @return Authorized operation at given index. + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p grpdesc object. */ RD_EXPORT -rd_kafka_AclOperation_t rd_kafka_ConsumerGroupDescription_authorized_operation( +const rd_kafka_AclOperation_t * +rd_kafka_ConsumerGroupDescription_authorized_operations( const rd_kafka_ConsumerGroupDescription_t *grpdesc, - size_t idx); + size_t *cntp); /** * @brief Gets state for the \p grpdesc group. diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 6b4775794e..d6f6f81136 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -323,21 +323,6 @@ rd_kafka_admin_fanout_worker(rd_kafka_t *rk, rd_kafka_op_t *rko_fanout); -/* Common copy function for use in rd_list_t * for (rd_kafka_AclOperation_t *) - */ -static void *rd_kafka_AclOperation_copy(const void *acl_operation, - void *opaque) { - if (!acl_operation) - return NULL; - - rd_kafka_AclOperation_t *acl_operation_copy = - rd_malloc(sizeof(rd_kafka_AclOperation_t)); - *acl_operation_copy = *((rd_kafka_AclOperation_t *)acl_operation); - - return acl_operation_copy; -} - - /** * @name Common admin request code * @{ @@ -7240,14 +7225,16 @@ const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_partitions( * Use rd_kafka_ConsumerGroupDescription_destroy() to free when done. */ static rd_kafka_ConsumerGroupDescription_t * -rd_kafka_ConsumerGroupDescription_new(const char *group_id, - rd_bool_t is_simple_consumer_group, - const rd_list_t *members, - const char *partition_assignor, - const rd_list_t *authorized_operations, - rd_kafka_consumer_group_state_t state, - const rd_kafka_Node_t *coordinator, - rd_kafka_error_t *error) { +rd_kafka_ConsumerGroupDescription_new( + const char *group_id, + rd_bool_t is_simple_consumer_group, + const rd_list_t *members, + const char *partition_assignor, + const rd_kafka_AclOperation_t *authorized_operations, + size_t authorized_operations_cnt, + rd_kafka_consumer_group_state_t state, + const rd_kafka_Node_t *coordinator, + rd_kafka_error_t *error) { rd_kafka_ConsumerGroupDescription_t *grpdesc; grpdesc = rd_calloc(1, sizeof(*grpdesc)); grpdesc->group_id = rd_strdup(group_id); @@ -7264,14 +7251,14 @@ rd_kafka_ConsumerGroupDescription_new(const char *group_id, ? (char *)partition_assignor : rd_strdup(partition_assignor); - if (authorized_operations == NULL) - rd_list_init(&grpdesc->authorized_operations, 0, rd_free); - else { - rd_list_init_copy(&grpdesc->authorized_operations, - authorized_operations); - rd_list_copy_to(&grpdesc->authorized_operations, - authorized_operations, - rd_kafka_AclOperation_copy, NULL); + grpdesc->authorized_operations_cnt = authorized_operations_cnt; + if (authorized_operations_cnt) { + grpdesc->authorized_operations = + rd_malloc(sizeof(rd_kafka_AclOperation_t) * + grpdesc->authorized_operations_cnt); + memcpy(grpdesc->authorized_operations, authorized_operations, + sizeof(rd_kafka_AclOperation_t) * + authorized_operations_cnt); } grpdesc->state = state; @@ -7296,7 +7283,7 @@ static rd_kafka_ConsumerGroupDescription_t * rd_kafka_ConsumerGroupDescription_new_error(const char *group_id, rd_kafka_error_t *error) { return rd_kafka_ConsumerGroupDescription_new( - group_id, rd_false, NULL, NULL, NULL, + group_id, rd_false, NULL, NULL, NULL, 0, RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN, NULL, error); } @@ -7312,8 +7299,8 @@ rd_kafka_ConsumerGroupDescription_copy( return rd_kafka_ConsumerGroupDescription_new( grpdesc->group_id, grpdesc->is_simple_consumer_group, &grpdesc->members, grpdesc->partition_assignor, - &grpdesc->authorized_operations, grpdesc->state, - grpdesc->coordinator, grpdesc->error); + grpdesc->authorized_operations, grpdesc->authorized_operations_cnt, + grpdesc->state, grpdesc->coordinator, grpdesc->error); } /** @@ -7336,7 +7323,8 @@ static void rd_kafka_ConsumerGroupDescription_destroy( rd_kafka_error_destroy(grpdesc->error); if (grpdesc->coordinator) rd_kafka_Node_destroy(grpdesc->coordinator); - rd_list_destroy(&grpdesc->authorized_operations); + if (grpdesc->authorized_operations_cnt) + rd_free(grpdesc->authorized_operations); rd_free(grpdesc); } @@ -7366,17 +7354,12 @@ const char *rd_kafka_ConsumerGroupDescription_partition_assignor( return grpdesc->partition_assignor; } -size_t rd_kafka_ConsumerGroupDescription_authorized_operation_count( - const rd_kafka_ConsumerGroupDescription_t *grpdesc) { - return rd_list_cnt(&grpdesc->authorized_operations); -} - -rd_kafka_AclOperation_t rd_kafka_ConsumerGroupDescription_authorized_operation( +const rd_kafka_AclOperation_t * +rd_kafka_ConsumerGroupDescription_authorized_operations( const rd_kafka_ConsumerGroupDescription_t *grpdesc, - size_t idx) { - rd_kafka_AclOperation_t *entry = - rd_list_elem(&grpdesc->authorized_operations, idx); - return *entry; + size_t *cntp) { + *cntp = grpdesc->authorized_operations_cnt; + return grpdesc->authorized_operations; } rd_kafka_consumer_group_state_t rd_kafka_ConsumerGroupDescription_state( @@ -7674,9 +7657,27 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req, } if (error == NULL) { + rd_kafka_AclOperation_t *authorized_operations = NULL; + size_t authorized_operations_cnt = 0; + + if (authorized_operations_list) { + int i; + authorized_operations_cnt = + rd_list_cnt(authorized_operations_list); + rd_kafka_AclOperation_t *acl_operation; + authorized_operations = + rd_alloca(sizeof(rd_kafka_AclOperation_t) * + authorized_operations_cnt); + RD_LIST_FOREACH(acl_operation, + authorized_operations_list, i) { + authorized_operations[i] = + *acl_operation; + } + } + grpdesc = rd_kafka_ConsumerGroupDescription_new( group_id, is_simple_consumer_group, &members, proto, - authorized_operations_list, + authorized_operations, authorized_operations_cnt, rd_kafka_consumer_group_state_code(group_state), node, error); } else @@ -7938,7 +7939,7 @@ static rd_kafka_TopicPartitionInfo_t *rd_kafka_TopicPartitionInfo_new( */ static void rd_kafka_TopicPartitionInfo_destroy(rd_kafka_TopicPartitionInfo_t *pinfo) { - int i; + size_t i; RD_IF_FREE(pinfo->leader, rd_kafka_Node_destroy); for (i = 0; i < pinfo->isr_cnt; i++) @@ -7958,7 +7959,7 @@ rd_kafka_TopicPartitionInfo_destroy(rd_kafka_TopicPartitionInfo_t *pinfo) { * @param topic topic name * @param partitions Array of partition metadata (rd_kafka_metadata_partition). * @param partition_cnt Number of partitions in partition metadata. - * @param topic_authorized_operations acl operations allowed for topic. + * @param authorized_operations acl operations allowed for topic. * @param error Topic error reported by the broker. * @return A newly allocated TopicDescription object. * Use rd_kafka_TopicDescription_destroy() to free when done. @@ -7970,7 +7971,7 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( const struct rd_kafka_metadata_broker *brokers, const rd_kafka_metadata_broker_internal_t *brokers_internal, int broker_cnt, - const rd_list_t *topic_authorized_operations, + const rd_list_t *authorized_operations, rd_bool_t is_internal, rd_kafka_error_t *error) { rd_kafka_TopicDescription_t *topicdesc; @@ -7982,14 +7983,14 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new( if (error) topicdesc->error = rd_kafka_error_copy(error); - if (topic_authorized_operations) { + if (authorized_operations) { rd_kafka_AclOperation_t *acl_operation; topicdesc->authorized_operations_cnt = - rd_list_cnt(topic_authorized_operations); + rd_list_cnt(authorized_operations); topicdesc->authorized_operations = rd_malloc(sizeof(rd_kafka_AclOperation_t) * topicdesc->authorized_operations_cnt); - RD_LIST_FOREACH(acl_operation, topic_authorized_operations, i) { + RD_LIST_FOREACH(acl_operation, authorized_operations, i) { topicdesc->authorized_operations[i] = *acl_operation; } } @@ -8031,6 +8032,7 @@ rd_kafka_TopicDescription_destroy(rd_kafka_TopicDescription_t *topicdesc) { for (i = 0; i < topicdesc->partition_cnt; i++) rd_kafka_TopicPartitionInfo_destroy(topicdesc->partitions[i]); + rd_free(topicdesc->partitions); rd_free(topicdesc); } @@ -8054,21 +8056,21 @@ const rd_kafka_Node_t ** rd_kafka_TopicPartitionInfo_isr(const rd_kafka_TopicPartitionInfo_t *partition, size_t *cntp) { *cntp = partition->isr_cnt; - return partition->isr; + return (const rd_kafka_Node_t **) partition->isr; } const rd_kafka_Node_t **rd_kafka_TopicPartitionInfo_replicas( const rd_kafka_TopicPartitionInfo_t *partition, size_t *cntp) { *cntp = partition->replica_cnt; - return partition->replicas; + return (const rd_kafka_Node_t **)partition->replicas; } const rd_kafka_TopicPartitionInfo_t **rd_kafka_TopicDescription_partitions( const rd_kafka_TopicDescription_t *topicdesc, size_t *cntp) { *cntp = topicdesc->partition_cnt; - return topicdesc->partitions; + return (const rd_kafka_TopicPartitionInfo_t**) topicdesc->partitions; } const rd_kafka_AclOperation_t *rd_kafka_TopicDescription_authorized_operations( @@ -8143,7 +8145,7 @@ rd_kafka_admin_DescribeTopicsRequest(rd_kafka_broker_t *rkb, rkb, topics, "describe topics", rd_false /* include_topic_authorized_operations */, include_topic_authorized_operations, rd_false /* force_racks */, - resp_cb, opaque); + resp_cb, replyq, opaque); if (err) { rd_snprintf(errstr, errstr_size, "%s", rd_kafka_err2str(err)); @@ -8202,6 +8204,7 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req, } rd_list_add(&rko_result->rko_u.admin_result.results, topicdesc); } + rd_free(mdi); *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; @@ -8280,41 +8283,7 @@ void rd_kafka_DescribeTopics(rd_kafka_t *rk, * */ -const rd_kafka_Node_t *rd_kafka_ClusterDescription_node( - const rd_kafka_ClusterDescription_t *clusterdesc, - int idx) { - return (rd_kafka_Node_t *)rd_list_elem(&clusterdesc->nodes, idx); -} - -const rd_kafka_AclOperation_t rd_kafka_ClusterDescription_authorized_operation( - const rd_kafka_ClusterDescription_t *clusterdesc, - size_t idx) { - rd_kafka_AclOperation_t *entry = - rd_list_elem(&clusterdesc->authorized_operations, idx); - return *entry; -} - -const char *rd_kafka_ClusterDescription_cluster_id( - const rd_kafka_ClusterDescription_t *clusterdesc) { - return clusterdesc->cluster_id; -} - -const int rd_kafka_ClusterDescription_controller_id( - const rd_kafka_ClusterDescription_t *clusterdesc) { - return clusterdesc->controller_id; -} - -const int rd_kafka_ClusterDescription_cluster_authorized_operation_count( - const rd_kafka_ClusterDescription_t *clusterdesc) { - return rd_list_cnt(&clusterdesc->authorized_operations); -} - -const int rd_kafka_ClusterDescription_node_count( - const rd_kafka_ClusterDescription_t *clusterdesc) { - return rd_list_cnt(&clusterdesc->nodes); -} - -const rd_kafka_ClusterDescription_t * +static const rd_kafka_ClusterDescription_t * rd_kafka_DescribeCluster_result_description( const rd_kafka_DescribeTopics_result_t *result) { int cluster_result_cnt; @@ -8331,6 +8300,38 @@ rd_kafka_DescribeCluster_result_description( return clusterdesc; } + +const rd_kafka_Node_t **rd_kafka_DescribeCluster_result_nodes( + const rd_kafka_DescribeTopics_result_t *result, + size_t *cntp) { + const rd_kafka_ClusterDescription_t *clusterdesc = + rd_kafka_DescribeCluster_result_description(result); + *cntp = clusterdesc->node_cnt; + return (const rd_kafka_Node_t **)clusterdesc->nodes; +} + +const rd_kafka_AclOperation_t * +rd_kafka_DescribeCluster_result_authorized_operations( + const rd_kafka_DescribeTopics_result_t *result, + size_t *cntp) { + const rd_kafka_ClusterDescription_t *clusterdesc = + rd_kafka_DescribeCluster_result_description(result); + *cntp = clusterdesc->authorized_operations_cnt; + return clusterdesc->authorized_operations; +} + +const char *rd_kafka_DescribeCluster_result_cluster_id( + const rd_kafka_DescribeTopics_result_t *result) { + return rd_kafka_DescribeCluster_result_description(result)->cluster_id; +} + +const rd_kafka_Node_t* rd_kafka_DescribeCluster_result_controller( + const rd_kafka_DescribeTopics_result_t *result) { + return + rd_kafka_DescribeCluster_result_description(result) + ->controller; +} + /** * @brief Create a new ClusterDescription object. * @@ -8349,36 +8350,52 @@ rd_kafka_ClusterDescription_new(const rd_kafka_metadata_internal_t *mdi) { mdi->cluster_authorized_operations); int i; - clusterdesc->cluster_id = rd_strdup(mdi->cluster_id); - clusterdesc->controller_id = mdi->controller_id; + clusterdesc->cluster_id = rd_strdup(mdi->cluster_id); - if (authorized_operations == NULL) - rd_list_init(&clusterdesc->authorized_operations, 0, rd_free); - else { - rd_list_init_copy(&clusterdesc->authorized_operations, - authorized_operations); - rd_list_copy_to(&clusterdesc->authorized_operations, - authorized_operations, - rd_kafka_AclOperation_copy, NULL); - } + if (mdi->controller_id >= 0) + clusterdesc->controller = rd_kafka_Node_new_from_brokers( + mdi->controller_id, md->brokers, mdi->brokers, + md->broker_cnt); - rd_list_init(&clusterdesc->nodes, 0, rd_kafka_Node_free); - for (i = 0; i < md->broker_cnt; i++) { - rd_kafka_Node_t *node = rd_kafka_Node_new( - md->brokers[i].id, rd_strdup(md->brokers[i].host), - md->brokers[i].port, NULL); - rd_list_add(&clusterdesc->nodes, node); + if (authorized_operations) { + rd_kafka_AclOperation_t *acl_operation; + clusterdesc->authorized_operations_cnt = + rd_list_cnt(authorized_operations); + clusterdesc->authorized_operations = + rd_malloc(sizeof(rd_kafka_AclOperation_t) * + clusterdesc->authorized_operations_cnt); + RD_LIST_FOREACH(acl_operation, authorized_operations, i) { + clusterdesc->authorized_operations[i] = *acl_operation; + } } - RD_IF_FREE(authorized_operations, rd_list_destroy); + + clusterdesc->node_cnt = md->broker_cnt; + clusterdesc->nodes = + rd_calloc(clusterdesc->node_cnt, sizeof(rd_kafka_Node_t *)); + + for (i = 0; i < md->broker_cnt; i++) + clusterdesc->nodes[i] = + rd_kafka_Node_new(md->brokers[i].id, md->brokers[i].host, + md->brokers[i].port, NULL); + return clusterdesc; } static void rd_kafka_ClusterDescription_destroy( rd_kafka_ClusterDescription_t *clusterdesc) { RD_IF_FREE(clusterdesc->cluster_id, rd_free); - rd_list_destroy(&clusterdesc->authorized_operations); - rd_list_destroy(&clusterdesc->nodes); + RD_IF_FREE(clusterdesc->controller, rd_kafka_Node_free); + + if (clusterdesc->authorized_operations_cnt) + rd_free(clusterdesc->authorized_operations); + + if (clusterdesc->node_cnt) { + size_t i; + for (i = 0; i < clusterdesc->node_cnt; i++) + rd_kafka_Node_free(clusterdesc->nodes[i]); + rd_free(clusterdesc->nodes); + } rd_free(clusterdesc); } @@ -8405,7 +8422,7 @@ static rd_kafka_resp_err_t rd_kafka_admin_DescribeClusterRequest( rkb, NULL /* topics */, "describe cluster", include_cluster_authorized_operations, rd_false /* include_topic_authorized_operations */, - rd_false /* force_racks */, resp_cb, opaque); + rd_false /* force_racks */, resp_cb, replyq, opaque); if (err) { rd_snprintf(errstr, errstr_size, "%s", rd_kafka_err2str(err)); @@ -8441,6 +8458,8 @@ rd_kafka_DescribeClusterResponse_parse(rd_kafka_op_t *rko_req, clusterdesc = rd_kafka_ClusterDescription_new(mdi); + rd_free(mdi); + rd_list_add(&rko_result->rko_u.admin_result.results, clusterdesc); *rko_resultp = rko_result; return RD_KAFKA_RESP_ERR_NO_ERROR; diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index d488e6963b..b5dfa2970c 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -481,9 +481,10 @@ struct rd_kafka_ConsumerGroupDescription_s { rd_kafka_consumer_group_state_t state; /** Consumer group coordinator. */ rd_kafka_Node_t *coordinator; - /** List of authorized operations allowed for group. - * Type: rd_kafka_AclOperation_t* */ - rd_list_t authorized_operations; + /** Count of operations allowed for topic.*/ + size_t authorized_operations_cnt; + /** Operations allowed for topic. */ + rd_kafka_AclOperation_t *authorized_operations; /** Group specific error. */ rd_kafka_error_t *error; }; @@ -539,17 +540,19 @@ struct rd_kafka_TopicDescription_s { * @{ */ /** - * @struct DescribeCluster result + * @struct DescribeCluster result - internal type. */ -struct rd_kafka_ClusterDescription_s { - char *cluster_id; /**< current cluster id in \p cluster*/ - int controller_id; /**< current controller id in \p cluster*/ - rd_list_t - nodes; /**< Brokers in the cluster. Type: (rd_kafka_Node_t *) */ - rd_list_t - authorized_operations; /**< Operations allowed for cluster. - Type: (rd_kafka_AclOperation_t *) */ -}; +typedef struct rd_kafka_ClusterDescription_s { + char *cluster_id; /**< Cluster id */ + rd_kafka_Node_t *controller; /**< Current controller. */ + size_t node_cnt; /**< Count of brokers in the cluster. */ + rd_kafka_Node_t **nodes; /**< Brokers in the cluster. */ + size_t authorized_operations_cnt; /**< Count of operations allowed for + cluster.*/ + rd_kafka_AclOperation_t + *authorized_operations; /**< Operations allowed for cluster. */ + +} rd_kafka_ClusterDescription_t; /**@}*/ diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 13ef836ef6..f45d83b07a 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2203,6 +2203,7 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb, rd_bool_t force_racks, rd_kafka_op_t *rko, rd_kafka_resp_cb_t *resp_cb, + rd_kafka_replyq_t replyq, rd_bool_t force, void *opaque) { rd_kafka_buf_t *rkbuf; @@ -2213,6 +2214,7 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb, int *full_incr = NULL; void *handler_arg; rd_kafka_resp_cb_t *handler_cb = rd_kafka_handle_Metadata; + rd_kafka_replyq_t use_replyq = replyq; ApiVersion = rd_kafka_broker_ApiVersion_supported( rkb, RD_KAFKAP_Metadata, 0, 12, &features); @@ -2377,12 +2379,15 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb, else handler_arg = rko; + /* If a custom replyq is provided (and is valid), the response is + * handled through on that replyq. By default, response is handled on + * rk_ops, and the default handler (rd_kafka_handle_Metadata) forwards + * the parsed result to rko's replyq when done. */ + if (!use_replyq.q) + use_replyq = RD_KAFKA_REPLYQ(rkb->rkb_rk->rk_ops, 0); + rd_kafka_broker_buf_enq_replyq( - rkb, rkbuf, - /* Handle response thru rk_ops, - * but forward parsed result to - * rko's replyq when done. */ - RD_KAFKA_REPLYQ(rkb->rkb_rk->rk_ops, 0), + rkb, rkbuf, use_replyq, /* The default response handler is rd_kafka_handle_Metadata, but we allow alternate handlers to be configured. */ handler_cb, handler_arg); @@ -2433,6 +2438,9 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb, /* In all other situations apart from admin ops, we use rd_kafka_handle_Metadata rather than a custom resp_cb */ NULL, + /* Use default replyq which works with the default handler + rd_kafka_handle_Metadata. */ + RD_KAFKA_NO_REPLYQ, /* If the request needs to be forced, rko_u.metadata.force will be set. */ rd_false, NULL); @@ -2458,6 +2466,7 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb, * @param force_racks - Force partition to rack mapping computation in * parse_Metadata (see comment there). * @param resp_cb - callback to be used for handling response. + * @param replyq - replyq on which response is handled. * @param opaque - (optional) parameter to be passed to resp_cb. */ rd_kafka_resp_err_t @@ -2468,6 +2477,7 @@ rd_kafka_MetadataRequest_admin(rd_kafka_broker_t *rkb, rd_bool_t include_topic_authorized_operations, rd_bool_t force_racks, rd_kafka_resp_cb_t *resp_cb, + rd_kafka_replyq_t replyq, void *opaque) { return rd_kafka_MetadataRequest0( rkb, topics, reason, @@ -2476,6 +2486,7 @@ rd_kafka_MetadataRequest_admin(rd_kafka_broker_t *rkb, include_topic_authorized_operations, rd_false /* No admin operation should update cgrp. */, force_racks, NULL /* Admin options don't require us to track the op. */, resp_cb, + replyq, rd_true /* Admin operation metadata requests are always forced. */, opaque); } diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index 1e97fb84d9..84f168610a 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -272,6 +272,7 @@ rd_kafka_MetadataRequest_admin(rd_kafka_broker_t *rkb, rd_bool_t include_topic_authorized_operations, rd_bool_t force_racks, rd_kafka_resp_cb_t *resp_cb, + rd_kafka_replyq_t replyq, void *opaque); rd_kafka_resp_err_t diff --git a/tests/0080-admin_ut.c b/tests/0080-admin_ut.c index cebbf9c8e6..06f91858a8 100644 --- a/tests/0080-admin_ut.c +++ b/tests/0080-admin_ut.c @@ -723,6 +723,7 @@ static void do_test_DescribeConsumerGroups(const char *what, /* The returned groups should be in the original order, and * should all have timed out. */ for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { + size_t authorized_operation_cnt; TEST_ASSERT( !strcmp(group_names[i], rd_kafka_ConsumerGroupDescription_group_id( @@ -737,11 +738,12 @@ static void do_test_DescribeConsumerGroups(const char *what, group_names[i], rd_kafka_error_string( rd_kafka_ConsumerGroupDescription_error(resgroups[i]))); - TEST_ASSERT( - rd_kafka_ConsumerGroupDescription_authorized_operation_count( - resgroups[i]) == 0, - "Got authorized operations" - "when not requested"); + + rd_kafka_ConsumerGroupDescription_authorized_operations( + resgroups[i], &authorized_operation_cnt); + TEST_ASSERT(authorized_operation_cnt == 0, + "Got authorized operations" + "when not requested"); } rd_kafka_event_destroy(rkev); diff --git a/tests/0081-admin.c b/tests/0081-admin.c index 6a2288006b..965c319f58 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -2866,6 +2866,7 @@ static void do_test_DescribeConsumerGroups(const char *what, char client_ids[TEST_DESCRIBE_CONSUMER_GROUPS_CNT][512]; rd_kafka_t *rks[TEST_DESCRIBE_CONSUMER_GROUPS_CNT]; const rd_kafka_DescribeConsumerGroups_result_t *res; + size_t authorized_operation_cnt; rd_bool_t has_group_instance_id = test_broker_version >= TEST_BRKVER(2, 4, 0, 0); @@ -2986,9 +2987,10 @@ static void do_test_DescribeConsumerGroups(const char *what, rd_kafka_ConsumerGroupDescription_error(act)); rd_kafka_consumer_group_state_t state = rd_kafka_ConsumerGroupDescription_state(act); + rd_kafka_ConsumerGroupDescription_authorized_operations( + act, &authorized_operation_cnt); TEST_ASSERT( - rd_kafka_ConsumerGroupDescription_authorized_operation_count( - act) == 0, + authorized_operation_cnt == 0, "Authorized operations returned when not requested\n"); TEST_ASSERT( strcmp(exp->group_id, @@ -3112,45 +3114,13 @@ static void do_test_DescribeConsumerGroups(const char *what, SUB_TEST_PASS(); } -/* Helper macro to check if an authorized operation `acl_to_check` is contained - * within the `acl_struct`, for which the acl at index i is accessed with - * `idx_fn(acl_struct, i)`.*/ -#define test_match_authorized_operation(acl_cnt, acl_struct, idx_fn, \ - acl_to_check) \ - do { \ - int i; \ - rd_bool_t found = rd_false; \ - for (i = 0; i < acl_cnt; i++) { \ - if (idx_fn(acl_struct, i) == acl_to_check) { \ - found = rd_true; \ - break; \ - } \ - } \ - if (!found) \ - TEST_FAIL("Expected to find operation %s in result\n", \ - rd_kafka_AclOperation_name(acl_to_check)); \ - } while (0) - -/* Helper macro to check whether all the authorized operations in the var-args - * are present within the `acl_struct`. */ -#define test_match_authorized_operations(acl_cnt, acl_struct, idx_fn, \ - acl_to_check_cnt, ...) \ - do { \ - rd_kafka_AclOperation_t operations[acl_to_check_cnt] = { \ - __VA_ARGS__}; \ - size_t i; \ - for (i = 0; i < acl_to_check_cnt; i++) { \ - rd_kafka_AclOperation_t operation = operations[i]; \ - test_match_authorized_operation(acl_cnt, acl_struct, \ - idx_fn, operation); \ - } \ - } while (0) - +/** @brief Helper function to check whether \p expected and \p actual contain + * the same values. */ static void -xtest_match_authorized_operations(const rd_kafka_AclOperation_t *expected, - size_t expected_cnt, - const rd_kafka_AclOperation_t *actual, - size_t actual_cnt) { +test_match_authorized_operations(const rd_kafka_AclOperation_t *expected, + size_t expected_cnt, + const rd_kafka_AclOperation_t *actual, + size_t actual_cnt) { size_t i, j; TEST_ASSERT(expected_cnt == actual_cnt, "Expected %" PRIusz " authorized operations, got %" PRIusz, @@ -3311,9 +3281,9 @@ static void do_test_DescribeTopics(const char *what, rd_kafka_TopicDescription_authorized_operations( result_topics[0], &authorized_operations_cnt); - xtest_match_authorized_operations(expected, 8, - authorized_operations, - authorized_operations_cnt); + test_match_authorized_operations(expected, 8, + authorized_operations, + authorized_operations_cnt); } rd_kafka_event_destroy(rkev); @@ -3397,9 +3367,9 @@ static void do_test_DescribeTopics(const char *what, rd_kafka_TopicDescription_authorized_operations( result_topics[0], &authorized_operations_cnt); - xtest_match_authorized_operations(expected, 2, - authorized_operations, - authorized_operations_cnt); + test_match_authorized_operations(expected, 2, + authorized_operations, + authorized_operations_cnt); } rd_kafka_event_destroy(rkev); @@ -3448,16 +3418,17 @@ static void do_test_DescribeCluster(const char *what, rd_kafka_resp_err_t err; test_timing_t timing; const rd_kafka_DescribeCluster_result_t *res; - const rd_kafka_ClusterDescription_t *result_cluster; + const rd_kafka_Node_t **nodes; + size_t node_cnt; char errstr[128]; const char *errstr2; rd_kafka_AclBinding_t *acl_bindings[1]; rd_kafka_AclBindingFilter_t *acl_bindings_delete; - int authorized_operations_cnt; + const rd_kafka_AclOperation_t *authorized_operations; + size_t authorized_operations_cnt; const char *sasl_username; const char *sasl_mechanism; const char *principal; - const rd_kafka_Node_t *node; SUB_TEST_QUICK( "%s DescribeCluster with %s, request_timeout %d, %s authorized " @@ -3496,40 +3467,35 @@ static void do_test_DescribeCluster(const char *what, TEST_ASSERT(!err, "Expected success, not %s: %s", rd_kafka_err2name(err), errstr2); - result_cluster = rd_kafka_DescribeCluster_result_description(res); - /* Sanity checks on fields inside the result. There's not much we can * say here deterministically, since it depends on the test environment. */ - TEST_ASSERT( - strlen(rd_kafka_ClusterDescription_cluster_id(result_cluster)), - "Length of cluster id should be non-null."); - TEST_ASSERT(rd_kafka_ClusterDescription_node_count(result_cluster), - "Expected non-zero node count in cluster."); - node = rd_kafka_ClusterDescription_node(result_cluster, 0); - TEST_ASSERT(rd_kafka_Node_host(node), + TEST_ASSERT(strlen(rd_kafka_DescribeCluster_result_cluster_id(res)), + "Length of cluster id should be non-null."); + + nodes = rd_kafka_DescribeCluster_result_nodes(res, &node_cnt); + TEST_ASSERT(node_cnt, "Expected non-zero node count for cluster."); + + TEST_ASSERT(rd_kafka_Node_host(nodes[0]), "Expected first node of cluster to have a hostname"); - TEST_ASSERT(rd_kafka_Node_port(node), + TEST_ASSERT(rd_kafka_Node_port(nodes[0]), "Expected first node of cluster to have a port"); if (include_authorized_operations) { - authorized_operations_cnt = - rd_kafka_ClusterDescription_cluster_authorized_operation_count( - result_cluster); - TEST_ASSERT( - authorized_operations_cnt == 7, - "Expected 7 cluster operations allowed by ACLs, actual %d", - authorized_operations_cnt); - test_match_authorized_operations( - authorized_operations_cnt, result_cluster, - rd_kafka_ClusterDescription_authorized_operation, 7, + const rd_kafka_AclOperation_t expected[] = { RD_KAFKA_ACL_OPERATION_ALTER, RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS, RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION, RD_KAFKA_ACL_OPERATION_CREATE, RD_KAFKA_ACL_OPERATION_DESCRIBE, RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS, - RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE); + RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE}; + authorized_operations = + rd_kafka_DescribeCluster_result_authorized_operations( + res, &authorized_operations_cnt); + test_match_authorized_operations(expected, 7, + authorized_operations, + authorized_operations_cnt); } rd_kafka_event_destroy(rkev); @@ -3590,24 +3556,23 @@ static void do_test_DescribeCluster(const char *what, TEST_ASSERT(!err, "Expected success, not %s: %s", rd_kafka_err2name(err), errstr2); - result_cluster = rd_kafka_DescribeCluster_result_description(res); - - authorized_operations_cnt = - rd_kafka_ClusterDescription_cluster_authorized_operation_count( - result_cluster); - /* * After CreateAcls call with * only RD_KAFKA_ACL_OPERATION_ALTER allowed, the allowed operations * should be 2 (DESCRIBE is implicitly derived from ALTER). */ - TEST_ASSERT(authorized_operations_cnt == 2, - "Expected 2 cluster operations allowed by ACLs, actual %d", - authorized_operations_cnt); - test_match_authorized_operations( - authorized_operations_cnt, result_cluster, - rd_kafka_ClusterDescription_authorized_operation, 2, - RD_KAFKA_ACL_OPERATION_ALTER, RD_KAFKA_ACL_OPERATION_DESCRIBE); + { + const rd_kafka_AclOperation_t expected[] = { + RD_KAFKA_ACL_OPERATION_ALTER, + RD_KAFKA_ACL_OPERATION_DESCRIBE}; + authorized_operations = + rd_kafka_DescribeCluster_result_authorized_operations( + res, &authorized_operations_cnt); + + test_match_authorized_operations(expected, 2, + authorized_operations, + authorized_operations_cnt); + } rd_kafka_event_destroy(rkev); @@ -3658,7 +3623,8 @@ do_test_DescribeConsumerGroups_with_authorized_ops(const char *what, size_t results_cnt; const rd_kafka_DescribeConsumerGroups_result_t *res; const char *principal, *sasl_mechanism, *sasl_username; - int authorized_operations_cnt; + const rd_kafka_AclOperation_t *authorized_operations; + size_t authorized_operations_cnt; SUB_TEST_QUICK("%s DescribeConsumerGroups with %s, request_timeout %d", rd_kafka_name(rk), what, request_timeout); @@ -3727,17 +3693,18 @@ do_test_DescribeConsumerGroups_with_authorized_ops(const char *what, TEST_ASSERT(!error, "Expected no error in describing group, got: %s", rd_kafka_error_string(error)); - authorized_operations_cnt = - rd_kafka_ConsumerGroupDescription_authorized_operation_count( - results[0]); - TEST_ASSERT(authorized_operations_cnt == 3, - "Expected 2 group operations allowed by ACLs, actual %d", - authorized_operations_cnt); - test_match_authorized_operations( - authorized_operations_cnt, results[0], - rd_kafka_ConsumerGroupDescription_authorized_operation, 3, - RD_KAFKA_ACL_OPERATION_DELETE, RD_KAFKA_ACL_OPERATION_DESCRIBE, - RD_KAFKA_ACL_OPERATION_READ); + { + const rd_kafka_AclOperation_t expected[] = { + RD_KAFKA_ACL_OPERATION_DELETE, + RD_KAFKA_ACL_OPERATION_DESCRIBE, + RD_KAFKA_ACL_OPERATION_READ}; + authorized_operations = + rd_kafka_ConsumerGroupDescription_authorized_operations( + results[0], &authorized_operations_cnt); + test_match_authorized_operations(expected, 3, + authorized_operations, + authorized_operations_cnt); + } rd_kafka_event_destroy(rkev); @@ -3792,16 +3759,18 @@ do_test_DescribeConsumerGroups_with_authorized_ops(const char *what, TEST_ASSERT(!error, "Expected no error in describing group, got: %s", rd_kafka_error_string(error)); - authorized_operations_cnt = - rd_kafka_ConsumerGroupDescription_authorized_operation_count( - results[0]); - TEST_ASSERT(authorized_operations_cnt == 2, - "Expected 2 group operations allowed by ACLs, actual %d", - authorized_operations_cnt); - test_match_authorized_operations( - authorized_operations_cnt, results[0], - rd_kafka_ConsumerGroupDescription_authorized_operation, 2, - RD_KAFKA_ACL_OPERATION_DESCRIBE, RD_KAFKA_ACL_OPERATION_READ); + + { + const rd_kafka_AclOperation_t expected[] = { + RD_KAFKA_ACL_OPERATION_DESCRIBE, + RD_KAFKA_ACL_OPERATION_READ}; + authorized_operations = + rd_kafka_ConsumerGroupDescription_authorized_operations( + results[0], &authorized_operations_cnt); + test_match_authorized_operations(expected, 2, + authorized_operations, + authorized_operations_cnt); + } rd_kafka_event_destroy(rkev);