Skip to content

Commit

Permalink
Address review comments: Part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Aug 9, 2023
1 parent 0bc97b3 commit 45780f8
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 129 deletions.
15 changes: 11 additions & 4 deletions examples/describe_topics.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) {
fatal("Failed to set %s=%s: %s", name, val, errstr);
}


/**
* @brief Parse an integer or fail.
*/
Expand All @@ -133,6 +134,8 @@ int64_t parse_int(const char *what, const char *str) {

return (int64_t)n;
}


/**
* @brief Print topics information.
*/
Expand Down Expand Up @@ -242,6 +245,8 @@ static int print_topics_info(const rd_kafka_DescribeTopics_result_t *topicdesc,
}
return 0;
}


/**
* @brief Call rd_kafka_DescribeTopics() with a list of
* topics.
Expand All @@ -267,14 +272,14 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) {
topics_cnt = argc - 1;
}
/*
* Create consumer instance
* Create producer instance
* NOTE: rd_kafka_new() takes ownership of the conf object
* and the application must not reference it again after
* this call.
*/
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk)
fatal("Failed to create new consumer: %s", errstr);
fatal("Failed to create new producer: %s", errstr);

/*
* Describe topics
Expand All @@ -299,7 +304,8 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) {
"%s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
exit(1);
retval = 1;
goto exit;
}

rd_kafka_DescribeTopics(rk, topics, topics_cnt, options, queue);
Expand Down Expand Up @@ -343,6 +349,7 @@ static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) {
exit(retval);
}


int main(int argc, char **argv) {
rd_kafka_conf_t *conf; /**< Client configuration object */
int opt;
Expand Down
220 changes: 95 additions & 125 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,14 +325,15 @@ rd_kafka_admin_fanout_worker(rd_kafka_t *rk,

/* Common copy function for use in rd_list_t * for (rd_kafka_AclOperation_t *)
*/
static void *copy_AclOperation(const void *_acl_operation, void *opaque) {
if (!_acl_operation)
static void *rd_kafka_AclOperation_copy(const void *acl_operation,
void *opaque) {
if (!acl_operation)
return NULL;
const rd_kafka_AclOperation_t *acl_operation =
(rd_kafka_AclOperation_t *)_acl_operation;

rd_kafka_AclOperation_t *acl_operation_copy =
rd_malloc(sizeof(rd_kafka_AclOperation_t));
*acl_operation_copy = *acl_operation;
*acl_operation_copy = *((rd_kafka_AclOperation_t *)acl_operation);

return acl_operation_copy;
}

Expand Down Expand Up @@ -7105,9 +7106,8 @@ rd_list_t *rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations) {
int bit = (authorized_operations >> i) & 1;
if (bit) {
rd_kafka_AclOperation_t *entry =
(rd_kafka_AclOperation_t *)rd_malloc(
sizeof(rd_kafka_AclOperation_t));
*entry = i;
rd_malloc(sizeof(rd_kafka_AclOperation_t));
*entry = (rd_kafka_AclOperation_t)i;
rd_list_add(authorized_operations_list, entry);
}
}
Expand Down Expand Up @@ -7270,7 +7270,8 @@ rd_kafka_ConsumerGroupDescription_new(const char *group_id,
rd_list_init_copy(&grpdesc->authorized_operations,
authorized_operations);
rd_list_copy_to(&grpdesc->authorized_operations,
authorized_operations, copy_AclOperation, NULL);
authorized_operations,
rd_kafka_AclOperation_copy, NULL);
}

