Skip to content

Commit

Permalink
Fix style and refactor MetadataRequest into op/cb
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Sep 4, 2023
1 parent 53cd983 commit bdd8a4b
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 49 deletions.
3 changes: 2 additions & 1 deletion examples/describe_cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ print_cluster_info(const rd_kafka_DescribeCluster_result_t *clusterdesc) {
printf(
"Cluster id: %s\t Controller id: %d\t authorized operations count "
"allowed: %d\n",
cluster_id, controller ? rd_kafka_Node_id(controller) : -1, (int)authorized_operations_cnt);
cluster_id, controller ? rd_kafka_Node_id(controller) : -1,
(int)authorized_operations_cnt);

for (j = 0; j < authorized_operations_cnt; j++) {
printf("\t%s operation is allowed\n",
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -8261,7 +8261,7 @@ rd_kafka_DescribeCluster_result_authorized_operations(
* @return The cluster current controller.
*/
RD_EXPORT
const rd_kafka_Node_t* rd_kafka_DescribeCluster_result_controller(
const rd_kafka_Node_t *rd_kafka_DescribeCluster_result_controller(
const rd_kafka_DescribeTopics_result_t *result);

/**
Expand Down
42 changes: 34 additions & 8 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,34 @@ static rd_kafka_op_t *rd_kafka_admin_request_op_target_all_new(
return rko;
}


/**
* @brief Construct MetadataRequest for use with AdminAPI (does not send).
* Common for DescribeTopics and DescribeCluster.
*
* @sa rd_kafka_MetadataRequest_resp_cb.
*/
static rd_kafka_resp_err_t
rd_kafka_admin_MetadataRequest(rd_kafka_broker_t *rkb,
const rd_list_t *topics,
const char *reason,
rd_bool_t include_cluster_authorized_operations,
rd_bool_t include_topic_authorized_operations,
rd_bool_t force_racks,
rd_kafka_resp_cb_t *resp_cb,
rd_kafka_replyq_t replyq,
void *opaque) {
return rd_kafka_MetadataRequest_resp_cb(
rkb, topics, reason,
rd_false /* No admin operation requires topic creation. */,
include_cluster_authorized_operations,
include_topic_authorized_operations,
rd_false /* No admin operation should update cgrp. */, force_racks,
resp_cb, replyq,
rd_true /* Admin operation metadata requests are always forced. */,
opaque);
}

/**@}*/


Expand Down Expand Up @@ -8056,7 +8084,7 @@ const rd_kafka_Node_t **
rd_kafka_TopicPartitionInfo_isr(const rd_kafka_TopicPartitionInfo_t *partition,
size_t *cntp) {
*cntp = partition->isr_cnt;
return (const rd_kafka_Node_t **) partition->isr;
return (const rd_kafka_Node_t **)partition->isr;
}

const rd_kafka_Node_t **rd_kafka_TopicPartitionInfo_replicas(
Expand All @@ -8070,7 +8098,7 @@ const rd_kafka_TopicPartitionInfo_t **rd_kafka_TopicDescription_partitions(
const rd_kafka_TopicDescription_t *topicdesc,
size_t *cntp) {
*cntp = topicdesc->partition_cnt;
return (const rd_kafka_TopicPartitionInfo_t**) topicdesc->partitions;
return (const rd_kafka_TopicPartitionInfo_t **)topicdesc->partitions;
}

const rd_kafka_AclOperation_t *rd_kafka_TopicDescription_authorized_operations(
Expand Down Expand Up @@ -8141,7 +8169,7 @@ rd_kafka_admin_DescribeTopicsRequest(rd_kafka_broker_t *rkb,
int include_topic_authorized_operations =
rd_kafka_confval_get_int(&options->include_authorized_operations);

err = rd_kafka_MetadataRequest_admin(
err = rd_kafka_admin_MetadataRequest(
rkb, topics, "describe topics",
rd_false /* include_topic_authorized_operations */,
include_topic_authorized_operations, rd_false /* force_racks */,
Expand Down Expand Up @@ -8325,11 +8353,9 @@ const char *rd_kafka_DescribeCluster_result_cluster_id(
return rd_kafka_DescribeCluster_result_description(result)->cluster_id;
}

const rd_kafka_Node_t* rd_kafka_DescribeCluster_result_controller(
const rd_kafka_Node_t *rd_kafka_DescribeCluster_result_controller(
const rd_kafka_DescribeTopics_result_t *result) {
return
rd_kafka_DescribeCluster_result_description(result)
->controller;
return rd_kafka_DescribeCluster_result_description(result)->controller;
}

/**
Expand Down Expand Up @@ -8418,7 +8444,7 @@ static rd_kafka_resp_err_t rd_kafka_admin_DescribeClusterRequest(
int include_cluster_authorized_operations =
rd_kafka_confval_get_int(&options->include_authorized_operations);

err = rd_kafka_MetadataRequest_admin(
err = rd_kafka_admin_MetadataRequest(
rkb, NULL /* topics */, "describe cluster",
include_cluster_authorized_operations,
rd_false /* include_topic_authorized_operations */,
Expand Down
79 changes: 50 additions & 29 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -2217,7 +2217,7 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
rd_kafka_replyq_t use_replyq = replyq;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_Metadata, 0, 12, &features);
rkb, RD_KAFKAP_Metadata, 0, 10, &features);

rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_Metadata, 1,
4 + (50 * topic_cnt) + 1,
Expand Down Expand Up @@ -2396,6 +2396,36 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
}


/**
* @brief Construct a MetadataRequest which uses an optional rko, and the
* default handler callback.
* @sa rd_kafka_MetadataRequest.
*/
static rd_kafka_resp_err_t
rd_kafka_MetadataRequest_op(rd_kafka_broker_t *rkb,
const rd_list_t *topics,
const char *reason,
rd_bool_t allow_auto_create_topics,
rd_bool_t include_cluster_authorized_operations,
rd_bool_t include_topic_authorized_operations,
rd_bool_t cgrp_update,
rd_bool_t force_racks,
rd_kafka_op_t *rko) {
return rd_kafka_MetadataRequest0(
rkb, topics, reason, allow_auto_create_topics,
include_cluster_authorized_operations,
include_topic_authorized_operations, cgrp_update, force_racks, rko,
/* We use the default rd_kafka_handle_Metadata rather than a custom
resp_cb */
NULL,
/* Use default replyq which works with the default handler
rd_kafka_handle_Metadata. */
RD_KAFKA_NO_REPLYQ,
/* If the request needs to be forced, rko_u.metadata.force will be
set. We don't provide an explicit parameter force. */
rd_false, NULL);
}

/**
* @brief Construct MetadataRequest (does not send)
*
Expand Down Expand Up @@ -2430,20 +2460,11 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
rd_bool_t cgrp_update,
rd_bool_t force_racks,
rd_kafka_op_t *rko) {
return rd_kafka_MetadataRequest0(
return rd_kafka_MetadataRequest_op(
rkb, topics, reason, allow_auto_create_topics,
/* cluster and topic authorized operations are used by admin
operations only. */
rd_false, rd_false, cgrp_update, force_racks, rko,
/* In all other situations apart from admin ops, we use
rd_kafka_handle_Metadata rather than a custom resp_cb */
NULL,
/* Use default replyq which works with the default handler
rd_kafka_handle_Metadata. */
RD_KAFKA_NO_REPLYQ,
/* If the request needs to be forced, rko_u.metadata.force will be
set. */
rd_false, NULL);
rd_false, rd_false, cgrp_update, force_racks, rko);
}


Expand All @@ -2469,24 +2490,24 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
* @param replyq - replyq on which response is handled.
* @param opaque - (optional) parameter to be passed to resp_cb.
*/
rd_kafka_resp_err_t
rd_kafka_MetadataRequest_admin(rd_kafka_broker_t *rkb,
const rd_list_t *topics,
const char *reason,
rd_bool_t include_cluster_authorized_operations,
rd_bool_t include_topic_authorized_operations,
rd_bool_t force_racks,
rd_kafka_resp_cb_t *resp_cb,
rd_kafka_replyq_t replyq,
void *opaque) {
rd_kafka_resp_err_t rd_kafka_MetadataRequest_resp_cb(
rd_kafka_broker_t *rkb,
const rd_list_t *topics,
const char *reason,
rd_bool_t allow_auto_create_topics,
rd_bool_t include_cluster_authorized_operations,
rd_bool_t include_topic_authorized_operations,
rd_bool_t cgrp_update,
rd_bool_t force_racks,
rd_kafka_resp_cb_t *resp_cb,
rd_kafka_replyq_t replyq,
rd_bool_t force,
void *opaque) {
return rd_kafka_MetadataRequest0(
rkb, topics, reason,
/* No admin operation requires topic creation. */
rd_false, include_cluster_authorized_operations,
include_topic_authorized_operations,
rd_false /* No admin operation should update cgrp. */, force_racks,
NULL /* Admin options don't require us to track the op. */, resp_cb,
replyq,
rkb, topics, reason, allow_auto_create_topics,
include_cluster_authorized_operations,
include_topic_authorized_operations, cgrp_update, force_racks,
NULL /* No op - using custom resp_cb. */, resp_cb, replyq,
rd_true /* Admin operation metadata requests are always forced. */,
opaque);
}
Expand Down
23 changes: 13 additions & 10 deletions src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,16 +264,19 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
rd_bool_t force_racks,
rd_kafka_op_t *rko);

rd_kafka_resp_err_t
rd_kafka_MetadataRequest_admin(rd_kafka_broker_t *rkb,
const rd_list_t *topics,
const char *reason,
rd_bool_t include_cluster_authorized_operations,
rd_bool_t include_topic_authorized_operations,
rd_bool_t force_racks,
rd_kafka_resp_cb_t *resp_cb,
rd_kafka_replyq_t replyq,
void *opaque);
rd_kafka_resp_err_t rd_kafka_MetadataRequest_resp_cb(
rd_kafka_broker_t *rkb,
const rd_list_t *topics,
const char *reason,
rd_bool_t allow_auto_create_topics,
rd_bool_t include_cluster_authorized_operations,
rd_bool_t include_topic_authorized_operations,
rd_bool_t cgrp_update,
rd_bool_t force_racks,
rd_kafka_resp_cb_t *resp_cb,
rd_kafka_replyq_t replyq,
rd_bool_t force,
void *opaque);

rd_kafka_resp_err_t
rd_kafka_handle_ApiVersion(rd_kafka_t *rk,
Expand Down

0 comments on commit bdd8a4b

Please sign in to comment.