Skip to content

Commit

Permalink
Address DescribeCluster/DescribeConsumerGroup changes, fix memleak
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Aug 31, 2023
1 parent 59ab5bb commit 53cd983
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 357 deletions.
39 changes: 18 additions & 21 deletions examples/describe_cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,34 +141,31 @@ int64_t parse_int(const char *what, const char *str) {
*/
static int
print_cluster_info(const rd_kafka_DescribeCluster_result_t *clusterdesc) {
int j, acl_operation;
const rd_kafka_ClusterDescription_t *desc;
int controller_id, node_cnt, cluster_authorized_operations_cnt;
const char *cluster_id;

desc = rd_kafka_DescribeCluster_result_description(clusterdesc);

controller_id = rd_kafka_ClusterDescription_controller_id(desc);
node_cnt = rd_kafka_ClusterDescription_node_count(desc);
cluster_authorized_operations_cnt =
rd_kafka_ClusterDescription_cluster_authorized_operation_count(
desc);
cluster_id = rd_kafka_ClusterDescription_cluster_id(desc);
size_t j;
size_t node_cnt;
size_t authorized_operations_cnt;
const char *cluster_id =
rd_kafka_DescribeCluster_result_cluster_id(clusterdesc);
const rd_kafka_Node_t **nodes =
rd_kafka_DescribeCluster_result_nodes(clusterdesc, &node_cnt);
const rd_kafka_AclOperation_t *authorized_operations =
rd_kafka_DescribeCluster_result_authorized_operations(
clusterdesc, &authorized_operations_cnt);
const rd_kafka_Node_t *controller =
rd_kafka_DescribeCluster_result_controller(clusterdesc);

printf(
"Cluster id: %s\t Controller id: %d\t ACL operations count "
"Cluster id: %s\t Controller id: %d\t authorized operations count "
"allowed: %d\n",
cluster_id, controller_id, cluster_authorized_operations_cnt);
for (j = 0; j < cluster_authorized_operations_cnt; j++) {
acl_operation =
rd_kafka_ClusterDescription_authorized_operation(desc, j);
cluster_id, controller ? rd_kafka_Node_id(controller) : -1, (int)authorized_operations_cnt);

for (j = 0; j < authorized_operations_cnt; j++) {
printf("\t%s operation is allowed\n",
rd_kafka_AclOperation_name(acl_operation));
rd_kafka_AclOperation_name(authorized_operations[j]));
}

for (j = 0; j < node_cnt; j++) {
const rd_kafka_Node_t *node = NULL;
node = rd_kafka_ClusterDescription_node(desc, j);
const rd_kafka_Node_t *node = nodes[j];
printf("Node [id: %" PRId32
", host: %s"
", port: %" PRIu16 "]\n",
Expand Down
19 changes: 10 additions & 9 deletions examples/describe_consumer_groups.c
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ print_group_member_info(const rd_kafka_MemberDescription_t *member) {
* @brief Print group information.
*/
static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) {
int j, member_cnt, authorized_operation_count, acl_operation;
int member_cnt;
size_t j;
size_t authorized_operations_cnt;
const rd_kafka_AclOperation_t *authorized_operations;
const rd_kafka_error_t *error;
char coordinator_desc[512];
const rd_kafka_Node_t *coordinator = NULL;
Expand All @@ -190,8 +193,9 @@ static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) {
rd_kafka_ConsumerGroupDescription_partition_assignor(group);
rd_kafka_consumer_group_state_t state =
rd_kafka_ConsumerGroupDescription_state(group);
authorized_operation_count =
rd_kafka_ConsumerGroupDescription_authorized_operation_count(group);
authorized_operations =
rd_kafka_ConsumerGroupDescription_authorized_operations(
group, &authorized_operations_cnt);
member_cnt = rd_kafka_ConsumerGroupDescription_member_count(group);
error = rd_kafka_ConsumerGroupDescription_error(group);
coordinator = rd_kafka_ConsumerGroupDescription_coordinator(group);
Expand All @@ -212,18 +216,15 @@ static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) {
group_id, partition_assignor,
rd_kafka_consumer_group_state_name(state), coordinator_desc,
member_cnt);
for (j = 0; j < authorized_operation_count; j++) {
acl_operation =
rd_kafka_ConsumerGroupDescription_authorized_operation(
group, j);
for (j = 0; j < authorized_operations_cnt; j++) {
printf("%s operation is allowed\n",
rd_kafka_AclOperation_name(acl_operation));
rd_kafka_AclOperation_name(authorized_operations[j]));
}
if (error)
printf(" error[%" PRId32 "]: %s", rd_kafka_error_code(error),
rd_kafka_error_string(error));
printf("\n");
for (j = 0; j < member_cnt; j++) {
for (j = 0; j < (size_t)member_cnt; j++) {
const rd_kafka_MemberDescription_t *member =
rd_kafka_ConsumerGroupDescription_member(group, j);
print_group_member_info(member);
Expand Down
122 changes: 35 additions & 87 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -8205,12 +8205,6 @@ rd_kafka_TopicDescription_error(const rd_kafka_TopicDescription_t *topicdesc);
* @{
*/


/**
* @brief DescribeCluster result type.
*/
typedef struct rd_kafka_ClusterDescription_s rd_kafka_ClusterDescription_t;

/**
* @brief Describes the cluster.
*
Expand All @@ -8229,96 +8223,59 @@ void rd_kafka_DescribeCluster(rd_kafka_t *rk,
rd_kafka_queue_t *rkqu);

/**
* @brief Get the DescribeCluster result.
* @brief Gets the broker nodes for the \p result cluster.
*
* @param result Result to get cluster result from.
* @param result The result of DescribeCluster.
* @param cntp is updated with the count of broker nodes.
*
* @return An array of broker nodes.
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p result object.
*/
RD_EXPORT
const rd_kafka_ClusterDescription_t *
rd_kafka_DescribeCluster_result_description(
const rd_kafka_DescribeCluster_result_t *result);


/**
* @brief Gets the node count for the \p clusterdesc cluster.
*
* @param clusterdesc The cluster description.
*
* @return The node count.
*/
RD_EXPORT
const int rd_kafka_ClusterDescription_node_count(
const rd_kafka_ClusterDescription_t *clusterdesc);

/**
* @brief Gets the node for the \p clusterdesc cluster at \p idx position.
*
* @param clusterdesc The cluster description.
* @param idx the index at which to return the node.
*
* @return The node at idx position.
*
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p clusterdesc object.
*/
RD_EXPORT
const rd_kafka_Node_t *rd_kafka_ClusterDescription_node(
const rd_kafka_ClusterDescription_t *clusterdesc,
int idx);
const rd_kafka_Node_t **rd_kafka_DescribeCluster_result_nodes(
const rd_kafka_DescribeTopics_result_t *result,
size_t *cntp);

/**
* @brief Gets the cluster authorized acl operations for the \p clusterdesc
* cluster.
* @brief Gets the authorized ACL operations for the \p result cluster.
*
* @param clusterdesc The cluster description.
* @param result The result of DescribeCluster.
* @param cntp is updated with authorized ACL operations count.
*
* @return The cluster authorized operations.
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p result object.
*/
RD_EXPORT
const int rd_kafka_ClusterDescription_cluster_authorized_operation_count(
const rd_kafka_ClusterDescription_t *clusterdesc);

/**
* @brief Gets operation at index \p idx of cluster authorized operations for
* the \p clusterdesc cluster.
*
* @param clusterdesc The cluster description.
* @param idx The index for which element is needed.
*
* @return Authorized operation at given index.
*/
RD_EXPORT
const rd_kafka_AclOperation_t rd_kafka_ClusterDescription_authorized_operation(
const rd_kafka_ClusterDescription_t *clusterdesc,
size_t idx);
const rd_kafka_AclOperation_t *
rd_kafka_DescribeCluster_result_authorized_operations(
const rd_kafka_DescribeTopics_result_t *result,
size_t *cntp);

/**
* @brief Gets the cluster current controller id for the \p clusterdesc cluster.
* @brief Gets the current controller for the \p result cluster.
*
* @param clusterdesc The cluster description.
* @param result The result of DescribeCluster.
*
* @return The cluster current controller id.
* @return The cluster current controller.
*/
RD_EXPORT
const int rd_kafka_ClusterDescription_controller_id(
const rd_kafka_ClusterDescription_t *clusterdesc);
const rd_kafka_Node_t* rd_kafka_DescribeCluster_result_controller(
const rd_kafka_DescribeTopics_result_t *result);

/**
* @brief Gets the cluster current cluster id for the \p clusterdesc cluster.
*
* @param clusterdesc The cluster description.
* @brief Gets the cluster id for the \p result cluster.
*
* @return The cluster current cluster id (char*).
* @param result The result of DescribeCluster.
*
* @return The cluster id.
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p clusterdesc object.
* as the lifetime of the \p result object.
*/
RD_EXPORT
const char *rd_kafka_ClusterDescription_cluster_id(
const rd_kafka_ClusterDescription_t *clusterdesc);
const char *rd_kafka_DescribeCluster_result_cluster_id(
const rd_kafka_DescribeTopics_result_t *result);

/**@}*/

Expand Down Expand Up @@ -8548,30 +8505,21 @@ const char *rd_kafka_ConsumerGroupDescription_partition_assignor(
const rd_kafka_ConsumerGroupDescription_t *grpdesc);

/**
* @brief Gets count of authorized operations for the \p grpdesc group.
* @brief Gets the authorized ACL operations for the \p grpdesc group.
*
* @param grpdesc The group description.
* @param cntp is updated with authorized ACL operations count.
*
* @return count of Authorized operations allowed, 0 if authorized operations
* list is NULL or empty.
*/
RD_EXPORT
size_t rd_kafka_ConsumerGroupDescription_authorized_operation_count(
const rd_kafka_ConsumerGroupDescription_t *grpdesc);

/**
* @brief Gets operation at index \p idx of authorized operations for the
* \p grpdesc group.
*
* @param grpdesc The group description.
* @param idx The index for which element is needed.
* @return The group authorized operations.
*
* @return Authorized operation at given index.
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p grpdesc object.
*/
RD_EXPORT
rd_kafka_AclOperation_t rd_kafka_ConsumerGroupDescription_authorized_operation(
const rd_kafka_AclOperation_t *
rd_kafka_ConsumerGroupDescription_authorized_operations(
const rd_kafka_ConsumerGroupDescription_t *grpdesc,
size_t idx);
size_t *cntp);

/**
* @brief Gets state for the \p grpdesc group.
Expand Down
Loading

0 comments on commit 53cd983

Please sign in to comment.