Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Group Support for Config APIs #4873

Open
wants to merge 6 commits into
base: dev_kip848_mock_handler_and_integration_tests
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -2910,6 +2910,45 @@ const char *rd_kafka_ResourceType_name(rd_kafka_ResourceType_t restype) {
return names[restype];
}

rd_kafka_InternalConfigResourceType_t
map_to_internal_config_resourcetype(rd_kafka_ResourceType_t resourcetype) {
switch (resourcetype) {
case RD_KAFKA_RESOURCE_UNKNOWN:
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_UNKNOWN;
case RD_KAFKA_RESOURCE_ANY:
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_ANY;
case RD_KAFKA_RESOURCE_TOPIC:
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_TOPIC;
case RD_KAFKA_RESOURCE_GROUP:
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_GROUP;
case RD_KAFKA_RESOURCE_BROKER:
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_BROKER;
case RD_KAFKA_RESOURCE__CNT:
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT;
default:
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_UNKNOWN;
}
}

rd_kafka_ResourceType_t map_from_internal_config_resourcetype(
rd_kafka_InternalConfigResourceType_t internal_resourcetype) {
switch (internal_resourcetype) {
case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_UNKNOWN:
return RD_KAFKA_RESOURCE_UNKNOWN;
case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_ANY:
return RD_KAFKA_RESOURCE_ANY;
case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_TOPIC:
return RD_KAFKA_RESOURCE_TOPIC;
case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_GROUP:
return RD_KAFKA_RESOURCE_GROUP;
case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_BROKER:
return RD_KAFKA_RESOURCE_BROKER;
case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT:
return RD_KAFKA_RESOURCE__CNT;
default:
return RD_KAFKA_RESOURCE_UNKNOWN;
}
}

