diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 41934e349..e5868eeb8 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -2910,6 +2910,45 @@ const char *rd_kafka_ResourceType_name(rd_kafka_ResourceType_t restype) { return names[restype]; } +rd_kafka_InternalConfigResourceType_t +map_to_internal_config_resourcetype(rd_kafka_ResourceType_t resourcetype) { + switch (resourcetype) { + case RD_KAFKA_RESOURCE_UNKNOWN: + return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_UNKNOWN; + case RD_KAFKA_RESOURCE_ANY: + return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_ANY; + case RD_KAFKA_RESOURCE_TOPIC: + return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_TOPIC; + case RD_KAFKA_RESOURCE_GROUP: + return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_GROUP; + case RD_KAFKA_RESOURCE_BROKER: + return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_BROKER; + case RD_KAFKA_RESOURCE__CNT: + return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT; + default: + return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_UNKNOWN; + } +} + +rd_kafka_ResourceType_t map_from_internal_config_resourcetype( + rd_kafka_InternalConfigResourceType_t internal_resourcetype) { + switch (internal_resourcetype) { + case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_UNKNOWN: + return RD_KAFKA_RESOURCE_UNKNOWN; + case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_ANY: + return RD_KAFKA_RESOURCE_ANY; + case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_TOPIC: + return RD_KAFKA_RESOURCE_TOPIC; + case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_GROUP: + return RD_KAFKA_RESOURCE_GROUP; + case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_BROKER: + return RD_KAFKA_RESOURCE_BROKER; + case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT: + return RD_KAFKA_RESOURCE__CNT; + default: + return RD_KAFKA_RESOURCE_UNKNOWN; + } +} rd_kafka_ConfigResource_t * rd_kafka_ConfigResource_new(rd_kafka_ResourceType_t restype, @@ -3367,6 +3406,7 @@ rd_kafka_IncrementalAlterConfigsResponse_parse(rd_kafka_op_t *rko_req, for (i = 0; i < (int)res_cnt; i++) { int16_t error_code; rd_kafkap_str_t error_msg; + int8_t internal_res_type; int8_t res_type; rd_kafkap_str_t kres_name; char *res_name; @@ -3377,11 +3417,14 @@ rd_kafka_IncrementalAlterConfigsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_buf_read_i16(reply, &error_code); rd_kafka_buf_read_str(reply, &error_msg); - rd_kafka_buf_read_i8(reply, &res_type); + rd_kafka_buf_read_i8(reply, &internal_res_type); rd_kafka_buf_read_str(reply, &kres_name); RD_KAFKAP_STR_DUPA(&res_name, &kres_name); rd_kafka_buf_skip_tags(reply); + res_type = + map_from_internal_config_resourcetype(internal_res_type); + if (error_code) { if (RD_KAFKAP_STR_IS_NULL(&error_msg) || RD_KAFKAP_STR_LEN(&error_msg) == 0) @@ -3638,6 +3681,7 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, for (i = 0; i < (int)res_cnt; i++) { int16_t error_code; rd_kafkap_str_t error_msg; + int8_t internal_res_type; int8_t res_type; rd_kafkap_str_t kres_name; char *res_name; @@ -3649,10 +3693,13 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_buf_read_i16(reply, &error_code); rd_kafka_buf_read_str(reply, &error_msg); - rd_kafka_buf_read_i8(reply, &res_type); + rd_kafka_buf_read_i8(reply, &internal_res_type); rd_kafka_buf_read_str(reply, &kres_name); RD_KAFKAP_STR_DUPA(&res_name, &kres_name); + res_type = + map_from_internal_config_resourcetype(internal_res_type); + if (error_code) { if (RD_KAFKAP_STR_IS_NULL(&error_msg) || RD_KAFKAP_STR_LEN(&error_msg) == 0) diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index 04c498bf8..b24e51fe1 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -278,6 +278,32 @@ struct rd_kafka_ConfigResource_result_s { * but with response error values. */ }; +typedef enum rd_kafka_InternalConfigResourceType_t { + RD_KAFKA_INTERNAL_RESOURCE_CONFIG_UNKNOWN = 0, + RD_KAFKA_INTERNAL_RESOURCE_CONFIG_ANY = 1, + RD_KAFKA_INTERNAL_RESOURCE_CONFIG_TOPIC = 2, + RD_KAFKA_INTERNAL_RESOURCE_CONFIG_GROUP = + 32, // Changed value for config APIs + RD_KAFKA_INTERNAL_RESOURCE_CONFIG_BROKER = 4, + RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT, +} rd_kafka_InternalConfigResourceType_t; + +/** + * @brief Map from rd_kafka_ResourceType_t to + * rd_kafka_InternalConfigResourceType_t + */ +rd_kafka_InternalConfigResourceType_t +map_to_internal_config_resourcetype(rd_kafka_ResourceType_t resourcetype); + +/** + * @brief Map from rd_kafka_InternalConfigResourceType_t to + * rd_kafka_ResourceType_t + */ + +rd_kafka_ResourceType_t map_from_internal_config_resourcetype( + rd_kafka_InternalConfigResourceType_t internal_resourcetype); + + /**@}*/ diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index ac04343fa..31ddc2590 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -5371,7 +5371,9 @@ rd_kafka_resp_err_t rd_kafka_IncrementalAlterConfigsRequest( int ei; /* ResourceType */ - rd_kafka_buf_write_i8(rkbuf, config->restype); + rd_kafka_buf_write_i8( + rkbuf, + map_to_internal_config_resourcetype(config->restype)); /* ResourceName */ rd_kafka_buf_write_str(rkbuf, config->name, -1); @@ -5465,7 +5467,9 @@ rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest( int ei; /* resource_type */ - rd_kafka_buf_write_i8(rkbuf, config->restype); + rd_kafka_buf_write_i8( + rkbuf, + map_to_internal_config_resourcetype(config->restype)); /* resource_name */ rd_kafka_buf_write_str(rkbuf, config->name, -1); diff --git a/tests/0081-admin.c b/tests/0081-admin.c index 9144c400c..1bb4c3f37 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -902,7 +902,7 @@ static void do_test_AlterConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { */ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { -#define MY_CONFRES_CNT 3 +#define MY_CONFRES_CNT 4 char *topics[MY_CONFRES_CNT]; rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT]; rd_kafka_AdminOptions_t *options; @@ -935,6 +935,7 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, /** Test the test helper, for use in other tests. */ do { const char *broker_id = tsprintf("%d", avail_brokers[0]); + const char *group_id = "my-group"; const char *confs_set_append[] = { "compression.type", "SET", "lz4", "cleanup.policy", "APPEND", "compact"}; @@ -947,6 +948,10 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, const char *confs_delete_subtract_broker[] = { "background.threads", "DELETE", "", "log.cleanup.policy", "SUBTRACT", "compact"}; + const char *confs_set_append_group[] = { + "consumer.session.timeout.ms", "SET", "50000"}; + const char *confs_delete_group[] = { + "consumer.session.timeout.ms", "DELETE", ""}; TEST_SAY("Testing test helper with SET and APPEND\n"); test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC, @@ -969,6 +974,17 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, test_IncrementalAlterConfigs_simple( rk, RD_KAFKA_RESOURCE_BROKER, broker_id, confs_delete_subtract_broker, 2); + TEST_SAY( + "Testing test helper with SET with GROUP resource type\n"); + test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_GROUP, + group_id, + confs_set_append_group, 1); + TEST_SAY( + "Testing test helper with DELETE with GROUP resource " + "type\n"); + test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_GROUP, + group_id, + confs_delete_group, 1); TEST_SAY("End testing test helper\n"); } while (0); @@ -1035,6 +1051,23 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN; ci++; + /** + * ConfigResource #3: valid group config + */ + configs[ci] = + rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_GROUP, "my-group"); + + error = rd_kafka_ConfigResource_add_incremental_config( + configs[ci], "consumer.session.timeout.ms", + RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET, "50000"); + TEST_ASSERT(!error, "%s", rd_kafka_error_string(error)); + if (!test_consumer_group_protocol_classic()) { + exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; + } else { + exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST; + } + ci++; + /* * Timeout options */ @@ -1149,7 +1182,7 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, * @brief Test DescribeConfigs */ static void do_test_DescribeConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { -#define MY_CONFRES_CNT 3 +#define MY_CONFRES_CNT 4 char *topics[MY_CONFRES_CNT]; rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT]; rd_kafka_AdminOptions_t *options; @@ -1159,6 +1192,7 @@ static void do_test_DescribeConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { const rd_kafka_DescribeConfigs_result_t *res; const rd_kafka_ConfigResource_t **rconfigs; size_t rconfig_cnt; + char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); char errstr[128]; const char *errstr2; int ci = 0; @@ -1210,6 +1244,18 @@ static void do_test_DescribeConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; ci++; + /* + * ConfigResource #3: group config, for a non-existent group. + */ + configs[ci] = + rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_GROUP, group); + if (!test_consumer_group_protocol_classic()) { + exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; + } else { + exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST; + } + ci++; + retry_describe: /*