From fa95fa53892ae9884fda7a8f7a24ee17ab1ecfeb Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Wed, 20 Sep 2023 14:49:14 +0530 Subject: [PATCH 1/4] Add telemetry encode and decode unit tests --- src/rdkafka_telemetry_decode.c | 79 +++++++++++++++++++++++++++++++++- src/rdkafka_telemetry_decode.h | 2 +- src/rdunittest.c | 2 + 3 files changed, 81 insertions(+), 2 deletions(-) diff --git a/src/rdkafka_telemetry_decode.c b/src/rdkafka_telemetry_decode.c index cbadf8062f..8e2ebbaf21 100644 --- a/src/rdkafka_telemetry_decode.c +++ b/src/rdkafka_telemetry_decode.c @@ -32,6 +32,10 @@ #include "nanopb/pb_encode.h" #include "nanopb/pb_decode.h" #include "opentelemetry/metrics.pb.h" +#include "rdkafka.h" +#include "rdkafka_int.h" +#include "rdkafka_telemetry_encode.h" +#include "rdunittest.h" #define _NANOPB_STRING_DECODE_MAX_BUFFER_SIZE 1024 @@ -170,7 +174,7 @@ static bool decode_and_print_resource_metrics(pb_istream_t *stream, * opentelemetry_proto_metrics_v1_MetricsData datatype. Used for testing and * debugging. */ -void rd_kafka_telemetry_decode_metrics(void *buffer, size_t size) { +int rd_kafka_telemetry_decode_metrics(void *buffer, size_t size) { opentelemetry_proto_metrics_v1_MetricsData metricsData = opentelemetry_proto_metrics_v1_MetricsData_init_zero; @@ -185,4 +189,77 @@ void rd_kafka_telemetry_decode_metrics(void *buffer, size_t size) { fprintf(stderr, "Failed to decode MetricsData: %s\n", PB_GET_ERROR(&stream)); } + return status; } + +bool ut_telemetry_gauge() { + rd_kafka_t *rk = rd_calloc(1, sizeof(*rk)); + rk->rk_type = RD_KAFKA_PRODUCER; + rk->rk_telemetry.matched_metrics_cnt = 1; + rk->rk_telemetry.matched_metrics = + rd_malloc(sizeof(rd_kafka_telemetry_metric_name_t) * + rk->rk_telemetry.matched_metrics_cnt); + rk->rk_telemetry.matched_metrics[0] = + RD_KAFKA_TELEMETRY_METRIC_CONNECTION_CREATION_RATE; + rd_strlcpy(rk->rk_name, "unittest", sizeof(rk->rk_name)); + TAILQ_INIT(&rk->rk_brokers); + + rd_kafka_broker_t *rkb = rd_calloc(1, sizeof(*rkb)); + rkb->rkb_c.connects.val = 1; + TAILQ_INSERT_HEAD(&rk->rk_brokers, rkb, rkb_link); + + size_t metrics_payload_size = 0, metrics_payload_size_expected = 162; + + void *metrics_payload = rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size); + RD_UT_SAY("metrics_payload_size: %zu", metrics_payload_size); + + RD_UT_ASSERT(metrics_payload_size == metrics_payload_size_expected , "Metrics payload size mismatch. Expected: %zu, Actual: %zu", metrics_payload_size_expected, metrics_payload_size); + + bool decode_status = rd_kafka_telemetry_decode_metrics(metrics_payload, metrics_payload_size); + + RD_UT_ASSERT(decode_status == 1, "Decoding failed"); + rd_free(metrics_payload); + rd_free(rkb); + rd_free(rk); + RD_UT_PASS(); +} + +bool ut_test_sum() { + rd_kafka_t *rk = rd_calloc(1, sizeof(*rk)); + rk->rk_type = RD_KAFKA_PRODUCER; + rk->rk_telemetry.matched_metrics_cnt = 1; + rk->rk_telemetry.matched_metrics = + rd_malloc(sizeof(rd_kafka_telemetry_metric_name_t) * + rk->rk_telemetry.matched_metrics_cnt); + rk->rk_telemetry.matched_metrics[0] = + RD_KAFKA_TELEMETRY_METRIC_CONNECTION_CREATION_TOTAL; + rd_strlcpy(rk->rk_name, "unittest", sizeof(rk->rk_name)); + TAILQ_INIT(&rk->rk_brokers); + + rd_kafka_broker_t *rkb = rd_calloc(1, sizeof(*rkb)); + rkb->rkb_c.connects.val = 1; + TAILQ_INSERT_HEAD(&rk->rk_brokers, rkb, rkb_link); + + size_t metrics_payload_size = 0, metrics_payload_size_expected = 164; + + void *metrics_payload = rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size); + RD_UT_SAY("metrics_payload_size: %zu", metrics_payload_size); + + RD_UT_ASSERT(metrics_payload_size == metrics_payload_size_expected , "Metrics payload size mismatch. Expected: %zu, Actual: %zu", metrics_payload_size_expected, metrics_payload_size); + + bool decode_status = rd_kafka_telemetry_decode_metrics(metrics_payload, metrics_payload_size); + + RD_UT_ASSERT(decode_status == 1, "Decoding failed"); + rd_free(metrics_payload); + rd_free(rkb); + rd_free(rk); + RD_UT_PASS(); + +} + +int unittest_telemetry_decode(void) { + int fails = 0; + fails += ut_telemetry_gauge(); + fails += ut_test_sum(); + return fails; +} \ No newline at end of file diff --git a/src/rdkafka_telemetry_decode.h b/src/rdkafka_telemetry_decode.h index a799bb2e79..a92c691c55 100644 --- a/src/rdkafka_telemetry_decode.h +++ b/src/rdkafka_telemetry_decode.h @@ -29,6 +29,6 @@ #ifndef _RDKAFKA_RDKAFKA_TELEMETRY_DECODE_H #define _RDKAFKA_RDKAFKA_TELEMETRY_DECODE_H -void rd_kafka_telemetry_decode_metrics(void *buffer, size_t size); +int rd_kafka_telemetry_decode_metrics(void *buffer, size_t size); #endif /* _RDKAFKA_RDKAFKA_TELEMETRY_DECODE_H */ diff --git a/src/rdunittest.c b/src/rdunittest.c index 18236ca9ec..a75d2b8f65 100644 --- a/src/rdunittest.c +++ b/src/rdunittest.c @@ -426,6 +426,7 @@ extern int unittest_http(void); #if WITH_OAUTHBEARER_OIDC extern int unittest_sasl_oauthbearer_oidc(void); #endif +extern int unittest_telemetry_decode(void); int rd_unittest(void) { int fails = 0; @@ -466,6 +467,7 @@ int rd_unittest(void) { #if WITH_OAUTHBEARER_OIDC {"sasl_oauthbearer_oidc", unittest_sasl_oauthbearer_oidc}, #endif + {"telemetry", unittest_telemetry_decode}, {NULL} }; int i; From a2351c48242f36d33f3824b9a58721ace060a55c Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Wed, 20 Sep 2023 09:23:01 +0000 Subject: [PATCH 2/4] Style fix --- src/rdkafka_telemetry_decode.c | 35 +++++++++++++++++++++------------- src/rdkafka_telemetry_encode.c | 2 +- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/rdkafka_telemetry_decode.c b/src/rdkafka_telemetry_decode.c index 8e2ebbaf21..e6b9c8fdbb 100644 --- a/src/rdkafka_telemetry_decode.c +++ b/src/rdkafka_telemetry_decode.c @@ -193,8 +193,8 @@ int rd_kafka_telemetry_decode_metrics(void *buffer, size_t size) { } bool ut_telemetry_gauge() { - rd_kafka_t *rk = rd_calloc(1, sizeof(*rk)); - rk->rk_type = RD_KAFKA_PRODUCER; + rd_kafka_t *rk = rd_calloc(1, sizeof(*rk)); + rk->rk_type = RD_KAFKA_PRODUCER; rk->rk_telemetry.matched_metrics_cnt = 1; rk->rk_telemetry.matched_metrics = rd_malloc(sizeof(rd_kafka_telemetry_metric_name_t) * @@ -204,18 +204,23 @@ bool ut_telemetry_gauge() { rd_strlcpy(rk->rk_name, "unittest", sizeof(rk->rk_name)); TAILQ_INIT(&rk->rk_brokers); - rd_kafka_broker_t *rkb = rd_calloc(1, sizeof(*rkb)); + rd_kafka_broker_t *rkb = rd_calloc(1, sizeof(*rkb)); rkb->rkb_c.connects.val = 1; TAILQ_INSERT_HEAD(&rk->rk_brokers, rkb, rkb_link); size_t metrics_payload_size = 0, metrics_payload_size_expected = 162; - void *metrics_payload = rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size); + void *metrics_payload = + rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size); RD_UT_SAY("metrics_payload_size: %zu", metrics_payload_size); - RD_UT_ASSERT(metrics_payload_size == metrics_payload_size_expected , "Metrics payload size mismatch. Expected: %zu, Actual: %zu", metrics_payload_size_expected, metrics_payload_size); + RD_UT_ASSERT( + metrics_payload_size == metrics_payload_size_expected, + "Metrics payload size mismatch. Expected: %zu, Actual: %zu", + metrics_payload_size_expected, metrics_payload_size); - bool decode_status = rd_kafka_telemetry_decode_metrics(metrics_payload, metrics_payload_size); + bool decode_status = rd_kafka_telemetry_decode_metrics( + metrics_payload, metrics_payload_size); RD_UT_ASSERT(decode_status == 1, "Decoding failed"); rd_free(metrics_payload); @@ -225,8 +230,8 @@ bool ut_telemetry_gauge() { } bool ut_test_sum() { - rd_kafka_t *rk = rd_calloc(1, sizeof(*rk)); - rk->rk_type = RD_KAFKA_PRODUCER; + rd_kafka_t *rk = rd_calloc(1, sizeof(*rk)); + rk->rk_type = RD_KAFKA_PRODUCER; rk->rk_telemetry.matched_metrics_cnt = 1; rk->rk_telemetry.matched_metrics = rd_malloc(sizeof(rd_kafka_telemetry_metric_name_t) * @@ -236,25 +241,29 @@ bool ut_test_sum() { rd_strlcpy(rk->rk_name, "unittest", sizeof(rk->rk_name)); TAILQ_INIT(&rk->rk_brokers); - rd_kafka_broker_t *rkb = rd_calloc(1, sizeof(*rkb)); + rd_kafka_broker_t *rkb = rd_calloc(1, sizeof(*rkb)); rkb->rkb_c.connects.val = 1; TAILQ_INSERT_HEAD(&rk->rk_brokers, rkb, rkb_link); size_t metrics_payload_size = 0, metrics_payload_size_expected = 164; - void *metrics_payload = rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size); + void *metrics_payload = + rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size); RD_UT_SAY("metrics_payload_size: %zu", metrics_payload_size); - RD_UT_ASSERT(metrics_payload_size == metrics_payload_size_expected , "Metrics payload size mismatch. Expected: %zu, Actual: %zu", metrics_payload_size_expected, metrics_payload_size); + RD_UT_ASSERT( + metrics_payload_size == metrics_payload_size_expected, + "Metrics payload size mismatch. Expected: %zu, Actual: %zu", + metrics_payload_size_expected, metrics_payload_size); - bool decode_status = rd_kafka_telemetry_decode_metrics(metrics_payload, metrics_payload_size); + bool decode_status = rd_kafka_telemetry_decode_metrics( + metrics_payload, metrics_payload_size); RD_UT_ASSERT(decode_status == 1, "Decoding failed"); rd_free(metrics_payload); rd_free(rkb); rd_free(rk); RD_UT_PASS(); - } int unittest_telemetry_decode(void) { diff --git a/src/rdkafka_telemetry_encode.c b/src/rdkafka_telemetry_encode.c index e78d2c2cd8..1f091019b6 100644 --- a/src/rdkafka_telemetry_encode.c +++ b/src/rdkafka_telemetry_encode.c @@ -301,7 +301,7 @@ void *rd_kafka_telemetry_encode_metrics(rd_kafka_t *rk, size_t *size) { .doubleValue; } - data_points[i]->time_unix_nano = now_ns; + data_points[i]->time_unix_nano = now_ns; data_points[i]->start_time_unix_nano = now_ns; // TODO: Add data point attributes as needed From 1f938985923cca0063ef724096efe2811634fde8 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Mon, 25 Sep 2023 12:49:04 +0530 Subject: [PATCH 3/4] Improve test --- src/rdkafka_mock_handlers.c | 3 +- src/rdkafka_telemetry_decode.c | 143 ++++++++++++++++++++++++++++----- src/rdkafka_telemetry_decode.h | 4 +- 3 files changed, 127 insertions(+), 23 deletions(-) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 573d91e038..1325760654 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -2196,7 +2196,8 @@ static int rd_kafka_mock_handle_PushTelemetry(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_read_i8(rkbuf, &compression_type); rd_kafka_buf_read_kbytes(rkbuf, &metrics); - rd_kafka_telemetry_decode_metrics((void *)metrics.data, metrics.len); + rd_kafka_telemetry_decode_metrics((void *)metrics.data, metrics.len, + rd_false); // ThrottleTime rd_kafka_buf_write_i32(resp, 0); diff --git a/src/rdkafka_telemetry_decode.c b/src/rdkafka_telemetry_decode.c index e6b9c8fdbb..acba5b6770 100644 --- a/src/rdkafka_telemetry_decode.c +++ b/src/rdkafka_telemetry_decode.c @@ -32,13 +32,27 @@ #include "nanopb/pb_encode.h" #include "nanopb/pb_decode.h" #include "opentelemetry/metrics.pb.h" -#include "rdkafka.h" #include "rdkafka_int.h" #include "rdkafka_telemetry_encode.h" #include "rdunittest.h" #define _NANOPB_STRING_DECODE_MAX_BUFFER_SIZE 1024 +struct metric_unit_test_data { + rd_kafka_telemetry_metric_type_t type; + char metric_name[_NANOPB_STRING_DECODE_MAX_BUFFER_SIZE]; + char metric_description[_NANOPB_STRING_DECODE_MAX_BUFFER_SIZE]; + char metric_unit[_NANOPB_STRING_DECODE_MAX_BUFFER_SIZE]; + int64_t metric_value; + uint64_t metric_time; +}; + +struct metric_unit_test_data unit_test_data; + +bool (*decode_and_print_metric_ptr)(pb_istream_t *stream, + const pb_field_t *field, + void **arg) = NULL; + static bool decode_and_print_string(pb_istream_t *stream, const pb_field_t *field, void **arg) { @@ -54,6 +68,10 @@ static bool decode_and_print_string(pb_istream_t *stream, return false; } fprintf(stderr, "String: %s\n", buffer); + if (arg != NULL && *arg != NULL) { + rd_strlcpy(*arg, (char *)buffer, + _NANOPB_STRING_DECODE_MAX_BUFFER_SIZE); + } return true; } @@ -90,7 +108,13 @@ static bool decode_and_print_number_data_point(pb_istream_t *stream, return false; } - fprintf(stderr, "NumberDataPoint value: %lld time: %llu\n", + if (arg != NULL && *arg != NULL) { + struct metric_unit_test_data *test_data = *arg; + test_data->metric_value = data_point.value.as_int; + test_data->metric_time = data_point.time_unix_nano; + } + + fprintf(stderr, "NumberDataPoint value: %ld time: %lu\n", data_point.value.as_int, data_point.time_unix_nano); return true; } @@ -102,11 +126,21 @@ data_msg_callback(pb_istream_t *stream, const pb_field_t *field, void **arg) { opentelemetry_proto_metrics_v1_Sum *sum = field->pData; sum->data_points.funcs.decode = &decode_and_print_number_data_point; + if (arg != NULL && *arg != NULL) { + sum->data_points.arg = &unit_test_data; + struct metric_unit_test_data *data = *arg; + data->type = RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM; + } } else if (field->tag == opentelemetry_proto_metrics_v1_Metric_gauge_tag) { opentelemetry_proto_metrics_v1_Gauge *gauge = field->pData; gauge->data_points.funcs.decode = &decode_and_print_number_data_point; + if (arg != NULL && *arg != NULL) { + gauge->data_points.arg = &unit_test_data; + struct metric_unit_test_data *data = *arg; + data->type = RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE; + } } return true; } @@ -132,6 +166,31 @@ static bool decode_and_print_metric(pb_istream_t *stream, return true; } +static bool decode_and_print_metric_unittest(pb_istream_t *stream, + const pb_field_t *field, + void **arg) { + opentelemetry_proto_metrics_v1_Metric metric = + opentelemetry_proto_metrics_v1_Metric_init_zero; + metric.name.funcs.decode = &decode_and_print_string; + metric.name.arg = &unit_test_data.metric_name; + metric.description.funcs.decode = &decode_and_print_string; + metric.description.arg = &unit_test_data.metric_description; + metric.unit.funcs.decode = &decode_and_print_string; + metric.unit.arg = &unit_test_data.metric_unit; + metric.cb_data.funcs.decode = &data_msg_callback; + metric.cb_data.arg = &unit_test_data; + + if (!pb_decode(stream, opentelemetry_proto_metrics_v1_Metric_fields, + &metric)) { + fprintf(stderr, "Failed to decode Metric: %s\n", + PB_GET_ERROR(stream)); + return false; + } + + return true; +} + + static bool decode_and_print_scope_metrics(pb_istream_t *stream, const pb_field_t *field, void **arg) { @@ -139,7 +198,7 @@ static bool decode_and_print_scope_metrics(pb_istream_t *stream, opentelemetry_proto_metrics_v1_ScopeMetrics_init_zero; scope_metrics.scope.name.funcs.decode = &decode_and_print_string; scope_metrics.scope.version.funcs.decode = &decode_and_print_string; - scope_metrics.metrics.funcs.decode = &decode_and_print_metric; + scope_metrics.metrics.funcs.decode = decode_and_print_metric_ptr; if (!pb_decode(stream, opentelemetry_proto_metrics_v1_ScopeMetrics_fields, &scope_metrics)) { @@ -174,13 +233,19 @@ static bool decode_and_print_resource_metrics(pb_istream_t *stream, * opentelemetry_proto_metrics_v1_MetricsData datatype. Used for testing and * debugging. */ -int rd_kafka_telemetry_decode_metrics(void *buffer, size_t size) { +int rd_kafka_telemetry_decode_metrics(void *buffer, + size_t size, + rd_bool_t is_unit_test) { opentelemetry_proto_metrics_v1_MetricsData metricsData = opentelemetry_proto_metrics_v1_MetricsData_init_zero; pb_istream_t stream = pb_istream_from_buffer(buffer, size); metricsData.resource_metrics.funcs.decode = &decode_and_print_resource_metrics; + if (is_unit_test) + decode_and_print_metric_ptr = &decode_and_print_metric_unittest; + else + decode_and_print_metric_ptr = &decode_and_print_metric; bool status = pb_decode( &stream, opentelemetry_proto_metrics_v1_MetricsData_fields, @@ -192,7 +257,16 @@ int rd_kafka_telemetry_decode_metrics(void *buffer, size_t size) { return status; } -bool ut_telemetry_gauge() { +void clear_unit_test_data() { + unit_test_data.type = RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE; + unit_test_data.metric_name[0] = '\0'; + unit_test_data.metric_description[0] = '\0'; + unit_test_data.metric_unit[0] = '\0'; + unit_test_data.metric_value = 0; + unit_test_data.metric_time = 0; +} + +bool unit_test_telemetry_gauge() { rd_kafka_t *rk = rd_calloc(1, sizeof(*rk)); rk->rk_type = RD_KAFKA_PRODUCER; rk->rk_telemetry.matched_metrics_cnt = 1; @@ -208,28 +282,42 @@ bool ut_telemetry_gauge() { rkb->rkb_c.connects.val = 1; TAILQ_INSERT_HEAD(&rk->rk_brokers, rkb, rkb_link); - size_t metrics_payload_size = 0, metrics_payload_size_expected = 162; + size_t metrics_payload_size = 0; + clear_unit_test_data(); void *metrics_payload = rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size); RD_UT_SAY("metrics_payload_size: %zu", metrics_payload_size); - RD_UT_ASSERT( - metrics_payload_size == metrics_payload_size_expected, - "Metrics payload size mismatch. Expected: %zu, Actual: %zu", - metrics_payload_size_expected, metrics_payload_size); + RD_UT_ASSERT(metrics_payload_size != 0, "Metrics payload zero"); bool decode_status = rd_kafka_telemetry_decode_metrics( - metrics_payload, metrics_payload_size); + metrics_payload, metrics_payload_size, rd_true); RD_UT_ASSERT(decode_status == 1, "Decoding failed"); + RD_UT_ASSERT(unit_test_data.type == + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, + "Metric type mismatch"); + RD_UT_ASSERT(strcmp(unit_test_data.metric_name, + "producer.connection.creation.rate") == 0, + "Metric name mismatch"); + RD_UT_ASSERT( + strcmp(unit_test_data.metric_description, + "The rate of connections established per second.") == 0, + "Metric description mismatch"); + RD_UT_ASSERT(strcmp(unit_test_data.metric_unit, "1") == 0, + "Metric unit mismatch"); + RD_UT_ASSERT(unit_test_data.metric_value == 1, "Metric value mismatch"); + RD_UT_ASSERT(unit_test_data.metric_time != 0, "Metric time mismatch"); + + rd_free(rk->rk_telemetry.matched_metrics); rd_free(metrics_payload); rd_free(rkb); rd_free(rk); RD_UT_PASS(); } -bool ut_test_sum() { +bool unit_test_telemetry_sum() { rd_kafka_t *rk = rd_calloc(1, sizeof(*rk)); rk->rk_type = RD_KAFKA_PRODUCER; rk->rk_telemetry.matched_metrics_cnt = 1; @@ -245,21 +333,34 @@ bool ut_test_sum() { rkb->rkb_c.connects.val = 1; TAILQ_INSERT_HEAD(&rk->rk_brokers, rkb, rkb_link); - size_t metrics_payload_size = 0, metrics_payload_size_expected = 164; + size_t metrics_payload_size = 0; + clear_unit_test_data(); void *metrics_payload = rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size); RD_UT_SAY("metrics_payload_size: %zu", metrics_payload_size); - RD_UT_ASSERT( - metrics_payload_size == metrics_payload_size_expected, - "Metrics payload size mismatch. Expected: %zu, Actual: %zu", - metrics_payload_size_expected, metrics_payload_size); + RD_UT_ASSERT(metrics_payload_size != 0, "Metrics payload zero"); bool decode_status = rd_kafka_telemetry_decode_metrics( - metrics_payload, metrics_payload_size); + metrics_payload, metrics_payload_size, rd_true); RD_UT_ASSERT(decode_status == 1, "Decoding failed"); + RD_UT_ASSERT(unit_test_data.type == RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM, + "Metric type mismatch"); + RD_UT_ASSERT(strcmp(unit_test_data.metric_name, + "producer.connection.creation.total") == 0, + "Metric name mismatch"); + RD_UT_ASSERT(strcmp(unit_test_data.metric_description, + "The total number of connections established.") == + 0, + "Metric description mismatch"); + RD_UT_ASSERT(strcmp(unit_test_data.metric_unit, "1") == 0, + "Metric unit mismatch"); + RD_UT_ASSERT(unit_test_data.metric_value == 1, "Metric value mismatch"); + RD_UT_ASSERT(unit_test_data.metric_time != 0, "Metric time mismatch"); + + rd_free(rk->rk_telemetry.matched_metrics); rd_free(metrics_payload); rd_free(rkb); rd_free(rk); @@ -268,7 +369,7 @@ bool ut_test_sum() { int unittest_telemetry_decode(void) { int fails = 0; - fails += ut_telemetry_gauge(); - fails += ut_test_sum(); + fails += unit_test_telemetry_gauge(); + fails += unit_test_telemetry_sum(); return fails; -} \ No newline at end of file +} diff --git a/src/rdkafka_telemetry_decode.h b/src/rdkafka_telemetry_decode.h index a92c691c55..5ac9a30ff1 100644 --- a/src/rdkafka_telemetry_decode.h +++ b/src/rdkafka_telemetry_decode.h @@ -29,6 +29,8 @@ #ifndef _RDKAFKA_RDKAFKA_TELEMETRY_DECODE_H #define _RDKAFKA_RDKAFKA_TELEMETRY_DECODE_H -int rd_kafka_telemetry_decode_metrics(void *buffer, size_t size); +int rd_kafka_telemetry_decode_metrics(void *buffer, + size_t size, + rd_bool_t is_unit_test); #endif /* _RDKAFKA_RDKAFKA_TELEMETRY_DECODE_H */ From 243a4c423cd68ad380e172ee5102fb5a52d42884 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Thu, 28 Sep 2023 16:34:19 +0530 Subject: [PATCH 4/4] PR Feedback --- src/rdkafka_telemetry_decode.c | 85 ++++++++++------------------------ 1 file changed, 25 insertions(+), 60 deletions(-) diff --git a/src/rdkafka_telemetry_decode.c b/src/rdkafka_telemetry_decode.c index acba5b6770..614af0d79c 100644 --- a/src/rdkafka_telemetry_decode.c +++ b/src/rdkafka_telemetry_decode.c @@ -47,7 +47,7 @@ struct metric_unit_test_data { uint64_t metric_time; }; -struct metric_unit_test_data unit_test_data; +static struct metric_unit_test_data unit_test_data; bool (*decode_and_print_metric_ptr)(pb_istream_t *stream, const pb_field_t *field, @@ -257,7 +257,7 @@ int rd_kafka_telemetry_decode_metrics(void *buffer, return status; } -void clear_unit_test_data() { +static void clear_unit_test_data(void) { unit_test_data.type = RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE; unit_test_data.metric_name[0] = '\0'; unit_test_data.metric_description[0] = '\0'; @@ -266,15 +266,17 @@ void clear_unit_test_data() { unit_test_data.metric_time = 0; } -bool unit_test_telemetry_gauge() { +bool unit_test_telemetry(rd_kafka_telemetry_metric_name_t metric_name, + const char *expected_name, + const char *expected_description, + rd_kafka_telemetry_metric_type_t expected_type) { rd_kafka_t *rk = rd_calloc(1, sizeof(*rk)); rk->rk_type = RD_KAFKA_PRODUCER; rk->rk_telemetry.matched_metrics_cnt = 1; rk->rk_telemetry.matched_metrics = rd_malloc(sizeof(rd_kafka_telemetry_metric_name_t) * rk->rk_telemetry.matched_metrics_cnt); - rk->rk_telemetry.matched_metrics[0] = - RD_KAFKA_TELEMETRY_METRIC_CONNECTION_CREATION_RATE; + rk->rk_telemetry.matched_metrics[0] = metric_name; rd_strlcpy(rk->rk_name, "unittest", sizeof(rk->rk_name)); TAILQ_INIT(&rk->rk_brokers); @@ -295,16 +297,13 @@ bool unit_test_telemetry_gauge() { metrics_payload, metrics_payload_size, rd_true); RD_UT_ASSERT(decode_status == 1, "Decoding failed"); - RD_UT_ASSERT(unit_test_data.type == - RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE, + RD_UT_ASSERT(unit_test_data.type == expected_type, "Metric type mismatch"); - RD_UT_ASSERT(strcmp(unit_test_data.metric_name, - "producer.connection.creation.rate") == 0, + RD_UT_ASSERT(strcmp(unit_test_data.metric_name, expected_name) == 0, "Metric name mismatch"); - RD_UT_ASSERT( - strcmp(unit_test_data.metric_description, - "The rate of connections established per second.") == 0, - "Metric description mismatch"); + RD_UT_ASSERT(strcmp(unit_test_data.metric_description, + expected_description) == 0, + "Metric description mismatch"); RD_UT_ASSERT(strcmp(unit_test_data.metric_unit, "1") == 0, "Metric unit mismatch"); RD_UT_ASSERT(unit_test_data.metric_value == 1, "Metric value mismatch"); @@ -317,54 +316,20 @@ bool unit_test_telemetry_gauge() { RD_UT_PASS(); } -bool unit_test_telemetry_sum() { - rd_kafka_t *rk = rd_calloc(1, sizeof(*rk)); - rk->rk_type = RD_KAFKA_PRODUCER; - rk->rk_telemetry.matched_metrics_cnt = 1; - rk->rk_telemetry.matched_metrics = - rd_malloc(sizeof(rd_kafka_telemetry_metric_name_t) * - rk->rk_telemetry.matched_metrics_cnt); - rk->rk_telemetry.matched_metrics[0] = - RD_KAFKA_TELEMETRY_METRIC_CONNECTION_CREATION_TOTAL; - rd_strlcpy(rk->rk_name, "unittest", sizeof(rk->rk_name)); - TAILQ_INIT(&rk->rk_brokers); - - rd_kafka_broker_t *rkb = rd_calloc(1, sizeof(*rkb)); - rkb->rkb_c.connects.val = 1; - TAILQ_INSERT_HEAD(&rk->rk_brokers, rkb, rkb_link); - - size_t metrics_payload_size = 0; - clear_unit_test_data(); - - void *metrics_payload = - rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size); - RD_UT_SAY("metrics_payload_size: %zu", metrics_payload_size); - - RD_UT_ASSERT(metrics_payload_size != 0, "Metrics payload zero"); - - bool decode_status = rd_kafka_telemetry_decode_metrics( - metrics_payload, metrics_payload_size, rd_true); - - RD_UT_ASSERT(decode_status == 1, "Decoding failed"); - RD_UT_ASSERT(unit_test_data.type == RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM, - "Metric type mismatch"); - RD_UT_ASSERT(strcmp(unit_test_data.metric_name, - "producer.connection.creation.total") == 0, - "Metric name mismatch"); - RD_UT_ASSERT(strcmp(unit_test_data.metric_description, - "The total number of connections established.") == - 0, - "Metric description mismatch"); - RD_UT_ASSERT(strcmp(unit_test_data.metric_unit, "1") == 0, - "Metric unit mismatch"); - RD_UT_ASSERT(unit_test_data.metric_value == 1, "Metric value mismatch"); - RD_UT_ASSERT(unit_test_data.metric_time != 0, "Metric time mismatch"); +bool unit_test_telemetry_gauge(void) { + return unit_test_telemetry( + RD_KAFKA_TELEMETRY_METRIC_CONNECTION_CREATION_RATE, + "producer.connection.creation.rate", + "The rate of connections established per second.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE); +} - rd_free(rk->rk_telemetry.matched_metrics); - rd_free(metrics_payload); - rd_free(rkb); - rd_free(rk); - RD_UT_PASS(); +bool unit_test_telemetry_sum(void) { + return unit_test_telemetry( + RD_KAFKA_TELEMETRY_METRIC_CONNECTION_CREATION_TOTAL, + "producer.connection.creation.total", + "The total number of connections established.", + RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM); } int unittest_telemetry_decode(void) {