rd_kafka_ConfigResource_t *
rd_kafka_ConfigResource_new(rd_kafka_ResourceType_t restype,
Expand Down Expand Up @@ -3367,6 +3406,7 @@ rd_kafka_IncrementalAlterConfigsResponse_parse(rd_kafka_op_t *rko_req,
for (i = 0; i < (int)res_cnt; i++) {
int16_t error_code;
rd_kafkap_str_t error_msg;
int8_t internal_res_type;
int8_t res_type;
rd_kafkap_str_t kres_name;
char *res_name;
Expand All @@ -3377,11 +3417,14 @@ rd_kafka_IncrementalAlterConfigsResponse_parse(rd_kafka_op_t *rko_req,

rd_kafka_buf_read_i16(reply, &error_code);
rd_kafka_buf_read_str(reply, &error_msg);
rd_kafka_buf_read_i8(reply, &res_type);
rd_kafka_buf_read_i8(reply, &internal_res_type);
rd_kafka_buf_read_str(reply, &kres_name);
RD_KAFKAP_STR_DUPA(&res_name, &kres_name);
rd_kafka_buf_skip_tags(reply);

res_type =
map_from_internal_config_resourcetype(internal_res_type);

if (error_code) {
if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
RD_KAFKAP_STR_LEN(&error_msg) == 0)
Expand Down Expand Up @@ -3638,6 +3681,7 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req,
for (i = 0; i < (int)res_cnt; i++) {
int16_t error_code;
rd_kafkap_str_t error_msg;
int8_t internal_res_type;
int8_t res_type;
rd_kafkap_str_t kres_name;
char *res_name;
Expand All @@ -3649,10 +3693,13 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req,

rd_kafka_buf_read_i16(reply, &error_code);
rd_kafka_buf_read_str(reply, &error_msg);
rd_kafka_buf_read_i8(reply, &res_type);
rd_kafka_buf_read_i8(reply, &internal_res_type);
rd_kafka_buf_read_str(reply, &kres_name);
RD_KAFKAP_STR_DUPA(&res_name, &kres_name);

res_type =
map_from_internal_config_resourcetype(internal_res_type);

if (error_code) {
if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
RD_KAFKAP_STR_LEN(&error_msg) == 0)
Expand Down
26 changes: 26 additions & 0 deletions src/rdkafka_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,32 @@ struct rd_kafka_ConfigResource_result_s {
* but with response error values. */
};

typedef enum rd_kafka_InternalConfigResourceType_t {
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_UNKNOWN = 0,
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_ANY = 1,
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_TOPIC = 2,
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_GROUP =
32, // Changed value for config APIs
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_BROKER = 4,
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT,
} rd_kafka_InternalConfigResourceType_t;

/**
* @brief Map from rd_kafka_ResourceType_t to
* rd_kafka_InternalConfigResourceType_t
*/
rd_kafka_InternalConfigResourceType_t
map_to_internal_config_resourcetype(rd_kafka_ResourceType_t resourcetype);

/**
* @brief Map from rd_kafka_InternalConfigResourceType_t to
* rd_kafka_ResourceType_t
*/

rd_kafka_ResourceType_t map_from_internal_config_resourcetype(
rd_kafka_InternalConfigResourceType_t internal_resourcetype);


/**@}*/


Expand Down
8 changes: 6 additions & 2 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -5371,7 +5371,9 @@ rd_kafka_resp_err_t rd_kafka_IncrementalAlterConfigsRequest(
int ei;

/* ResourceType */
rd_kafka_buf_write_i8(rkbuf, config->restype);
rd_kafka_buf_write_i8(
rkbuf,
map_to_internal_config_resourcetype(config->restype));

/* ResourceName */
rd_kafka_buf_write_str(rkbuf, config->name, -1);
Expand Down Expand Up @@ -5465,7 +5467,9 @@ rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest(
int ei;

/* resource_type */
rd_kafka_buf_write_i8(rkbuf, config->restype);
rd_kafka_buf_write_i8(
rkbuf,
map_to_internal_config_resourcetype(config->restype));

/* resource_name */
rd_kafka_buf_write_str(rkbuf, config->name, -1);
Expand Down
50 changes: 48 additions & 2 deletions tests/0081-admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ static void do_test_AlterConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
*/
static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
rd_kafka_queue_t *rkqu) {
#define MY_CONFRES_CNT 3
#define MY_CONFRES_CNT 4
char *topics[MY_CONFRES_CNT];
rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
rd_kafka_AdminOptions_t *options;
Expand Down Expand Up @@ -935,6 +935,7 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
/** Test the test helper, for use in other tests. */
do {
const char *broker_id = tsprintf("%d", avail_brokers[0]);
const char *group_id = "my-group";
const char *confs_set_append[] = {
"compression.type", "SET", "lz4",
"cleanup.policy", "APPEND", "compact"};
Expand All @@ -947,6 +948,10 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
const char *confs_delete_subtract_broker[] = {
"background.threads", "DELETE", "",
"log.cleanup.policy", "SUBTRACT", "compact"};
const char *confs_set_append_group[] = {
"consumer.session.timeout.ms", "SET", "50000"};
const char *confs_delete_group[] = {
"consumer.session.timeout.ms", "DELETE", ""};

TEST_SAY("Testing test helper with SET and APPEND\n");
test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC,
Expand All @@ -969,6 +974,17 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
test_IncrementalAlterConfigs_simple(
rk, RD_KAFKA_RESOURCE_BROKER, broker_id,
confs_delete_subtract_broker, 2);
TEST_SAY(
"Testing test helper with SET with GROUP resource type\n");
test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_GROUP,
group_id,
confs_set_append_group, 1);
TEST_SAY(
"Testing test helper with DELETE with GROUP resource "
"type\n");
test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_GROUP,
group_id,
confs_delete_group, 1);
TEST_SAY("End testing test helper\n");
} while (0);

Expand Down Expand Up @@ -1035,6 +1051,23 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN;
ci++;

/**
* ConfigResource #3: valid group config
*/
configs[ci] =
rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_GROUP, "my-group");

error = rd_kafka_ConfigResource_add_incremental_config(
configs[ci], "consumer.session.timeout.ms",
RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET, "50000");
TEST_ASSERT(!error, "%s", rd_kafka_error_string(error));
if (!test_consumer_group_protocol_classic()) {
exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
} else {
exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST;
}
ci++;

/*
* Timeout options
*/
Expand Down Expand Up @@ -1149,7 +1182,7 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
* @brief Test DescribeConfigs
*/
static void do_test_DescribeConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
#define MY_CONFRES_CNT 3
#define MY_CONFRES_CNT 4
char *topics[MY_CONFRES_CNT];
rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
rd_kafka_AdminOptions_t *options;
Expand All @@ -1159,6 +1192,7 @@ static void do_test_DescribeConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
const rd_kafka_DescribeConfigs_result_t *res;
const rd_kafka_ConfigResource_t **rconfigs;
size_t rconfig_cnt;
char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
char errstr[128];
const char *errstr2;
int ci = 0;
Expand Down Expand Up @@ -1210,6 +1244,18 @@ static void do_test_DescribeConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
ci++;

/*
* ConfigResource #3: group config, for a non-existent group.
*/
configs[ci] =
rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_GROUP, group);
if (!test_consumer_group_protocol_classic()) {
exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
} else {
exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST;
}
ci++;


retry_describe:
/*
Expand Down