From 69c29d018e3921bad13dc1af2c4c0f197a266af6 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Thu, 22 Aug 2024 15:51:35 +0530 Subject: [PATCH] Fix an assert being triggered when no metrics matched on the client side during send push telemetry call --- CHANGELOG.md | 7 +++++++ src/rdkafka_telemetry.c | 33 ++++++++++++++++++--------------- src/rdkafka_telemetry_encode.c | 4 ++++ tests/0150-telemetry_mock.c | 10 +++++++--- 4 files changed, 36 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 68142d0d3c..b3d21e6d40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +# librdkafka v2.5.1 + +librdkafka v2.5.1 is a feature release. + +* Fix an assert being triggered when no metrics matched on the client side during send push telemetry call. + + # librdkafka v2.5.0 librdkafka v2.5.0 is a feature release. diff --git a/src/rdkafka_telemetry.c b/src/rdkafka_telemetry.c index 3f2fece177..f794e87509 100644 --- a/src/rdkafka_telemetry.c +++ b/src/rdkafka_telemetry.c @@ -343,20 +343,22 @@ static void rd_kafka_send_push_telemetry(rd_kafka_t *rk, rd_bool_t terminating) { rd_buf_t *metrics_payload = rd_kafka_telemetry_encode_metrics(rk); - size_t compressed_metrics_payload_size = 0; - void *compressed_metrics_payload = NULL; - rd_kafka_compression_t compression_used = - rd_kafka_push_telemetry_payload_compress( - rk, rkb, metrics_payload, &compressed_metrics_payload, - &compressed_metrics_payload_size); - if (compressed_metrics_payload_size > - (size_t)rk->rk_telemetry.telemetry_max_bytes) { - rd_kafka_log(rk, LOG_WARNING, "TELEMETRY", - "Metrics payload size %" PRIusz - " exceeds telemetry_max_bytes %" PRId32 - "specified by the broker.", - compressed_metrics_payload_size, - rk->rk_telemetry.telemetry_max_bytes); + size_t compressed_metrics_payload_size = 0; + void *compressed_metrics_payload = NULL; + rd_kafka_compression_t compression_used = RD_KAFKA_COMPRESSION_NONE; + if (metrics_payload) { + compression_used = rd_kafka_push_telemetry_payload_compress( + rk, rkb, metrics_payload, &compressed_metrics_payload, + &compressed_metrics_payload_size); + if (compressed_metrics_payload_size > + (size_t)rk->rk_telemetry.telemetry_max_bytes) { + rd_kafka_log(rk, LOG_WARNING, "TELEMETRY", + "Metrics payload size %" PRIusz + " exceeds telemetry_max_bytes %" PRId32 + "specified by the broker.", + compressed_metrics_payload_size, + rk->rk_telemetry.telemetry_max_bytes); + } } rd_kafka_dbg(rk, TELEMETRY, "PUSH", @@ -369,7 +371,8 @@ static void rd_kafka_send_push_telemetry(rd_kafka_t *rk, 0, RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_handle_PushTelemetry, NULL); - rd_buf_destroy_free(metrics_payload); + if (metrics_payload) + rd_buf_destroy_free(metrics_payload); if (compression_used != RD_KAFKA_COMPRESSION_NONE) rd_free(compressed_metrics_payload); diff --git a/src/rdkafka_telemetry_encode.c b/src/rdkafka_telemetry_encode.c index 5e5a5a3dc1..05a27562e1 100644 --- a/src/rdkafka_telemetry_encode.c +++ b/src/rdkafka_telemetry_encode.c @@ -609,6 +609,10 @@ rd_buf_t *rd_kafka_telemetry_encode_metrics(rd_kafka_t *rk) { RD_KAFKA_TELEMETRY_METRIC_INFO(rk); size_t total_metrics_count = metrics_to_encode_count; size_t i, metric_idx = 0; + + if (!metrics_to_encode_count) + return NULL; + opentelemetry_proto_metrics_v1_MetricsData metrics_data = opentelemetry_proto_metrics_v1_MetricsData_init_zero; diff --git a/tests/0150-telemetry_mock.c b/tests/0150-telemetry_mock.c index 52fb76032f..040054cc6c 100644 --- a/tests/0150-telemetry_mock.c +++ b/tests/0150-telemetry_mock.c @@ -202,11 +202,11 @@ void do_test_telemetry_get_subscription_push_telemetry(void) { * resent after the push interval until there are subscriptions. * See `requests_expected` for detailed expected flow. */ -void do_test_telemetry_empty_subscriptions_list(void) { +void do_test_telemetry_empty_subscriptions_list(char *subscription_regex) { rd_kafka_conf_t *conf; const char *bootstraps; rd_kafka_mock_cluster_t *mcluster; - char *expected_metrics[] = {"*"}; + char *expected_metrics[] = {subscription_regex}; rd_kafka_t *producer = NULL; rd_kafka_mock_request_t **requests = NULL; size_t request_cnt; @@ -534,7 +534,11 @@ int main_0150_telemetry_mock(int argc, char **argv) { do_test_telemetry_get_subscription_push_telemetry(); - do_test_telemetry_empty_subscriptions_list(); + // All metrics are subscribed + do_test_telemetry_empty_subscriptions_list("*"); + + // No metrics are subscribed + do_test_telemetry_empty_subscriptions_list("non-existent-metric"); do_test_telemetry_terminating_push();