Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unit tests push telemetry encode decode #4440

Merged
merged 4 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -2196,7 +2196,8 @@ static int rd_kafka_mock_handle_PushTelemetry(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_read_i8(rkbuf, &compression_type);
rd_kafka_buf_read_kbytes(rkbuf, &metrics);

rd_kafka_telemetry_decode_metrics((void *)metrics.data, metrics.len);
rd_kafka_telemetry_decode_metrics((void *)metrics.data, metrics.len,
rd_false);

// ThrottleTime
rd_kafka_buf_write_i32(resp, 0);
Expand Down
193 changes: 190 additions & 3 deletions src/rdkafka_telemetry_decode.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,27 @@
#include "nanopb/pb_encode.h"
#include "nanopb/pb_decode.h"
#include "opentelemetry/metrics.pb.h"
#include "rdkafka_int.h"
#include "rdkafka_telemetry_encode.h"
#include "rdunittest.h"

#define _NANOPB_STRING_DECODE_MAX_BUFFER_SIZE 1024

struct metric_unit_test_data {
rd_kafka_telemetry_metric_type_t type;
char metric_name[_NANOPB_STRING_DECODE_MAX_BUFFER_SIZE];
char metric_description[_NANOPB_STRING_DECODE_MAX_BUFFER_SIZE];
char metric_unit[_NANOPB_STRING_DECODE_MAX_BUFFER_SIZE];
int64_t metric_value;
uint64_t metric_time;
};

struct metric_unit_test_data unit_test_data;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we use this global variable only in this file, declare it static.


bool (*decode_and_print_metric_ptr)(pb_istream_t *stream,
const pb_field_t *field,
void **arg) = NULL;

static bool decode_and_print_string(pb_istream_t *stream,
const pb_field_t *field,
void **arg) {
Expand All @@ -50,6 +68,10 @@ static bool decode_and_print_string(pb_istream_t *stream,
return false;
}
fprintf(stderr, "String: %s\n", buffer);
if (arg != NULL && *arg != NULL) {
rd_strlcpy(*arg, (char *)buffer,
_NANOPB_STRING_DECODE_MAX_BUFFER_SIZE);
}

return true;
}
Expand Down Expand Up @@ -86,7 +108,13 @@ static bool decode_and_print_number_data_point(pb_istream_t *stream,
return false;
}

fprintf(stderr, "NumberDataPoint value: %lld time: %llu\n",
if (arg != NULL && *arg != NULL) {
struct metric_unit_test_data *test_data = *arg;
test_data->metric_value = data_point.value.as_int;
test_data->metric_time = data_point.time_unix_nano;
}

fprintf(stderr, "NumberDataPoint value: %ld time: %lu\n",
data_point.value.as_int, data_point.time_unix_nano);
return true;
}
Expand All @@ -98,11 +126,21 @@ data_msg_callback(pb_istream_t *stream, const pb_field_t *field, void **arg) {
opentelemetry_proto_metrics_v1_Sum *sum = field->pData;
sum->data_points.funcs.decode =
&decode_and_print_number_data_point;
if (arg != NULL && *arg != NULL) {
sum->data_points.arg = &unit_test_data;
struct metric_unit_test_data *data = *arg;
data->type = RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM;
}
} else if (field->tag ==
opentelemetry_proto_metrics_v1_Metric_gauge_tag) {
opentelemetry_proto_metrics_v1_Gauge *gauge = field->pData;
gauge->data_points.funcs.decode =
&decode_and_print_number_data_point;
if (arg != NULL && *arg != NULL) {
gauge->data_points.arg = &unit_test_data;
struct metric_unit_test_data *data = *arg;
data->type = RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE;
}
}
return true;
}
Expand All @@ -128,14 +166,39 @@ static bool decode_and_print_metric(pb_istream_t *stream,
return true;
}

static bool decode_and_print_metric_unittest(pb_istream_t *stream,
const pb_field_t *field,
void **arg) {
opentelemetry_proto_metrics_v1_Metric metric =
opentelemetry_proto_metrics_v1_Metric_init_zero;
metric.name.funcs.decode = &decode_and_print_string;
metric.name.arg = &unit_test_data.metric_name;
metric.description.funcs.decode = &decode_and_print_string;
metric.description.arg = &unit_test_data.metric_description;
metric.unit.funcs.decode = &decode_and_print_string;
metric.unit.arg = &unit_test_data.metric_unit;
metric.cb_data.funcs.decode = &data_msg_callback;
metric.cb_data.arg = &unit_test_data;

if (!pb_decode(stream, opentelemetry_proto_metrics_v1_Metric_fields,
&metric)) {
fprintf(stderr, "Failed to decode Metric: %s\n",
PB_GET_ERROR(stream));
return false;
}

return true;
}


