Skip to content

Commit

Permalink
Fix an assert being triggered when no metrics matched on the client s…
Browse files Browse the repository at this point in the history
…ide during send push telemetry call (#4826)
  • Loading branch information
pranavrth authored Aug 30, 2024
1 parent d72576a commit 9416dd8
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 22 deletions.
29 changes: 29 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
# librdkafka v2.5.3

librdkafka v2.5.3 is a feature release.

* Fix an assert being triggered during push telemetry call when no metrics matched on the client side. (#4826)

## Fixes

### Telemetry fixes

* Issue: #4833
Fix a regression introduced with [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) support in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription.
Happening since 2.5.0 (#4826).

*Note: there were no v2.5.1 and v2.5.2 librdkafka releases*


# librdkafka v2.5.0

> [!WARNING]
This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription.
>
> You won't face any problem if:
> * Broker doesn't support [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability).
> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the broker side.
> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the client side. This is enabled by default. Set configuration `enable.metrics.push` to `false`.
> * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side and there is no subscription configured there.
> * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side with subscriptions that match the [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) metrics defined on the client.
>
> Having said this, we strongly recommend using `v2.5.3` and above to not face this regression at all.
librdkafka v2.5.0 is a feature release.

* [KIP-951](https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client)
Expand Down
2 changes: 1 addition & 1 deletion src-cpp/rdkafkacpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ namespace RdKafka {
* @remark This value should only be used during compile time,
* for runtime checks of version use RdKafka::version()
*/
#define RD_KAFKA_VERSION 0x020500ff
#define RD_KAFKA_VERSION 0x020503ff

/**
* @brief Returns the librdkafka version as integer.
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ typedef SSIZE_T ssize_t;
* @remark This value should only be used during compile time,
* for runtime checks of version use rd_kafka_version()
*/
#define RD_KAFKA_VERSION 0x020500ff
#define RD_KAFKA_VERSION 0x020503ff

/**
* @brief Returns the librdkafka version as integer.
Expand Down
36 changes: 21 additions & 15 deletions src/rdkafka_telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,20 +343,25 @@ static void rd_kafka_send_push_telemetry(rd_kafka_t *rk,
rd_bool_t terminating) {

rd_buf_t *metrics_payload = rd_kafka_telemetry_encode_metrics(rk);
size_t compressed_metrics_payload_size = 0;
void *compressed_metrics_payload = NULL;
rd_kafka_compression_t compression_used =
rd_kafka_push_telemetry_payload_compress(
rk, rkb, metrics_payload, &compressed_metrics_payload,
&compressed_metrics_payload_size);
if (compressed_metrics_payload_size >
(size_t)rk->rk_telemetry.telemetry_max_bytes) {
rd_kafka_log(rk, LOG_WARNING, "TELEMETRY",
"Metrics payload size %" PRIusz
" exceeds telemetry_max_bytes %" PRId32
"specified by the broker.",
compressed_metrics_payload_size,
rk->rk_telemetry.telemetry_max_bytes);
size_t compressed_metrics_payload_size = 0;
void *compressed_metrics_payload = NULL;
rd_kafka_compression_t compression_used = RD_KAFKA_COMPRESSION_NONE;
if (metrics_payload) {
compression_used = rd_kafka_push_telemetry_payload_compress(
rk, rkb, metrics_payload, &compressed_metrics_payload,
&compressed_metrics_payload_size);
if (compressed_metrics_payload_size >
(size_t)rk->rk_telemetry.telemetry_max_bytes) {
rd_kafka_log(rk, LOG_WARNING, "TELEMETRY",
"Metrics payload size %" PRIusz
" exceeds telemetry_max_bytes %" PRId32
"specified by the broker.",
compressed_metrics_payload_size,
rk->rk_telemetry.telemetry_max_bytes);
}
} else {
rd_kafka_dbg(rk, TELEMETRY, "PUSH",
"No metrics to push. Sending empty payload.");
}

rd_kafka_dbg(rk, TELEMETRY, "PUSH",
Expand All @@ -369,7 +374,8 @@ static void rd_kafka_send_push_telemetry(rd_kafka_t *rk,
0, RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_handle_PushTelemetry,
NULL);

rd_buf_destroy_free(metrics_payload);
if (metrics_payload)
rd_buf_destroy_free(metrics_payload);
if (compression_used != RD_KAFKA_COMPRESSION_NONE)
rd_free(compressed_metrics_payload);

Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_telemetry_encode.c
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,10 @@ rd_buf_t *rd_kafka_telemetry_encode_metrics(rd_kafka_t *rk) {
RD_KAFKA_TELEMETRY_METRIC_INFO(rk);
size_t total_metrics_count = metrics_to_encode_count;
size_t i, metric_idx = 0;

if (!metrics_to_encode_count)
return NULL;

opentelemetry_proto_metrics_v1_MetricsData metrics_data =
opentelemetry_proto_metrics_v1_MetricsData_init_zero;

Expand Down
12 changes: 8 additions & 4 deletions tests/0150-telemetry_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ void do_test_telemetry_get_subscription_push_telemetry(void) {
* resent after the push interval until there are subscriptions.
* See `requests_expected` for detailed expected flow.
*/
void do_test_telemetry_empty_subscriptions_list(void) {
void do_test_telemetry_empty_subscriptions_list(char *subscription_regex) {
rd_kafka_conf_t *conf;
const char *bootstraps;
rd_kafka_mock_cluster_t *mcluster;
char *expected_metrics[] = {"*"};
char *expected_metrics[] = {subscription_regex};
rd_kafka_t *producer = NULL;
rd_kafka_mock_request_t **requests = NULL;
size_t request_cnt;
Expand Down Expand Up @@ -234,7 +234,7 @@ void do_test_telemetry_empty_subscriptions_list(void) {
};


SUB_TEST();
SUB_TEST("Test with subscription regex: %s", subscription_regex);

mcluster = test_mock_cluster_new(1, &bootstraps);
rd_kafka_mock_telemetry_set_requested_metrics(mcluster, NULL, 0);
Expand Down Expand Up @@ -534,7 +534,11 @@ int main_0150_telemetry_mock(int argc, char **argv) {

do_test_telemetry_get_subscription_push_telemetry();

do_test_telemetry_empty_subscriptions_list();
// All metrics are subscribed
do_test_telemetry_empty_subscriptions_list("*");

// No metrics are subscribed
do_test_telemetry_empty_subscriptions_list("non-existent-metric");

do_test_telemetry_terminating_push();

Expand Down
2 changes: 1 addition & 1 deletion vcpkg.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "librdkafka",
"version": "2.5.0",
"version": "2.5.3",
"dependencies": [
{
"name": "zstd",
Expand Down

0 comments on commit 9416dd8

Please sign in to comment.