grpdesc->state = state;
Expand All @@ -7287,8 +7288,9 @@ rd_kafka_ConsumerGroupDescription_new(const char *group_id,
* @brief New instance of ConsumerGroupDescription from an error.
*
* @param group_id The group id.
* @param error The error.
* @param error Error received for this group.
* @return A new allocated ConsumerGroupDescription with the passed error.
* Use rd_kafka_ConsumerGroupDescription_destroy() to free when done.
*/
static rd_kafka_ConsumerGroupDescription_t *
rd_kafka_ConsumerGroupDescription_new_error(const char *group_id,
Expand Down Expand Up @@ -7858,6 +7860,85 @@ rd_kafka_DescribeConsumerGroups_result_groups(
*
*/


/**
* @brief Create a new TopicDescription object.
*
* @param topic topic name
* @param partitions Array of partition metadata (rd_kafka_metadata_partition).
* @param partition_cnt Number of partitions in partition metadata.
* @param topic_authorized_operations acl operations allowed for topic.
* @param error Topic error reported by the broker.
* @return A newly allocated TopicDescription object.
* Use rd_kafka_TopicDescription_destroy() to free when done.
*/
static rd_kafka_TopicDescription_t *
rd_kafka_TopicDescription_new(const char *topic,
struct rd_kafka_metadata_partition *partitions,
int partition_cnt,
const rd_list_t *topic_authorized_operations,
rd_kafka_error_t *error) {
rd_kafka_TopicDescription_t *topicdesc;
int i;
topicdesc = rd_calloc(1, sizeof(*topicdesc));
topicdesc->topic = rd_strdup(topic);
topicdesc->partition_cnt = partition_cnt;
if (error)
topicdesc->error = rd_kafka_error_copy(error);

if (topic_authorized_operations == NULL)
rd_list_init(&topicdesc->authorized_operations, 0, rd_free);
else {
rd_list_init_copy(&topicdesc->authorized_operations,
topic_authorized_operations);
rd_list_copy_to(&topicdesc->authorized_operations,
topic_authorized_operations,
rd_kafka_AclOperation_copy, NULL);
}

if (partitions) {
topicdesc->partitions =
rd_calloc(sizeof(*partitions), partition_cnt);
for (i = 0; i < partition_cnt; i++)
rd_kafka_copy_metadata_partition(
&partitions[i], &topicdesc->partitions[i]);
}
return topicdesc;
}

/**
* @brief Create a new TopicDescription object from an error.
*
* @param topic topic name
* @param error Topic error reported by the broker.
* @return A newly allocated TopicDescription with the passed error.
* Use rd_kafka_TopicDescription_destroy() to free when done.
*/
static rd_kafka_TopicDescription_t *
rd_kafka_TopicDescription_new_error(const char *topic,
rd_kafka_error_t *error) {
return rd_kafka_TopicDescription_new(topic, NULL, 0, NULL, error);
}

static void
rd_kafka_TopicDescription_destroy(rd_kafka_TopicDescription_t *topicdesc) {
int i;

RD_IF_FREE(topicdesc->topic, rd_free);
RD_IF_FREE(topicdesc->error, rd_kafka_error_destroy);

for (i = 0; i < topicdesc->partition_cnt; i++)
rd_kafka_metadata_partition_clear(&topicdesc->partitions[i]);

RD_IF_FREE(topicdesc->partitions, rd_free);
rd_list_destroy(&topicdesc->authorized_operations);
rd_free(topicdesc);
}

static void rd_kafka_TopicDescription_free(void *ptr) {
rd_kafka_TopicDescription_destroy(ptr);
}

const rd_kafka_AclOperation_t rd_kafka_TopicDescription_authorized_operation(
const rd_kafka_TopicDescription_t *topicdesc,
size_t idx) {
Expand Down Expand Up @@ -7943,126 +8024,13 @@ const rd_kafka_TopicDescription_t **rd_kafka_DescribeTopics_result_topics(
rko->rko_u.admin_result.results.rl_elems;
}

/**
* @brief Helper function to clear a rd_kafka_metadata_partition.
*
* @note Does not deallocate the rd_kafka_metadata_partition itself.
*/
static void
rd_kafka_metadata_partition_clear(struct rd_kafka_metadata_partition *rkmp) {
RD_IF_FREE(rkmp->isrs, rd_free);
RD_IF_FREE(rkmp->replicas, rd_free);
}

static void
rd_kafka_TopicDescription_destroy(rd_kafka_TopicDescription_t *topicdesc) {
int i;

RD_IF_FREE(topicdesc->topic, rd_free);
RD_IF_FREE(topicdesc->error, rd_kafka_error_destroy);

for (i = 0; i < topicdesc->partition_cnt; i++)
rd_kafka_metadata_partition_clear(&topicdesc->partitions[i]);

RD_IF_FREE(topicdesc->partitions, rd_free);
rd_list_destroy(&topicdesc->authorized_operations);
rd_free(topicdesc);
}

static void rd_kafka_TopicDescription_free(void *ptr) {
rd_kafka_TopicDescription_destroy(ptr);
}

/**
* @brief Topics arguments comparator for DescribeTopics args
*/
static int rd_kafka_DescribeTopics_cmp(const void *a, const void *b) {
return strcmp(a, b);
}

/**
* @brief Helper function to copy from one rd_kafka_metadata_partition to
* another.
*
* @note Both are assumed to be allocated.
*/
static void
rd_kafka_copy_metadata_partition(struct rd_kafka_metadata_partition *src,
struct rd_kafka_metadata_partition *dst) {
int i;
dst->err = src->err;
dst->id = src->id;
dst->isr_cnt = src->isr_cnt;
dst->isrs = rd_calloc(sizeof(int32_t), dst->isr_cnt);
for (i = 0; i < dst->isr_cnt; i++) {
dst->isrs[i] = src->isrs[i];
}
dst->leader = src->leader;
dst->replica_cnt = src->replica_cnt;
dst->replicas = rd_calloc(sizeof(int32_t), dst->replica_cnt);
for (i = 0; i < dst->replica_cnt; i++) {
dst->replicas[i] = src->replicas[i];
}
}

/**
* @brief Create a new TopicDescription object.
*
* @param topic topic name
* @param partitions Array of partition metadata (rd_kafka_metadata_partition).
* @param partition_cnt Number of partitions in partition metadata.
* @param topic_authorized_operations acl operations allowed for topic.
* @param error Topic error reported by the broker.
* @return A newly allocated TopicDescription object.
* Use rd_kafka_TopicDescription_destroy() to free when done.
*/
static rd_kafka_TopicDescription_t *
rd_kafka_TopicDescription_new(const char *topic,
struct rd_kafka_metadata_partition *partitions,
int partition_cnt,
const rd_list_t *topic_authorized_operations,
rd_kafka_error_t *error) {
rd_kafka_TopicDescription_t *topicdesc;
int i;
topicdesc = rd_calloc(1, sizeof(*topicdesc));
topicdesc->topic = rd_strdup(topic);
topicdesc->partition_cnt = partition_cnt;
if (error)
topicdesc->error = rd_kafka_error_copy(error);

if (topic_authorized_operations == NULL)
rd_list_init(&topicdesc->authorized_operations, 0, rd_free);
else {
rd_list_init_copy(&topicdesc->authorized_operations,
topic_authorized_operations);
rd_list_copy_to(&topicdesc->authorized_operations,
topic_authorized_operations, copy_AclOperation,
NULL);
}

if (partitions) {
topicdesc->partitions =
rd_calloc(sizeof(*partitions), partition_cnt);
for (i = 0; i < partition_cnt; i++)
rd_kafka_copy_metadata_partition(
&partitions[i], &topicdesc->partitions[i]);
}
return topicdesc;
}

/**
* @brief New instance of TopicDescription from an error.
*
* @param group_id The topic.
* @param error The error.
* @return A new allocated TopicDescription with the passed error.
*/
static rd_kafka_TopicDescription_t *
rd_kafka_TopicDescription_new_error(const char *topic,
rd_kafka_error_t *error) {
return rd_kafka_TopicDescription_new(topic, NULL, 0, NULL, error);
}

/**
* @brief Construct and send DescribeTopicsRequest to \p rkb
* with the topics (char *) in \p topics, using
Expand Down Expand Up @@ -8122,6 +8090,7 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req,
err = rd_kafka_parse_Metadata(rkb, NULL, reply, &mdi, &topics);
if (err)
goto err;

rko_result = rd_kafka_admin_result_new(rko_req);
md = &mdi->metadata;
rd_list_init(&rko_result->rko_u.admin_result.results, md->topic_cnt,
Expand Down Expand Up @@ -8303,7 +8272,8 @@ rd_kafka_ClusterDescription_new(const rd_kafka_metadata_internal_t *mdi) {
rd_list_init_copy(&clusterdesc->authorized_operations,
authorized_operations);
rd_list_copy_to(&clusterdesc->authorized_operations,
authorized_operations, copy_AclOperation, NULL);
authorized_operations,
rd_kafka_AclOperation_copy, NULL);
}

rd_list_init(&clusterdesc->nodes, 0, rd_kafka_Node_free);
Expand Down Expand Up @@ -8376,9 +8346,9 @@ rd_kafka_DescribeClusterResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_op_t *rko_result = NULL;

err = rd_kafka_parse_Metadata(rkb, NULL, reply, &mdi, &topics);

if (err)
goto err;

rko_result = rd_kafka_admin_result_new(rko_req);
rd_list_init(&rko_result->rko_u.admin_result.results, 1,
rd_kafka_ClusterDescription_free);
Expand Down
Loading

0 comments on commit 45780f8

Please sign in to comment.