static bool decode_and_print_scope_metrics(pb_istream_t *stream,
const pb_field_t *field,
void **arg) {
opentelemetry_proto_metrics_v1_ScopeMetrics scope_metrics =
opentelemetry_proto_metrics_v1_ScopeMetrics_init_zero;
scope_metrics.scope.name.funcs.decode = &decode_and_print_string;
scope_metrics.scope.version.funcs.decode = &decode_and_print_string;
scope_metrics.metrics.funcs.decode = &decode_and_print_metric;
scope_metrics.metrics.funcs.decode = decode_and_print_metric_ptr;
if (!pb_decode(stream,
opentelemetry_proto_metrics_v1_ScopeMetrics_fields,
&scope_metrics)) {
Expand Down Expand Up @@ -170,13 +233,19 @@ static bool decode_and_print_resource_metrics(pb_istream_t *stream,
* opentelemetry_proto_metrics_v1_MetricsData datatype. Used for testing and
* debugging.
*/
void rd_kafka_telemetry_decode_metrics(void *buffer, size_t size) {
int rd_kafka_telemetry_decode_metrics(void *buffer,
size_t size,
rd_bool_t is_unit_test) {
opentelemetry_proto_metrics_v1_MetricsData metricsData =
opentelemetry_proto_metrics_v1_MetricsData_init_zero;

pb_istream_t stream = pb_istream_from_buffer(buffer, size);
metricsData.resource_metrics.funcs.decode =
&decode_and_print_resource_metrics;
if (is_unit_test)
decode_and_print_metric_ptr = &decode_and_print_metric_unittest;
else
decode_and_print_metric_ptr = &decode_and_print_metric;

bool status = pb_decode(
&stream, opentelemetry_proto_metrics_v1_MetricsData_fields,
Expand All @@ -185,4 +254,122 @@ void rd_kafka_telemetry_decode_metrics(void *buffer, size_t size) {
fprintf(stderr, "Failed to decode MetricsData: %s\n",
PB_GET_ERROR(&stream));
}
return status;
}

void clear_unit_test_data() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mark these methods 'static' , everything that's used as a helper function for testing, or non-top level test functions, it's best to make static

unit_test_data.type = RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE;
unit_test_data.metric_name[0] = '\0';
unit_test_data.metric_description[0] = '\0';
unit_test_data.metric_unit[0] = '\0';
unit_test_data.metric_value = 0;
unit_test_data.metric_time = 0;
}

bool unit_test_telemetry_gauge() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bool unit_test_telemetry_gauge() {
bool unit_test_telemetry_gauge(void) {

minor

rd_kafka_t *rk = rd_calloc(1, sizeof(*rk));
rk->rk_type = RD_KAFKA_PRODUCER;
rk->rk_telemetry.matched_metrics_cnt = 1;
rk->rk_telemetry.matched_metrics =
rd_malloc(sizeof(rd_kafka_telemetry_metric_name_t) *
rk->rk_telemetry.matched_metrics_cnt);
rk->rk_telemetry.matched_metrics[0] =
RD_KAFKA_TELEMETRY_METRIC_CONNECTION_CREATION_RATE;
rd_strlcpy(rk->rk_name, "unittest", sizeof(rk->rk_name));
TAILQ_INIT(&rk->rk_brokers);

rd_kafka_broker_t *rkb = rd_calloc(1, sizeof(*rkb));
rkb->rkb_c.connects.val = 1;
TAILQ_INSERT_HEAD(&rk->rk_brokers, rkb, rkb_link);

size_t metrics_payload_size = 0;
clear_unit_test_data();

void *metrics_payload =
rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size);
RD_UT_SAY("metrics_payload_size: %zu", metrics_payload_size);

RD_UT_ASSERT(metrics_payload_size != 0, "Metrics payload zero");

bool decode_status = rd_kafka_telemetry_decode_metrics(
metrics_payload, metrics_payload_size, rd_true);

RD_UT_ASSERT(decode_status == 1, "Decoding failed");
RD_UT_ASSERT(unit_test_data.type ==
RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE,
"Metric type mismatch");
RD_UT_ASSERT(strcmp(unit_test_data.metric_name,
"producer.connection.creation.rate") == 0,
"Metric name mismatch");
RD_UT_ASSERT(
strcmp(unit_test_data.metric_description,
"The rate of connections established per second.") == 0,
"Metric description mismatch");
RD_UT_ASSERT(strcmp(unit_test_data.metric_unit, "1") == 0,
"Metric unit mismatch");
RD_UT_ASSERT(unit_test_data.metric_value == 1, "Metric value mismatch");
RD_UT_ASSERT(unit_test_data.metric_time != 0, "Metric time mismatch");

rd_free(rk->rk_telemetry.matched_metrics);
rd_free(metrics_payload);
rd_free(rkb);
rd_free(rk);
RD_UT_PASS();
}

bool unit_test_telemetry_sum() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bool unit_test_telemetry_sum() {
bool unit_test_telemetry_sum(void) {

minor

rd_kafka_t *rk = rd_calloc(1, sizeof(*rk));
rk->rk_type = RD_KAFKA_PRODUCER;
rk->rk_telemetry.matched_metrics_cnt = 1;
rk->rk_telemetry.matched_metrics =
rd_malloc(sizeof(rd_kafka_telemetry_metric_name_t) *
rk->rk_telemetry.matched_metrics_cnt);
rk->rk_telemetry.matched_metrics[0] =
RD_KAFKA_TELEMETRY_METRIC_CONNECTION_CREATION_TOTAL;
rd_strlcpy(rk->rk_name, "unittest", sizeof(rk->rk_name));
TAILQ_INIT(&rk->rk_brokers);

rd_kafka_broker_t *rkb = rd_calloc(1, sizeof(*rkb));
rkb->rkb_c.connects.val = 1;
TAILQ_INSERT_HEAD(&rk->rk_brokers, rkb, rkb_link);

size_t metrics_payload_size = 0;
clear_unit_test_data();

void *metrics_payload =
rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size);
RD_UT_SAY("metrics_payload_size: %zu", metrics_payload_size);

RD_UT_ASSERT(metrics_payload_size != 0, "Metrics payload zero");

bool decode_status = rd_kafka_telemetry_decode_metrics(
metrics_payload, metrics_payload_size, rd_true);

RD_UT_ASSERT(decode_status == 1, "Decoding failed");
RD_UT_ASSERT(unit_test_data.type == RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM,
"Metric type mismatch");
RD_UT_ASSERT(strcmp(unit_test_data.metric_name,
"producer.connection.creation.total") == 0,
"Metric name mismatch");
RD_UT_ASSERT(strcmp(unit_test_data.metric_description,
"The total number of connections established.") ==
0,
"Metric description mismatch");
RD_UT_ASSERT(strcmp(unit_test_data.metric_unit, "1") == 0,
"Metric unit mismatch");
RD_UT_ASSERT(unit_test_data.metric_value == 1, "Metric value mismatch");
RD_UT_ASSERT(unit_test_data.metric_time != 0, "Metric time mismatch");

rd_free(rk->rk_telemetry.matched_metrics);
rd_free(metrics_payload);
rd_free(rkb);
rd_free(rk);
RD_UT_PASS();
}

int unittest_telemetry_decode(void) {
int fails = 0;
fails += unit_test_telemetry_gauge();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both these tests can be turned into one function with a few params, they just differ at one or two places and the rest is the same

fails += unit_test_telemetry_sum();
return fails;
}
4 changes: 3 additions & 1 deletion src/rdkafka_telemetry_decode.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#ifndef _RDKAFKA_RDKAFKA_TELEMETRY_DECODE_H
#define _RDKAFKA_RDKAFKA_TELEMETRY_DECODE_H

void rd_kafka_telemetry_decode_metrics(void *buffer, size_t size);
int rd_kafka_telemetry_decode_metrics(void *buffer,
size_t size,
rd_bool_t is_unit_test);

#endif /* _RDKAFKA_RDKAFKA_TELEMETRY_DECODE_H */
2 changes: 1 addition & 1 deletion src/rdkafka_telemetry_encode.c
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ void *rd_kafka_telemetry_encode_metrics(rd_kafka_t *rk, size_t *size) {
.doubleValue;
}

data_points[i]->time_unix_nano = now_ns;
data_points[i]->time_unix_nano = now_ns;
data_points[i]->start_time_unix_nano = now_ns;

// TODO: Add data point attributes as needed
Expand Down
2 changes: 2 additions & 0 deletions src/rdunittest.c
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ extern int unittest_http(void);
#if WITH_OAUTHBEARER_OIDC
extern int unittest_sasl_oauthbearer_oidc(void);
#endif
extern int unittest_telemetry_decode(void);

int rd_unittest(void) {
int fails = 0;
Expand Down Expand Up @@ -466,6 +467,7 @@ int rd_unittest(void) {
#if WITH_OAUTHBEARER_OIDC
{"sasl_oauthbearer_oidc", unittest_sasl_oauthbearer_oidc},
#endif
{"telemetry", unittest_telemetry_decode},
{NULL}
};
int i;
Expand Down