Skip to content

Commit

Permalink
fix: acl binding enum checks (@emasab, confluentinc#3741)
Browse files Browse the repository at this point in the history
* checking enums values when creating or reading AclBinding and AclBindingFilter

* AclBinding destroy array function

* acl binding unit tests

* warnings and fix for unknown enums, test fixes

* int sizes matching the read size

* pointer to the correct broker
  • Loading branch information
emasab authored and garrett528 committed Apr 8, 2022
1 parent 731bc4f commit bfb7cd4
Show file tree
Hide file tree
Showing 4 changed files with 815 additions and 23 deletions.
10 changes: 10 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -7656,6 +7656,16 @@ rd_kafka_AclBinding_error(const rd_kafka_AclBinding_t *acl);
*/
RD_EXPORT void rd_kafka_AclBinding_destroy(rd_kafka_AclBinding_t *acl_binding);


/**
* @brief Helper function to destroy all AclBinding objects in
* the \p acl_bindings array (of \p acl_bindings_cnt elements).
* The array itself is not freed.
*/
RD_EXPORT void
rd_kafka_AclBinding_destroy_array(rd_kafka_AclBinding_t **acl_bindings,
size_t acl_bindings_cnt);

/**
* @brief Get an array of acl results from a CreateAcls result.
*
Expand Down
178 changes: 164 additions & 14 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -4071,6 +4071,36 @@ rd_kafka_AclBinding_new(rd_kafka_ResourceType_t restype,
return NULL;
}

if (restype == RD_KAFKA_RESOURCE_ANY ||
restype <= RD_KAFKA_RESOURCE_UNKNOWN ||
restype >= RD_KAFKA_RESOURCE__CNT) {
rd_snprintf(errstr, errstr_size, "Invalid resource type");
return NULL;
}

if (resource_pattern_type == RD_KAFKA_RESOURCE_PATTERN_ANY ||
resource_pattern_type == RD_KAFKA_RESOURCE_PATTERN_MATCH ||
resource_pattern_type <= RD_KAFKA_RESOURCE_PATTERN_UNKNOWN ||
resource_pattern_type >= RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) {
rd_snprintf(errstr, errstr_size,
"Invalid resource pattern type");
return NULL;
}

if (operation == RD_KAFKA_ACL_OPERATION_ANY ||
operation <= RD_KAFKA_ACL_OPERATION_UNKNOWN ||
operation >= RD_KAFKA_ACL_OPERATION__CNT) {
rd_snprintf(errstr, errstr_size, "Invalid operation");
return NULL;
}

if (permission_type == RD_KAFKA_ACL_PERMISSION_TYPE_ANY ||
permission_type <= RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN ||
permission_type >= RD_KAFKA_ACL_PERMISSION_TYPE__CNT) {
rd_snprintf(errstr, errstr_size, "Invalid permission type");
return NULL;
}

return rd_kafka_AclBinding_new0(
restype, name, resource_pattern_type, principal, host, operation,
permission_type, RD_KAFKA_RESP_ERR_NO_ERROR, NULL);
Expand All @@ -4086,6 +4116,33 @@ rd_kafka_AclBindingFilter_t *rd_kafka_AclBindingFilter_new(
rd_kafka_AclPermissionType_t permission_type,
char *errstr,
size_t errstr_size) {


if (restype <= RD_KAFKA_RESOURCE_UNKNOWN ||
restype >= RD_KAFKA_RESOURCE__CNT) {
rd_snprintf(errstr, errstr_size, "Invalid resource type");
return NULL;
}

if (resource_pattern_type <= RD_KAFKA_RESOURCE_PATTERN_UNKNOWN ||
resource_pattern_type >= RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) {
rd_snprintf(errstr, errstr_size,
"Invalid resource pattern type");
return NULL;
}

if (operation <= RD_KAFKA_ACL_OPERATION_UNKNOWN ||
operation >= RD_KAFKA_ACL_OPERATION__CNT) {
rd_snprintf(errstr, errstr_size, "Invalid operation");
return NULL;
}

if (permission_type <= RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN ||
permission_type >= RD_KAFKA_ACL_PERMISSION_TYPE__CNT) {
rd_snprintf(errstr, errstr_size, "Invalid permission type");
return NULL;
}

return rd_kafka_AclBinding_new0(
restype, name, resource_pattern_type, principal, host, operation,
permission_type, RD_KAFKA_RESP_ERR_NO_ERROR, NULL);
Expand Down Expand Up @@ -4172,6 +4229,14 @@ static void rd_kafka_AclBinding_free(void *ptr) {
rd_kafka_AclBinding_destroy(ptr);
}


void rd_kafka_AclBinding_destroy_array(rd_kafka_AclBinding_t **acl_bindings,
size_t acl_bindings_cnt) {
size_t i;
for (i = 0; i < acl_bindings_cnt; i++)
rd_kafka_AclBinding_destroy(acl_bindings[i]);
}

/**
* @brief Parse CreateAclsResponse and create ADMIN_RESULT op.
*/
Expand Down Expand Up @@ -4301,6 +4366,7 @@ rd_kafka_DescribeAclsResponse_parse(rd_kafka_op_t *rko_req,
char *errstr,
size_t errstr_size) {
const int log_decode_errors = LOG_ERR;
rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
rd_kafka_op_t *rko_result = NULL;
int32_t res_cnt;
Expand Down Expand Up @@ -4331,10 +4397,10 @@ rd_kafka_DescribeAclsResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_AclBinding_free);

for (i = 0; i < (int)res_cnt; i++) {
int8_t res_type;
int8_t res_type = RD_KAFKA_RESOURCE_UNKNOWN;
rd_kafkap_str_t kres_name;
char *res_name;
rd_kafka_ResourcePatternType_t resource_pattern_type =
int8_t resource_pattern_type =
RD_KAFKA_RESOURCE_PATTERN_LITERAL;
int32_t acl_cnt;

Expand All @@ -4346,15 +4412,34 @@ rd_kafka_DescribeAclsResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_buf_read_i8(reply, &resource_pattern_type);
}

if (res_type <= RD_KAFKA_RESOURCE_UNKNOWN ||
res_type >= RD_KAFKA_RESOURCE__CNT) {
rd_rkb_log(rkb, LOG_WARNING, "DESCRIBEACLSRESPONSE",
"DescribeAclsResponse returned unknown "
"resource type %d",
res_type);
res_type = RD_KAFKA_RESOURCE_UNKNOWN;
}
if (resource_pattern_type <=
RD_KAFKA_RESOURCE_PATTERN_UNKNOWN ||
resource_pattern_type >=
RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) {
rd_rkb_log(rkb, LOG_WARNING, "DESCRIBEACLSRESPONSE",
"DescribeAclsResponse returned unknown "
"resource pattern type %d",
resource_pattern_type);
resource_pattern_type =
RD_KAFKA_RESOURCE_PATTERN_UNKNOWN;
}

/* #resources */
rd_kafka_buf_read_arraycnt(reply, &acl_cnt, 100000);

for (j = 0; j < (int)acl_cnt; j++) {
rd_kafkap_str_t kprincipal;
rd_kafkap_str_t khost;
rd_kafka_AclOperation_t operation =
RD_KAFKA_ACL_OPERATION_UNKNOWN;
rd_kafka_AclPermissionType_t permission_type =
int8_t operation = RD_KAFKA_ACL_OPERATION_UNKNOWN;
int8_t permission_type =
RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN;
char *principal;
char *host;
Expand All @@ -4366,10 +4451,32 @@ rd_kafka_DescribeAclsResponse_parse(rd_kafka_op_t *rko_req,
RD_KAFKAP_STR_DUPA(&principal, &kprincipal);
RD_KAFKAP_STR_DUPA(&host, &khost);

acl = rd_kafka_AclBinding_new(
if (operation <= RD_KAFKA_ACL_OPERATION_UNKNOWN ||
operation >= RD_KAFKA_ACL_OPERATION__CNT) {
rd_rkb_log(rkb, LOG_WARNING,
"DESCRIBEACLSRESPONSE",
"DescribeAclsResponse returned "
"unknown acl operation %d",
operation);
operation = RD_KAFKA_ACL_OPERATION_UNKNOWN;
}
if (permission_type <=
RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN ||
permission_type >=
RD_KAFKA_ACL_PERMISSION_TYPE__CNT) {
rd_rkb_log(rkb, LOG_WARNING,
"DESCRIBEACLSRESPONSE",
"DescribeAclsResponse returned "
"unknown acl permission type %d",
permission_type);
permission_type =
RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN;
}

acl = rd_kafka_AclBinding_new0(
res_type, res_name, resource_pattern_type,
principal, host, operation, permission_type, NULL,
0);
principal, host, operation, permission_type,
RD_KAFKA_RESP_ERR_NO_ERROR, NULL);

rd_list_add(&rko_result->rko_u.admin_result.results,
acl);
Expand Down Expand Up @@ -4509,6 +4616,7 @@ rd_kafka_DeleteAclsResponse_parse(rd_kafka_op_t *rko_req,
char *errstr,
size_t errstr_size) {
const int log_decode_errors = LOG_ERR;
rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
rd_kafka_op_t *rko_result = NULL;
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
int32_t res_cnt;
Expand Down Expand Up @@ -4550,18 +4658,17 @@ rd_kafka_DeleteAclsResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_buf_read_arraycnt(reply, &matching_acls_cnt, 100000);
for (j = 0; j < (int)matching_acls_cnt; j++) {
int16_t acl_error_code;
int8_t res_type;
int8_t res_type = RD_KAFKA_RESOURCE_UNKNOWN;
rd_kafkap_str_t acl_error_msg =
RD_KAFKAP_STR_INITIALIZER;
rd_kafkap_str_t kres_name;
rd_kafkap_str_t khost;
rd_kafkap_str_t kprincipal;
rd_kafka_AclOperation_t operation =
RD_KAFKA_ACL_OPERATION_UNKNOWN;
rd_kafka_AclPermissionType_t permission_type =
RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN;
rd_kafka_ResourcePatternType_t resource_pattern_type =
int8_t resource_pattern_type =
RD_KAFKA_RESOURCE_PATTERN_LITERAL;
int8_t operation = RD_KAFKA_ACL_OPERATION_UNKNOWN;
int8_t permission_type =
RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN;
rd_kafka_AclBinding_t *matching_acl;
char *acl_errstr = NULL;
char *res_name;
Expand Down Expand Up @@ -4596,6 +4703,49 @@ rd_kafka_DeleteAclsResponse_parse(rd_kafka_op_t *rko_req,
RD_KAFKAP_STR_DUPA(&principal, &kprincipal);
RD_KAFKAP_STR_DUPA(&host, &khost);

if (res_type <= RD_KAFKA_RESOURCE_UNKNOWN ||
res_type >= RD_KAFKA_RESOURCE__CNT) {
rd_rkb_log(rkb, LOG_WARNING,
"DELETEACLSRESPONSE",
"DeleteAclsResponse returned "
"unknown resource type %d",
res_type);
res_type = RD_KAFKA_RESOURCE_UNKNOWN;
}
if (resource_pattern_type <=
RD_KAFKA_RESOURCE_PATTERN_UNKNOWN ||
resource_pattern_type >=
RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT) {
rd_rkb_log(rkb, LOG_WARNING,
"DELETEACLSRESPONSE",
"DeleteAclsResponse returned "
"unknown resource pattern type %d",
resource_pattern_type);
resource_pattern_type =
RD_KAFKA_RESOURCE_PATTERN_UNKNOWN;
}
if (operation <= RD_KAFKA_ACL_OPERATION_UNKNOWN ||
operation >= RD_KAFKA_ACL_OPERATION__CNT) {
rd_rkb_log(rkb, LOG_WARNING,
"DELETEACLSRESPONSE",
"DeleteAclsResponse returned "
"unknown acl operation %d",
operation);
operation = RD_KAFKA_ACL_OPERATION_UNKNOWN;
}
if (permission_type <=
RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN ||
permission_type >=
RD_KAFKA_ACL_PERMISSION_TYPE__CNT) {
rd_rkb_log(rkb, LOG_WARNING,
"DELETEACLSRESPONSE",
"DeleteAclsResponse returned "
"unknown acl permission type %d",
permission_type);
permission_type =
RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN;
}

matching_acl = rd_kafka_AclBinding_new0(
res_type, res_name, resource_pattern_type,
principal, host, operation, permission_type,
Expand Down
Loading

0 comments on commit bfb7cd4

Please sign in to comment.