Skip to content

Commit

Permalink
Fix style and docs issues
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Jun 15, 2023
1 parent 99a368d commit 3fec50a
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 77 deletions.
5 changes: 5 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -8065,12 +8065,17 @@ rd_kafka_error_t *rd_kafka_AdminOptions_set_include_topic_authorized_operations(
* @{
*/


/**
* @brief DescribeCluster result type.
*/
typedef struct rd_kafka_ClusterDescription_s rd_kafka_ClusterDescription_t;

/**
* @brief Gets the node for the \p clusterdesc cluster at idx position.
*
* @param clusterdesc The cluster description.
* @param idx the index at which to return the node.
*
* @return The node at idx position.
*
Expand Down
43 changes: 20 additions & 23 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,11 @@ static const char *rd_kafka_admin_state_desc[] = {
* @enum Admin request target broker. Must be negative values since the field
* used is broker_id.
*/
enum {
RD_KAFKA_ADMIN_TARGET_CONTROLLER = -1, /**< Cluster controller */
RD_KAFKA_ADMIN_TARGET_COORDINATOR = -2, /**< (Group) Coordinator */
RD_KAFKA_ADMIN_TARGET_FANOUT = -3, /**< This rko is a fanout and
* and has no target broker */
RD_KAFKA_ADMIN_TARGET_ALL = -4, /**< All available brokers */
enum { RD_KAFKA_ADMIN_TARGET_CONTROLLER = -1, /**< Cluster controller */
RD_KAFKA_ADMIN_TARGET_COORDINATOR = -2, /**< (Group) Coordinator */
RD_KAFKA_ADMIN_TARGET_FANOUT = -3, /**< This rko is a fanout and
* and has no target broker */
RD_KAFKA_ADMIN_TARGET_ALL = -4, /**< All available brokers */
};

/**
Expand Down Expand Up @@ -7100,10 +7099,10 @@ rd_kafka_admin_DescribeTopicsRequest(rd_kafka_broker_t *rkb,
/* resp_cb = rd_kafka_admin_handle_response; */

// error = Call metadata request
err = rd_kafka_MetadataRequest(rkb, topics, "describe topics", rd_false,
rd_false,
include_topic_authorized_operations,
rd_false, rd_false, NULL, resp_cb, 0, opaque);
err = rd_kafka_MetadataRequest(
rkb, topics, "describe topics", rd_false, rd_false,
include_topic_authorized_operations, rd_false, rd_false, NULL,
resp_cb, 0, opaque);

if (err) {
rd_snprintf(errstr, errstr_size, "%s", rd_kafka_err2str(err));
Expand All @@ -7123,7 +7122,7 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req,
char *errstr,
size_t errstr_size) {
rd_kafka_metadata_internal_t *mdi = NULL;
struct rd_kafka_metadata *md = NULL;
struct rd_kafka_metadata *md = NULL;
rd_kafka_resp_err_t err;
rd_list_t topics = rko_req->rko_u.admin_request.args;
rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
Expand All @@ -7137,7 +7136,7 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req,
if (err)
goto err;
rko_result = rd_kafka_admin_result_new(rko_req);
md = &mdi->metadata;
md = &mdi->metadata;
rd_list_init(&rko_result->rko_u.admin_result.results, md->topic_cnt,
rd_kafka_TopicDescription_free);
cnt = md->topic_cnt;
Expand All @@ -7147,8 +7146,7 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req,
/* topics in md should be in the same order as in
* mdi->topics[i]*/
rd_assert(strcmp(md->topics[i].topic,
mdi->topics[i].topic_name) ==
0);
mdi->topics[i].topic_name) == 0);
if (md->topics[i].err == RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_list_t *authorized_operations;
authorized_operations =
Expand Down Expand Up @@ -7464,13 +7462,12 @@ rd_kafka_admin_DescribeClusterRequest(rd_kafka_broker_t *rkb,
/* resp_cb = rd_kafka_admin_handle_response; */

// err = Call metadata request with NULL topics
err = rd_kafka_MetadataRequest(rkb, NULL, "describe cluster",
rd_false /*no auto create*/,
include_cluster_authorized_operations,
rd_false /*!include topic authorized operations */,
rd_false /*cgrp update*/,
rd_false /* force_rack */,
NULL, resp_cb, 1, opaque);
err = rd_kafka_MetadataRequest(
rkb, NULL, "describe cluster", rd_false /*no auto create*/,
include_cluster_authorized_operations,
rd_false /*!include topic authorized operations */,
rd_false /*cgrp update*/, rd_false /* force_rack */, NULL, resp_cb,
1, opaque);

if (err) {
rd_snprintf(errstr, errstr_size, "%s", rd_kafka_err2str(err));
Expand Down Expand Up @@ -7501,9 +7498,9 @@ rd_kafka_DescribeClusterResponse_parse(rd_kafka_op_t *rko_req,
// thrd_is_current(rk->rk_thread));
// rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY);

err = rd_kafka_parse_Metadata(rkb, NULL, reply, &mdi, &topics);
err = rd_kafka_parse_Metadata(rkb, NULL, reply, &mdi, &topics);
cluster_id = mdi->cluster_id;
controller_id = mdi->controller_id;
controller_id = mdi->controller_id;
cluster_authorized_operations = mdi->cluster_authorized_operations;
if (err)
goto err;
Expand Down
37 changes: 19 additions & 18 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ rd_kafka_metadata(rd_kafka_t *rk,
* topics in the cluster, since a
* partial request may make it seem
* like some subscribed topics are missing. */
all_topics ? rd_true : rd_false, rd_false /* force_racks */, rko, NULL, 0, NULL);
all_topics ? rd_true : rd_false, rd_false /* force_racks */, rko,
NULL, 0, NULL);

rd_list_destroy(&topics);
rd_kafka_broker_destroy(rkb);
Expand Down Expand Up @@ -450,12 +451,11 @@ rd_kafka_populate_metadata_topic_racks(rd_tmpabuf_t *tbuf,
*
* @locality rdkafka main thread
*/
rd_kafka_resp_err_t
rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *request,
rd_kafka_buf_t *rkbuf,
rd_kafka_metadata_internal_t **mdip,
rd_list_t *request_topics) {
rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *request,
rd_kafka_buf_t *rkbuf,
rd_kafka_metadata_internal_t **mdip,
rd_list_t *request_topics) {
rd_kafka_t *rk = rkb->rkb_rk;
int i, j, k;
rd_tmpabuf_t tbuf;
Expand Down Expand Up @@ -595,7 +595,7 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_tmpabuf_alloc(&tbuf, md->topic_cnt * sizeof(*md->topics))))
rd_kafka_buf_parse_fail(
rkbuf, "%d topics: tmpabuf memory shortage", md->topic_cnt);

if (!(mdi->topics = rd_tmpabuf_alloc(&tbuf, md->topic_cnt *
sizeof(*mdi->topics))))
rd_kafka_buf_parse_fail(
Expand Down Expand Up @@ -735,7 +735,7 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
&TopicAuthorizedOperations);
mdi->topics[i].topic_name = md->topics[i].topic;
mdi->topics[i].topic_authorized_operations =
TopicAuthorizedOperations;
TopicAuthorizedOperations;
}

rd_kafka_buf_skip_tags(rkbuf);
Expand Down Expand Up @@ -799,7 +799,7 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
/* ClusterAuthorizedOperations */
rd_kafka_buf_read_i32(rkbuf, &ClusterAuthorizedOperations);
mdi->cluster_authorized_operations =
ClusterAuthorizedOperations;
ClusterAuthorizedOperations;
}

rd_kafka_buf_skip_tags(rkbuf);
Expand Down Expand Up @@ -1263,11 +1263,11 @@ rd_kafka_metadata_refresh_topics(rd_kafka_t *rk,
"Requesting metadata for %d/%d topics: %s",
rd_list_cnt(&q_topics), rd_list_cnt(topics), reason);

rd_kafka_MetadataRequest(rkb, &q_topics, reason, allow_auto_create,
rd_false /*!include cluster authorized operations */,
rd_false /*!include topic authorized operations */,
cgrp_update, rd_false /* force_racks */, NULL,
NULL, 0, NULL);
rd_kafka_MetadataRequest(
rkb, &q_topics, reason, allow_auto_create,
rd_false /*!include cluster authorized operations */,
rd_false /*!include topic authorized operations */, cgrp_update,
rd_false /* force_racks */, NULL, NULL, 0, NULL);

rd_list_destroy(&q_topics);

Expand Down Expand Up @@ -1448,8 +1448,8 @@ rd_kafka_resp_err_t rd_kafka_metadata_refresh_all(rd_kafka_t *rk,
rkb, &topics, reason, rd_false /*no auto create*/,
rd_false /*!include cluster authorized operations */,
rd_false /*!include topic authorized operations */,
rd_true /*cgrp update*/, rd_false /* force_rack */,
NULL, NULL, 0, NULL);
rd_true /*cgrp update*/, rd_false /* force_rack */, NULL, NULL, 0,
NULL);
rd_list_destroy(&topics);

if (destroy_rkb)
Expand Down Expand Up @@ -1491,7 +1491,8 @@ rd_kafka_metadata_request(rd_kafka_t *rk,
rd_kafka_MetadataRequest(rkb, topics, reason, allow_auto_create_topics,
include_cluster_authorized_operations,
include_topic_authorized_operations,
cgrp_update, rd_false /* force racks */, rko, NULL, 0, NULL);
cgrp_update, rd_false /* force racks */, rko,
NULL, 0, NULL);

if (destroy_rkb)
rd_kafka_broker_destroy(rkb);
Expand Down
6 changes: 3 additions & 3 deletions src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ typedef struct rd_kafka_metadata_topic_internal_s {
* Sorted by Partition Id. */
rd_kafka_metadata_partition_internal_t *partitions;
/** Topic Name. */
char* topic_name;
char *topic_name;
int32_t topic_authorized_operations; /**< ACL operations allowed
for topic */
} rd_kafka_metadata_topic_internal_t;
Expand Down Expand Up @@ -84,8 +84,8 @@ typedef struct rd_kafka_metadata_internal_s {
rd_kafka_metadata_broker_internal_t *brokers;
/* Internal metadata topics. Same count as metadata.topic_cnt. */
rd_kafka_metadata_topic_internal_t *topics;
char *cluster_id; /**< current cluster id in \p cluster*/
int controller_id; /**< current controller id in \p cluster*/
char *cluster_id; /**< current cluster id in \p cluster*/
int controller_id; /**< current controller id in \p cluster*/
int32_t cluster_authorized_operations; /**< ACL operations allowed
for cluster */
} rd_kafka_metadata_internal_t;
Expand Down
36 changes: 17 additions & 19 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,14 +447,13 @@ struct rd_kafka_op_s {
struct rd_kafka_admin_worker_cbs *cbs;

/** Worker state */
enum {
RD_KAFKA_ADMIN_STATE_INIT,
RD_KAFKA_ADMIN_STATE_WAIT_BROKER,
RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER,
RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS,
RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST,
RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE,
RD_KAFKA_ADMIN_STATE_WAIT_BROKER_LIST,
enum { RD_KAFKA_ADMIN_STATE_INIT,
RD_KAFKA_ADMIN_STATE_WAIT_BROKER,
RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER,
RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS,
RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST,
RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE,
RD_KAFKA_ADMIN_STATE_WAIT_BROKER_LIST,
} state;

int32_t broker_id; /**< Requested broker id to
Expand Down Expand Up @@ -544,17 +543,16 @@ struct rd_kafka_op_s {

/**< Mock cluster command */
struct {
enum {
RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR,
RD_KAFKA_MOCK_CMD_TOPIC_CREATE,
RD_KAFKA_MOCK_CMD_PART_SET_LEADER,
RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER,
RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS,
RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN,
RD_KAFKA_MOCK_CMD_BROKER_SET_RTT,
RD_KAFKA_MOCK_CMD_BROKER_SET_RACK,
RD_KAFKA_MOCK_CMD_COORD_SET,
RD_KAFKA_MOCK_CMD_APIVERSION_SET,
enum { RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR,
RD_KAFKA_MOCK_CMD_TOPIC_CREATE,
RD_KAFKA_MOCK_CMD_PART_SET_LEADER,
RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER,
RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS,
RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN,
RD_KAFKA_MOCK_CMD_BROKER_SET_RTT,
RD_KAFKA_MOCK_CMD_BROKER_SET_RACK,
RD_KAFKA_MOCK_CMD_COORD_SET,
RD_KAFKA_MOCK_CMD_APIVERSION_SET,
} cmd;

rd_kafka_resp_err_t err; /**< Error for:
Expand Down
15 changes: 7 additions & 8 deletions src/rdkafka_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,13 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
int32_t rktp_fetch_version; /* Op version of curr fetch.
(broker thread) */

enum {
RD_KAFKA_TOPPAR_FETCH_NONE = 0,
RD_KAFKA_TOPPAR_FETCH_STOPPING,
RD_KAFKA_TOPPAR_FETCH_STOPPED,
RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT,
RD_KAFKA_TOPPAR_FETCH_ACTIVE,
enum { RD_KAFKA_TOPPAR_FETCH_NONE = 0,
RD_KAFKA_TOPPAR_FETCH_STOPPING,
RD_KAFKA_TOPPAR_FETCH_STOPPED,
RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,
RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT,
RD_KAFKA_TOPPAR_FETCH_ACTIVE,
} rktp_fetch_state; /* Broker thread's state */

#define RD_KAFKA_TOPPAR_FETCH_IS_STARTED(fetch_state) \
Expand Down
11 changes: 5 additions & 6 deletions src/rdkafka_topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,11 @@ struct rd_kafka_topic_s {
rd_refcnt_t rkt_app_refcnt; /**< Number of active rkt's new()ed
* by application. */

enum {
RD_KAFKA_TOPIC_S_UNKNOWN, /* No cluster information yet */
RD_KAFKA_TOPIC_S_EXISTS, /* Topic exists in cluster */
RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */
RD_KAFKA_TOPIC_S_ERROR, /* Topic exists but is in an errored
* state, such as auth failure. */
enum { RD_KAFKA_TOPIC_S_UNKNOWN, /* No cluster information yet */
RD_KAFKA_TOPIC_S_EXISTS, /* Topic exists in cluster */
RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */
RD_KAFKA_TOPIC_S_ERROR, /* Topic exists but is in an errored
* state, such as auth failure. */
} rkt_state;

int rkt_flags;
Expand Down

0 comments on commit 3fec50a

Please sign in to comment.