Address review comments - Part 1
milindl committed Aug 1, 2023
1 parent 3215a90 commit e01ad41
Showing 4 changed files with 43 additions and 40 deletions.
29 changes: 17 additions & 12 deletions src/rdkafka.h
@@ -6817,11 +6817,9 @@ typedef enum rd_kafka_admin_op_t {
RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS,
/** AlterUserScramCredentials */
RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS,
/**< DescribeTopics */
RD_KAFKA_ADMIN_OP_DESCRIBETOPICS,
/**< DescribeCluster */
RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER,
RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */
RD_KAFKA_ADMIN_OP_DESCRIBETOPICS, /**< DescribeTopics */
RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, /**< DescribeCluster */
RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */
} rd_kafka_admin_op_t;

/**
@@ -7001,8 +6999,9 @@ rd_kafka_error_t *rd_kafka_AdminOptions_set_require_stable_offsets(
int true_or_false);

/**
* @brief Whether broker should return authorized operations
* (DescribeConsumerGroups, DescribeTopics, DescribeCluster).
* @brief Whether broker should return authorized operations for the given
* resource in the DescribeConsumerGroups, DescribeTopics, or
* DescribeCluster calls.
*
* @param options Admin options.
* @param true_or_false Defaults to false.
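A minimal usage sketch for this option. The setter itself sits just below the visible hunk; it is assumed here to be named rd_kafka_AdminOptions_set_include_authorized_operations() and to mirror the rd_kafka_AdminOptions_set_require_stable_offsets() signature shown above (options pointer plus an int flag), returning an rd_kafka_error_t * on invalid input.

#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Sketch: build AdminOptions for a DescribeTopics call with
 * authorized operations included in the result. The setter name is
 * assumed from the doc comment above (it is not visible in this hunk). */
static rd_kafka_AdminOptions_t *
describe_topics_options_new(rd_kafka_t *rk) {
        rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new(
            rk, RD_KAFKA_ADMIN_OP_DESCRIBETOPICS);
        rd_kafka_error_t *error =
            rd_kafka_AdminOptions_set_include_authorized_operations(options,
                                                                    1);
        if (error) {
                fprintf(stderr, "set_include_authorized_operations: %s\n",
                        rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);
        }
        return options;
}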
@@ -7999,6 +7998,8 @@ typedef struct rd_kafka_TopicDescription_s rd_kafka_TopicDescription_t;
* @param topics Array of topics to describe.
* @param topics_cnt Number of elements in \p topics array.
* @param options Optional admin options, or NULL for defaults.
* Valid options:
* - include_authorized_operations
* @param rkqu Queue to emit result on.
*
* @remark The result event type emitted on the supplied queue is of type
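A hypothetical end-to-end call built from the parameters documented above. The exact rd_kafka_DescribeTopics() prototype and the result event type (assumed to be RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT) are not visible in this hunk, so treat this as a sketch rather than the definitive API.

#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Sketch: describe two topics and wait for the result event.
 * The rd_kafka_DescribeTopics() prototype is assumed from the
 * parameter list documented above; \p options may be NULL for defaults. */
static void describe_topics_example(rd_kafka_t *rk,
                                    const rd_kafka_AdminOptions_t *options) {
        const char *topics[] = {"topic-a", "topic-b"};
        rd_kafka_queue_t *queue = rd_kafka_queue_new(rk);
        rd_kafka_event_t *event;

        rd_kafka_DescribeTopics(rk, topics, 2, options, queue);

        event = rd_kafka_queue_poll(queue, 10 * 1000 /* 10s timeout */);
        if (event && rd_kafka_event_error(event))
                fprintf(stderr, "DescribeTopics failed: %s\n",
                        rd_kafka_event_error_string(event));
        if (event)
                rd_kafka_event_destroy(event);
        rd_kafka_queue_destroy(queue);
}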
@@ -8080,8 +8081,8 @@ const int rd_kafka_TopicDescription_partition_isr_count(
int idx);

/**
* @brief Gets the partition in-sync replica at \p isr_idx
* for partition \p partition_idx for the \p topicdesc topic.
* @brief Gets the in-sync replica at index \p replica_idx for the partition
* at the index \p partition_idx for the topic \p topicdesc.
*
* @param topicdesc The topic description.
* @param partition_idx Index for the partitions.
@@ -8111,12 +8112,12 @@ const int rd_kafka_TopicDescription_partition_replica_count(


/**
* @brief Gets the partition replica at replica index
* for partition \p partition_idx for the \p topicdesc topic.
* @brief Gets the partition replica at index \p replica_idx for the partition
* at the index \p partition_idx for the topic \p topicdesc.
*
* @param topicdesc The topic description.
* @param partition_idx Index for the partitions.
* @param replica_idx Index for the in-sync replica.
* @param replica_idx Index for the replica.
*
* @return The partition replica.
*/
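A sketch of walking one partition's replica list via the getters documented above. The function names and parameter lists are cut off by the hunk, so rd_kafka_TopicDescription_partition_replica_count(topicdesc, partition_idx) and rd_kafka_TopicDescription_partition_replica(topicdesc, partition_idx, replica_idx) returning a broker id are assumptions based on the doc comments alone.

#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Sketch: print one partition's replica list. Both accessor prototypes
 * are assumed (they are not fully visible in this hunk). */
static void
print_partition_replicas(const rd_kafka_TopicDescription_t *topicdesc,
                         int partition_idx) {
        int replica_idx;
        int replica_cnt = rd_kafka_TopicDescription_partition_replica_count(
            topicdesc, partition_idx);

        for (replica_idx = 0; replica_idx < replica_cnt; replica_idx++) {
                int broker_id = rd_kafka_TopicDescription_partition_replica(
                    topicdesc, partition_idx, replica_idx);
                printf("partition %d: replica[%d] on broker %d\n",
                       partition_idx, replica_idx, broker_id);
        }
}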
@@ -8214,6 +8215,8 @@ typedef struct rd_kafka_ClusterDescription_s rd_kafka_ClusterDescription_t;
*
* @param rk Client instance.
* @param options Optional admin options, or NULL for defaults.
* Valid options:
* - include_authorized_operations
* @param rkqu Queue to emit result on.
*
* @remark The result event type emitted on the supplied queue is of type
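A sketch of issuing the cluster description request, assuming rd_kafka_DescribeCluster() takes exactly the client, options and queue documented above, and that the options object can be destroyed once the call returns, as with the other admin APIs.

#include <librdkafka/rdkafka.h>

/* Sketch: request a cluster description with authorized operations. */
static void describe_cluster_example(rd_kafka_t *rk, rd_kafka_queue_t *queue) {
        rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new(
            rk, RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER);

        /* Error checking omitted for brevity. */
        rd_kafka_AdminOptions_set_include_authorized_operations(options, 1);

        rd_kafka_DescribeCluster(rk, options, queue);

        /* As with the other admin APIs, the options are assumed to be
         * copied by the call and can be destroyed here. */
        rd_kafka_AdminOptions_destroy(options);
}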
@@ -8457,6 +8460,8 @@ typedef struct rd_kafka_MemberAssignment_s rd_kafka_MemberAssignment_t;
* @param groups Array of groups to describe.
* @param groups_cnt Number of elements in \p groups array.
* @param options Optional admin options, or NULL for defaults.
* Valid options:
* - include_authorized_operations
* @param rkqu Queue to emit result on.
*
* @remark The result event type emitted on the supplied queue is of type
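For comparison, the same option applied to DescribeConsumerGroups, whose rd_kafka_DescribeConsumerGroups(rk, groups, groups_cnt, options, rkqu) signature matches the parameters listed above.

#include <librdkafka/rdkafka.h>

/* Sketch: request authorized operations for two consumer groups. */
static void describe_groups_example(rd_kafka_t *rk, rd_kafka_queue_t *queue) {
        const char *groups[] = {"group-a", "group-b"};
        rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new(
            rk, RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS);

        /* Error checking omitted for brevity. */
        rd_kafka_AdminOptions_set_include_authorized_operations(options, 1);
        rd_kafka_DescribeConsumerGroups(rk, groups, 2, options, queue);
        rd_kafka_AdminOptions_destroy(options);
}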
34 changes: 16 additions & 18 deletions src/rdkafka_admin.c
@@ -7855,14 +7855,14 @@ const rd_kafka_AclOperation_t rd_kafka_TopicDescription_authorized_operation(
const rd_kafka_TopicDescription_t *topicdesc,
size_t idx) {
rd_kafka_AclOperation_t *entry =
rd_list_elem(topicdesc->topic_authorized_operations, idx);
rd_list_elem(topicdesc->authorized_operations, idx);
return *entry;
}

const int rd_kafka_TopicDescription_topic_authorized_operation_count(
const rd_kafka_TopicDescription_t *topicdesc) {
if (topicdesc->topic_authorized_operations)
return rd_list_cnt(topicdesc->topic_authorized_operations);
if (topicdesc->authorized_operations)
return rd_list_cnt(topicdesc->authorized_operations);
return 0;
}
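A sketch of consuming these accessors from application code, assuming the topic description was obtained from a DescribeTopics result event (the result accessors are not part of this diff) and that include_authorized_operations was enabled on the request; the count is 0 otherwise.

#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Sketch: list the ACL operations the caller may perform on a described
 * topic, using the two accessors from this hunk. */
static void print_topic_authorized_operations(
    const rd_kafka_TopicDescription_t *topicdesc) {
        int i;
        int cnt = rd_kafka_TopicDescription_topic_authorized_operation_count(
            topicdesc);

        for (i = 0; i < cnt; i++) {
                rd_kafka_AclOperation_t op =
                    rd_kafka_TopicDescription_authorized_operation(topicdesc,
                                                                   i);
                printf("  allowed: %s\n", rd_kafka_AclOperation_name(op));
        }
}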

@@ -7953,8 +7953,8 @@ rd_kafka_TopicDescription_destroy(rd_kafka_TopicDescription_t *topicdesc) {
}
if (likely(topicdesc->partitions != NULL))
rd_free(topicdesc->partitions);
if (likely(topicdesc->topic_authorized_operations != NULL))
rd_list_destroy(topicdesc->topic_authorized_operations);
if (likely(topicdesc->authorized_operations != NULL))
rd_list_destroy(topicdesc->authorized_operations);
rd_free(topicdesc);
}

@@ -7970,7 +7970,7 @@ static int rd_kafka_DescribeTopics_cmp(const void *a, const void *b) {
}

/**
* @brief Create a new ConsumerGroupDescription object.
* @brief Create a new TopicDescription object.
*
* @param topic topic name
* @param partitions Array of partition metadata (rd_kafka_metadata_partition).
@@ -7994,15 +7994,14 @@ rd_kafka_TopicDescription_new(const char *topic,
if (error)
topicdesc->error = rd_kafka_error_copy(error);
if (topic_authorized_operations) {
topicdesc->topic_authorized_operations = rd_list_new(
topicdesc->authorized_operations = rd_list_new(
rd_list_cnt(topic_authorized_operations), rd_free);
for (i = 0; i < rd_list_cnt(topic_authorized_operations); i++) {
int *entry =
rd_list_elem(topic_authorized_operations, i);
int *oper = rd_malloc(sizeof(int));
*oper = *entry;
rd_list_add(topicdesc->topic_authorized_operations,
oper);
rd_list_add(topicdesc->authorized_operations, oper);
}
}

@@ -8222,7 +8221,7 @@ const rd_kafka_AclOperation_t rd_kafka_ClusterDescription_authorized_operation(
const rd_kafka_ClusterDescription_t *clusterdesc,
size_t idx) {
rd_kafka_AclOperation_t *entry =
rd_list_elem(clusterdesc->cluster_authorized_operations, idx);
rd_list_elem(clusterdesc->authorized_operations, idx);
return *entry;
}

@@ -8238,8 +8237,8 @@ const int rd_kafka_ClusterDescription_controller_id(

const int rd_kafka_ClusterDescription_cluster_authorized_operation_count(
const rd_kafka_ClusterDescription_t *clusterdesc) {
if (clusterdesc->cluster_authorized_operations)
return rd_list_cnt(clusterdesc->cluster_authorized_operations);
if (clusterdesc->authorized_operations)
return rd_list_cnt(clusterdesc->authorized_operations);
return 0;
}
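The cluster-level counterpart, using the controller-id and count accessors from this hunk; the clusterdesc handle is assumed to come from a DescribeCluster result event.

#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Sketch: summarize a described cluster. */
static void
print_cluster_summary(const rd_kafka_ClusterDescription_t *clusterdesc) {
        printf("controller id: %d, authorized operations: %d\n",
               rd_kafka_ClusterDescription_controller_id(clusterdesc),
               rd_kafka_ClusterDescription_cluster_authorized_operation_count(
                   clusterdesc));
}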

@@ -8287,19 +8286,18 @@ rd_kafka_ClusterDescription_new(const char *cluster_id,
clusterdesc->controller_id = controller_id;

if (cluster_authorized_operations) {
clusterdesc->cluster_authorized_operations = rd_list_new(
clusterdesc->authorized_operations = rd_list_new(
rd_list_cnt(cluster_authorized_operations), rd_free);
for (i = 0; i < rd_list_cnt(cluster_authorized_operations);
i++) {
int *entry =
rd_list_elem(cluster_authorized_operations, i);
int *oper = malloc(sizeof(int));
*oper = *entry;
rd_list_add(clusterdesc->cluster_authorized_operations,
oper);
rd_list_add(clusterdesc->authorized_operations, oper);
}
} else
clusterdesc->cluster_authorized_operations = NULL;
clusterdesc->authorized_operations = NULL;

clusterdesc->node_cnt = md->broker_cnt;
clusterdesc->Nodes = rd_calloc(sizeof(rd_kafka_Node_t), md->broker_cnt);
@@ -8315,8 +8313,8 @@ static void rd_kafka_ClusterDescription_destroy(
rd_kafka_ClusterDescription_t *clusterdesc) {
int i;
RD_IF_FREE(clusterdesc->cluster_id, rd_free);
if (clusterdesc->cluster_authorized_operations)
rd_list_destroy(clusterdesc->cluster_authorized_operations);
if (clusterdesc->authorized_operations)
rd_list_destroy(clusterdesc->authorized_operations);
if (clusterdesc->Nodes) {
for (i = 0; i < clusterdesc->node_cnt; i++) {
rd_kafka_Node_t *node = &(clusterdesc->Nodes[i]);
8 changes: 3 additions & 5 deletions src/rdkafka_admin.h
@@ -501,9 +501,8 @@ struct rd_kafka_TopicDescription_s {
char *topic; /**< Topic name */
int partition_cnt; /**< Number of partitions in \p partitions*/
struct rd_kafka_metadata_partition *partitions; /**< Partitions */
rd_kafka_error_t *error; /**< Topic error reported by broker */
rd_list_t *topic_authorized_operations; /**< ACL operations allowed for
topics */
rd_kafka_error_t *error; /**< Topic error reported by broker */
rd_list_t *authorized_operations; /**< Operations allowed for topic */
};

/**@}*/
@@ -520,8 +519,7 @@ struct rd_kafka_ClusterDescription_s {
int controller_id; /**< current controller id in \p cluster*/
int node_cnt; /**< Number of brokers in \p cluster*/
rd_kafka_Node_t *Nodes; /**< Nodes */
rd_list_t *cluster_authorized_operations; /**< ACL operations allowed
for cluster */
rd_list_t *authorized_operations; /**< Operations allowed for cluster */
};

/**@}*/
12 changes: 7 additions & 5 deletions src/rdkafka_metadata.c
@@ -474,11 +474,7 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
: rd_false;
rd_bool_t has_reliable_leader_epochs =
rd_kafka_has_reliable_leader_epochs(rkb);
const char *reason = request ? (request->rkbuf_u.Metadata.reason
? request->rkbuf_u.Metadata.reason
: "(no reason)")
: "(admin request)";

const char *reason = "(no reason)";
int ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;
rd_kafkap_str_t cluster_id = RD_ZERO_INIT;
int32_t controller_id = -1;
@@ -496,6 +492,12 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
request ? request->rkbuf_u.Metadata.force_racks : rd_false;
rd_bool_t compute_racks = has_client_rack || force_rack_computation;

/* If there's no request, we're parsing this for an AdminAPI. */
if (!request)
reason = "(admin request)";
else if (request->rkbuf_u.Metadata.reason)
reason = request->rkbuf_u.Metadata.reason;

/* Ignore metadata updates when terminating */
if (rd_kafka_terminating(rkb->rkb_rk)) {
err = RD_KAFKA_RESP_ERR__DESTROY;
