Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix an assert being triggered when no metrics matched on the client side during send push telemetry call #4825

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# librdkafka v2.5.1

librdkafka v2.5.1 is a feature release.

* Fix an assert being triggered when no metrics matched on the client side during send push telemetry call.


# librdkafka v2.5.0

librdkafka v2.5.0 is a feature release.
Expand Down
33 changes: 18 additions & 15 deletions src/rdkafka_telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,20 +343,22 @@ static void rd_kafka_send_push_telemetry(rd_kafka_t *rk,
rd_bool_t terminating) {

rd_buf_t *metrics_payload = rd_kafka_telemetry_encode_metrics(rk);
size_t compressed_metrics_payload_size = 0;
void *compressed_metrics_payload = NULL;
rd_kafka_compression_t compression_used =
rd_kafka_push_telemetry_payload_compress(
rk, rkb, metrics_payload, &compressed_metrics_payload,
&compressed_metrics_payload_size);
if (compressed_metrics_payload_size >
(size_t)rk->rk_telemetry.telemetry_max_bytes) {
rd_kafka_log(rk, LOG_WARNING, "TELEMETRY",
"Metrics payload size %" PRIusz
" exceeds telemetry_max_bytes %" PRId32
"specified by the broker.",
compressed_metrics_payload_size,
rk->rk_telemetry.telemetry_max_bytes);
size_t compressed_metrics_payload_size = 0;
void *compressed_metrics_payload = NULL;
rd_kafka_compression_t compression_used = RD_KAFKA_COMPRESSION_NONE;
if (metrics_payload) {
compression_used = rd_kafka_push_telemetry_payload_compress(
rk, rkb, metrics_payload, &compressed_metrics_payload,
&compressed_metrics_payload_size);
if (compressed_metrics_payload_size >
(size_t)rk->rk_telemetry.telemetry_max_bytes) {
rd_kafka_log(rk, LOG_WARNING, "TELEMETRY",
"Metrics payload size %" PRIusz
" exceeds telemetry_max_bytes %" PRId32
"specified by the broker.",
compressed_metrics_payload_size,
rk->rk_telemetry.telemetry_max_bytes);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can also add a debug log mentioning we're sending an empty metrics blob


rd_kafka_dbg(rk, TELEMETRY, "PUSH",
Expand All @@ -369,7 +371,8 @@ static void rd_kafka_send_push_telemetry(rd_kafka_t *rk,
0, RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_handle_PushTelemetry,
NULL);

rd_buf_destroy_free(metrics_payload);
if (metrics_payload)
rd_buf_destroy_free(metrics_payload);
if (compression_used != RD_KAFKA_COMPRESSION_NONE)
rd_free(compressed_metrics_payload);

Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_telemetry_encode.c
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,10 @@ rd_buf_t *rd_kafka_telemetry_encode_metrics(rd_kafka_t *rk) {
RD_KAFKA_TELEMETRY_METRIC_INFO(rk);
size_t total_metrics_count = metrics_to_encode_count;
size_t i, metric_idx = 0;

if (!metrics_to_encode_count)
return NULL;

opentelemetry_proto_metrics_v1_MetricsData metrics_data =
opentelemetry_proto_metrics_v1_MetricsData_init_zero;

Expand Down
10 changes: 7 additions & 3 deletions tests/0150-telemetry_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ void do_test_telemetry_get_subscription_push_telemetry(void) {
* resent after the push interval until there are subscriptions.
* See `requests_expected` for detailed expected flow.
*/
void do_test_telemetry_empty_subscriptions_list(void) {
void do_test_telemetry_empty_subscriptions_list(char *subscription_regex) {
rd_kafka_conf_t *conf;
const char *bootstraps;
rd_kafka_mock_cluster_t *mcluster;
char *expected_metrics[] = {"*"};
char *expected_metrics[] = {subscription_regex};
rd_kafka_t *producer = NULL;
rd_kafka_mock_request_t **requests = NULL;
size_t request_cnt;
Expand Down Expand Up @@ -534,7 +534,11 @@ int main_0150_telemetry_mock(int argc, char **argv) {

do_test_telemetry_get_subscription_push_telemetry();

do_test_telemetry_empty_subscriptions_list();
// All metrics are subscribed
do_test_telemetry_empty_subscriptions_list("*");

// No metrics are subscribed
do_test_telemetry_empty_subscriptions_list("non-existent-metric");

do_test_telemetry_terminating_push();

Expand Down