Skip to content

Commit

Permalink
KIP-430 initial changes without checking tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jainruchir committed Mar 31, 2023
1 parent d0e2156 commit 3a79ed6
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 37 deletions.
47 changes: 39 additions & 8 deletions examples/describe_consumer_groups.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ static void usage(const char *reason, ...) {
fprintf(stderr,
"Describe groups usage examples\n"
"\n"
"Usage: %s <options> <group1> <group2> ...\n"
"Usage: %s <options> <include_authorized_operations> <group1> "
"<group2> ...\n"
"\n"
"Options:\n"
" -b <brokers> Bootstrap server list to connect to.\n"
Expand Down Expand Up @@ -167,7 +168,8 @@ print_groups_info(const rd_kafka_DescribeConsumerGroups_result_t *grpdesc,
}

for (i = 0; i < result_groups_cnt; i++) {
int j, member_cnt;
int j, member_cnt, authorized_operation_count,
acl_operation;
const rd_kafka_error_t *error;
const rd_kafka_ConsumerGroupDescription_t *group =
result_groups[i];
Expand All @@ -179,6 +181,8 @@ print_groups_info(const rd_kafka_DescribeConsumerGroups_result_t *grpdesc,
rd_kafka_ConsumerGroupDescription_partition_assignor(group);
rd_kafka_consumer_group_state_t state =
rd_kafka_ConsumerGroupDescription_state(group);
authorized_operation_count =
rd_kafka_ConsumerGroupDescription_authorized_operations_count(group);
member_cnt =
rd_kafka_ConsumerGroupDescription_member_count(group);
error = rd_kafka_ConsumerGroupDescription_error(group);
Expand All @@ -197,10 +201,19 @@ print_groups_info(const rd_kafka_DescribeConsumerGroups_result_t *grpdesc,
}
printf(
"Group \"%s\", partition assignor \"%s\", "
"state %s%s, with %" PRId32 " member(s)",
" state %s%s, with %" PRId32 " member(s)\n",
group_id, partition_assignor,
rd_kafka_consumer_group_state_name(state), coordinator_desc,
member_cnt);
for(j=0;j<authorized_operation_count;j++){
acl_operation =
rd_kafka_ConsumerGroupDescription_authorized_operation(group,j);
if(acl_operation==-1)
fprintf(stderr, "Out of bounds access to authorized_operations list\n");
else
printf("%s operation is allowed\n",
rd_kafka_AclOperation_name(acl_operation));
}
if (error)
printf(" error[%" PRId32 "]: %s",
rd_kafka_error_code(error),
Expand Down Expand Up @@ -262,12 +275,19 @@ cmd_describe_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) {
char errstr[512];
rd_kafka_AdminOptions_t *options;
rd_kafka_event_t *event = NULL;
int retval = 0;
int groups_cnt = 0;
rd_kafka_error_t *error;
int retval = 0;
int groups_cnt = 0;

int include_authorized_operations =
parse_int("include_authorized_operations", argv[0]);
if (include_authorized_operations < 0 ||
include_authorized_operations > 1)
usage("Require stable not a 0-1 int");

if (argc >= 1) {
groups = (const char **)&argv[0];
groups_cnt = argc;
groups = (const char **)&argv[1];
groups_cnt = argc - 1;
}

/*
Expand Down Expand Up @@ -296,6 +316,14 @@ cmd_describe_consumer_groups(rd_kafka_conf_t *conf, int argc, char **argv) {
fprintf(stderr, "%% Failed to set timeout: %s\n", errstr);
goto exit;
}
if ((error = rd_kafka_AdminOptions_set_include_authorized_operations(
options, include_authorized_operations))) {
fprintf(stderr,
"%% Failed to set require authorized operations: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
exit(1);
}

rd_kafka_DescribeConsumerGroups(rk, groups, groups_cnt, options, queue);

Expand Down Expand Up @@ -348,7 +376,10 @@ int main(int argc, char **argv) {
* Create Kafka client configuration place-holder
*/
conf = rd_kafka_conf_new();

conf_set(conf, "sasl.username", "broker");
conf_set(conf, "sasl.password", "broker");
conf_set(conf, "sasl.mechanism", "SCRAM-SHA-256");
conf_set(conf, "security.protocol", "SASL_PLAINTEXT");

/*
* Parse common options
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -4769,7 +4769,7 @@ static void rd_kafka_ListGroups_resp_cb(rd_kafka_t *rk,

state->wait_cnt++;
error = rd_kafka_DescribeGroupsRequest(
rkb, 0, grps, i, RD_KAFKA_REPLYQ(state->q, 0),
rkb, 0, grps, i, rd_false, RD_KAFKA_REPLYQ(state->q, 0),
rd_kafka_DescribeGroups_resp_cb, state);
if (error) {
rd_kafka_DescribeGroups_resp_cb(
Expand Down
40 changes: 40 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -6935,6 +6935,23 @@ rd_kafka_error_t *rd_kafka_AdminOptions_set_require_stable_offsets(
rd_kafka_AdminOptions_t *options,
int true_or_false);

/**
* @brief Whether broker should return authorized operations
* (DescribeConsumerGroups).
*
* @param options Admin options.
* @param true_or_false Defaults to false.
*
* @return NULL on success, a new error instance that must be
* released with rd_kafka_error_destroy() in case of error.
*
* @remark This option is valid for DescribeConsumerGroups.
*/
RD_EXPORT
rd_kafka_error_t *rd_kafka_AdminOptions_set_include_authorized_operations(
rd_kafka_AdminOptions_t *options,
int true_or_false);

/**
* @brief Set consumer groups states to query for.
*
Expand Down Expand Up @@ -8383,6 +8400,29 @@ RD_EXPORT
const char *rd_kafka_ConsumerGroupDescription_partition_assignor(
const rd_kafka_ConsumerGroupDescription_t *grpdesc);

/**
* @brief Gets count of authorized operations for the \p grpdesc group.
*
* @param grpdesc The group description.
*
* @return count of Authorized operations allowed, 0 if authorized operations list is NULL or empty.
*/
RD_EXPORT
size_t rd_kafka_ConsumerGroupDescription_authorized_operations_count(
const rd_kafka_ConsumerGroupDescription_t *grpdesc);

/**
* @brief Gets operation at idx index of authorized operations for the \p grpdesc group.
*
* @param grpdesc The group description.
* @param idx The index for which element is needed.
*
* @return Authorized operation at given index.
*/
RD_EXPORT
int rd_kafka_ConsumerGroupDescription_authorized_operation(
const rd_kafka_ConsumerGroupDescription_t *grpdesc,
size_t idx);

/**
* @brief Gets state for the \p grpdesc group.
Expand Down
95 changes: 82 additions & 13 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,12 @@ static const char *rd_kafka_admin_state_desc[] = {
* @enum Admin request target broker. Must be negative values since the field
* used is broker_id.
*/
enum { RD_KAFKA_ADMIN_TARGET_CONTROLLER = -1, /**< Cluster controller */
RD_KAFKA_ADMIN_TARGET_COORDINATOR = -2, /**< (Group) Coordinator */
RD_KAFKA_ADMIN_TARGET_FANOUT = -3, /**< This rko is a fanout and
* and has no target broker */
RD_KAFKA_ADMIN_TARGET_ALL = -4, /**< All available brokers */
enum {
RD_KAFKA_ADMIN_TARGET_CONTROLLER = -1, /**< Cluster controller */
RD_KAFKA_ADMIN_TARGET_COORDINATOR = -2, /**< (Group) Coordinator */
RD_KAFKA_ADMIN_TARGET_FANOUT = -3, /**< This rko is a fanout and
* and has no target broker */
RD_KAFKA_ADMIN_TARGET_ALL = -4, /**< All available brokers */
};

/**
Expand Down Expand Up @@ -1556,6 +1557,15 @@ rd_kafka_error_t *rd_kafka_AdminOptions_set_require_stable_offsets(
&true_or_false, errstr, sizeof(errstr));
return !err ? NULL : rd_kafka_error_new(err, "%s", errstr);
}
rd_kafka_error_t *rd_kafka_AdminOptions_set_include_authorized_operations(
rd_kafka_AdminOptions_t *options,
int true_or_false) {
char errstr[512];
rd_kafka_resp_err_t err = rd_kafka_confval_set_type(
&options->include_authorized_operations, RD_KAFKA_CONFVAL_INT,
&true_or_false, errstr, sizeof(errstr));
return !err ? NULL : rd_kafka_error_new(err, "%s", errstr);
}

rd_kafka_error_t *rd_kafka_AdminOptions_set_include_topic_authorized_operations(
rd_kafka_AdminOptions_t *options,
Expand Down Expand Up @@ -1692,6 +1702,15 @@ static void rd_kafka_AdminOptions_init(rd_kafka_t *rk,
rd_kafka_confval_disable(&options->include_cluster_authorized_operations,
"include_cluster_authorized_operations");

if (options->for_api == RD_KAFKA_ADMIN_OP_ANY ||
options->for_api == RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS)
rd_kafka_confval_init_int(
&options->include_authorized_operations,
"include_authorized_operations", 0, 1, 0);
else
rd_kafka_confval_disable(&options->require_stable_offsets,
"include_authorized_operations");

if (options->for_api == RD_KAFKA_ADMIN_OP_ANY ||
options->for_api == RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS)
rd_kafka_confval_init_ptr(&options->match_consumer_group_states,
Expand Down Expand Up @@ -6160,6 +6179,7 @@ const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_partitions(
* @param members List of members (rd_kafka_MemberDescription_t) of this
* group.
* @param partition_assignor (optional) Chosen assignor.
* @param authorized_operations authorized operations.
* @param state Group state.
* @param coordinator (optional) Group coordinator.
* @param error (optional) Error received for this group.
Expand All @@ -6171,6 +6191,7 @@ rd_kafka_ConsumerGroupDescription_new(const char *group_id,
rd_bool_t is_simple_consumer_group,
const rd_list_t *members,
const char *partition_assignor,
rd_list_t* authorized_operations,
rd_kafka_consumer_group_state_t state,
const rd_kafka_Node_t *coordinator,
rd_kafka_error_t *error) {
Expand All @@ -6186,10 +6207,11 @@ rd_kafka_ConsumerGroupDescription_new(const char *group_id,
rd_list_copy_to(&grpdesc->members, members,
rd_kafka_MemberDescription_list_copy, NULL);
}
grpdesc->partition_assignor = !partition_assignor
? (char *)partition_assignor
: rd_strdup(partition_assignor);
grpdesc->state = state;
grpdesc->partition_assignor = !partition_assignor
? (char *)partition_assignor
: rd_strdup(partition_assignor);
grpdesc->authorized_operations = authorized_operations;
grpdesc->state = state;
if (coordinator != NULL)
grpdesc->coordinator = rd_kafka_Node_copy(coordinator);
grpdesc->error =
Expand All @@ -6210,7 +6232,7 @@ static rd_kafka_ConsumerGroupDescription_t *
rd_kafka_ConsumerGroupDescription_new_error(const char *group_id,
rd_kafka_error_t *error) {
return rd_kafka_ConsumerGroupDescription_new(
group_id, rd_false, NULL, NULL,
group_id, rd_false, NULL, NULL, NULL,
RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN, NULL, error);
}

Expand All @@ -6225,7 +6247,8 @@ rd_kafka_ConsumerGroupDescription_copy(
const rd_kafka_ConsumerGroupDescription_t *grpdesc) {
return rd_kafka_ConsumerGroupDescription_new(
grpdesc->group_id, grpdesc->is_simple_consumer_group,
&grpdesc->members, grpdesc->partition_assignor, grpdesc->state,
&grpdesc->members, grpdesc->partition_assignor,
grpdesc->authorized_operations, grpdesc->state,
grpdesc->coordinator, grpdesc->error);
}

Expand Down Expand Up @@ -6278,6 +6301,19 @@ const char *rd_kafka_ConsumerGroupDescription_partition_assignor(
return grpdesc->partition_assignor;
}

size_t rd_kafka_ConsumerGroupDescription_authorized_operations_count(
const rd_kafka_ConsumerGroupDescription_t *grpdesc) {
if(grpdesc->authorized_operations)
return grpdesc->authorized_operations->rl_cnt;
return 0;
}

int rd_kafka_ConsumerGroupDescription_authorized_operation(
const rd_kafka_ConsumerGroupDescription_t *grpdesc,
size_t idx) {
rd_kafka_AclOperation_t* entry = rd_list_elem(grpdesc->authorized_operations, idx);
return *entry;
}

rd_kafka_consumer_group_state_t rd_kafka_ConsumerGroupDescription_state(
const rd_kafka_ConsumerGroupDescription_t *grpdesc) {
Expand Down Expand Up @@ -6375,7 +6411,7 @@ static rd_kafka_resp_err_t rd_kafka_admin_DescribeConsumerGroupsRequest(
rd_kafka_replyq_t replyq,
rd_kafka_resp_cb_t *resp_cb,
void *opaque) {
int i;
int i, include_authorized_operations;
char *group;
rd_kafka_resp_err_t err;
int groups_cnt = rd_list_cnt(groups);
Expand All @@ -6385,7 +6421,10 @@ static rd_kafka_resp_err_t rd_kafka_admin_DescribeConsumerGroupsRequest(
RD_LIST_FOREACH(group, groups, i) {
groups_arr[i] = rd_list_elem(groups, i);
}
include_authorized_operations =
rd_kafka_confval_get_int(&options->include_authorized_operations);
error = rd_kafka_DescribeGroupsRequest(rkb, -1, groups_arr, groups_cnt,
include_authorized_operations,
replyq, resp_cb, opaque);
rd_free(groups_arr);

Expand All @@ -6400,6 +6439,28 @@ static rd_kafka_resp_err_t rd_kafka_admin_DescribeConsumerGroupsRequest(
return RD_KAFKA_RESP_ERR_NO_ERROR;
}

/**
* @brief Parse authorized_operations returned in DescribeConsumerGroups
* @returns array of rd_bool_t of size RD_KAFKA_ACL_OPERATIONS__CNT
*/
rd_list_t* rd_kafka_AuthorizedOperations_parse(int32_t authorized_operations){
int i, bit;
rd_list_t* authorized_operations_list = NULL;
rd_kafka_AclOperation_t* entry;
/* in case of authorized_operations not requested, return NULL*/
if(authorized_operations<0)
return NULL;
authorized_operations_list = rd_list_new(0, NULL);
for(i=0; i<RD_KAFKA_ACL_OPERATION__CNT; i++){
bit = (authorized_operations >> i) & 1;
if(bit){
entry = malloc(sizeof(rd_kafka_AclOperation_t));
*entry = i;
rd_list_add(authorized_operations_list, entry);
}
}
return authorized_operations_list;
}
/**
* @brief Parse DescribeConsumerGroupsResponse and create ADMIN_RESULT op.
*/
Expand Down Expand Up @@ -6441,6 +6502,8 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
node = rd_kafka_Node_new(nodeid, host, port, NULL);
while (cnt-- > 0) {
int16_t error_code;
int32_t authorized_operations = 0;
rd_list_t* authorized_operations_list;
rd_kafkap_str_t GroupId, GroupState, ProtocolType, ProtocolData;
rd_bool_t is_simple_consumer_group, is_consumer_protocol_type;
int32_t member_cnt;
Expand Down Expand Up @@ -6551,13 +6614,19 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,

if (api_version >= 3) {
/* TODO: implement KIP-430 */
int32_t authorized_operations;
rd_kafka_buf_read_i32(reply, &authorized_operations);
/* assert that the last 3 bits are never set*/
rd_assert(!((authorized_operations >> 0) & 1));
rd_assert(!((authorized_operations >> 1) & 1));
rd_assert(!((authorized_operations >> 2) & 1));
/* authorized_operations is -2147483648 in case of not requested, list has no elements in that case*/
authorized_operations_list = rd_kafka_AuthorizedOperations_parse(authorized_operations);
}

if (error == NULL) {
grpdesc = rd_kafka_ConsumerGroupDescription_new(
group_id, is_simple_consumer_group, &members, proto,
authorized_operations_list,
rd_kafka_consumer_group_state_code(group_state),
node, error);
} else {
Expand Down
8 changes: 8 additions & 0 deletions src/rdkafka_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ struct rd_kafka_AdminOptions_s {
* Valid for:
* ListConsumerGroupOffsets
*/
rd_kafka_confval_t
include_authorized_operations; /**< BOOL: Whether broker should
* return authorized operations.
* Valid for:
* DescribeConsumerGroups
*/

rd_kafka_confval_t
include_topic_authorized_operations; /**< BOOL: Whether broker should return
Expand Down Expand Up @@ -488,6 +494,8 @@ struct rd_kafka_ConsumerGroupDescription_s {
rd_kafka_consumer_group_state_t state;
/** Consumer group coordinator. */
rd_kafka_Node_t *coordinator;
/** Authorized operations. */
rd_list_t* authorized_operations;
/** Group specific error. */
rd_kafka_error_t *error;
};
Expand Down
Loading

0 comments on commit 3a79ed6

Please sign in to comment.