KIP 714 New Telemetry Metrics #4808

Status: Open. Wants to merge 3 commits into base: master (changes shown from 1 commit).
18 changes: 15 additions & 3 deletions src/rdkafka.c
@@ -3122,7 +3122,8 @@ static rd_kafka_op_res_t rd_kafka_consume_cb(rd_kafka_t *rk,
}

rkmessage = rd_kafka_message_get(rko);

/* stop the active ts */
/* t2 */
mahajanadhitya marked this conversation as resolved.
rd_kafka_fetch_op_app_prepare(rk, rko);

ctx->consume_cb(rkmessage, ctx->opaque);
@@ -3218,10 +3219,21 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
rd_kafka_app_poll_blocking(rk);

rd_kafka_yield_thread = 0;
rd_ts_t now = rd_clock();
Collaborator: There are many ways the user can call poll; see where rd_kafka_app_poll_blocking is called, as those call sites aren't considered at the moment. You can convert rd_kafka_app_poll_blocking to rd_kafka_app_poll_start with a parameter that says whether it's blocking (corresponding to timeout_ms) and do the calculation there.

Given we're in the hot path, let's reduce system calls: you can get the now value here, pass it to rd_timeout_init0, and then pass it to rd_kafka_app_poll_start too.

Contributor Author: Can you explain this further? Since we are only refactoring, how does changing the order reduce system calls in the hot path?

Collaborator: The system call here is the one that reads the monotonic clock; we want to make it only once to reduce the number of such calls (it's already done in rd_timeout_init).

if (rk->rk_telemetry.ts_fetch_last != -1) {
Collaborator: It can be useful for something other than telemetry, so we can call it rk->rk_ts_last_poll_start. Also, the check should be for non-zero, given it's initialized to zero.

Contributor Author: ok.

rd_ts_t poll_interval = now - rk->rk_telemetry.ts_fetch_last;
rd_ts_t idle_interval = rk->rk_telemetry.ts_fetch_last -
rk->rk_telemetry.ts_fetch_cb_last;
Collaborator:

  1. ts_fetch_cb_last can be called rk->rk_ts_last_poll_end and set in two places: either when we call rd_kafka_app_polled (except on subscribe) or just before the callback, as done below. If it's non-zero it's not replaced in rd_kafka_app_polled.
  2. The idle_interval difference must be the opposite: as we haven't overwritten ts_fetch_last, it's still the last fetch start, not the current one.

Contributor Author: ok.

int64_t poll_idle_ratio =
((double)idle_interval * 1e7) / poll_interval;
Collaborator: Converting to double here adds overhead that isn't necessary, since we then truncate the result. It's enough to multiply idle_interval by 1000000 and divide by poll_interval.

Collaborator: Initialize poll_idle_ratio to 0 and skip the idle_interval calculation and this division if poll_interval is zero.

Contributor Author: Actually this needs a discussion. poll_interval cannot be exactly zero and some slippage is expected, so 1000 seems fine; also, we should not add false data points, so only calculate when poll_interval > 1000. 1e7 -> 1e6, so the calculation uses SIX_ORDERS_MAGNITUDE to get a value between 0 and 1.

Collaborator: It can be zero, as it's in microseconds; it happened to me while testing. Also, 1e6 is a double, so it would cause a conversion we don't need here.
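Pulling these review points together, a standalone model of the corrected ratio math (function and parameter names are illustrative, not the final field names): integer-only scaling, a guard for a zero poll interval, and the idle interval taken in the opposite order, previous poll end minus previous poll start.

```c
#include <stdint.h>

typedef int64_t rd_ts_t;

/* Ratio of the time spent blocked inside the previous poll to the
 * full poll-to-poll interval, scaled to parts-per-million using
 * integer math only. Returns 0 when the poll interval is zero,
 * which is possible at microsecond resolution. */
static int64_t poll_idle_ratio_ppm(rd_ts_t last_poll_start,
                                   rd_ts_t last_poll_end,
                                   rd_ts_t now) {
        rd_ts_t poll_interval = now - last_poll_start;
        if (poll_interval <= 0)
                return 0;
        /* Opposite order to the original diff: previous poll end
         * minus previous poll start = time blocked in poll. */
        rd_ts_t idle_interval = last_poll_end - last_poll_start;
        if (idle_interval < 0)
                idle_interval = 0;
        return idle_interval * 1000000 / poll_interval;
}
```

The multiply-then-divide order keeps the precision without ever touching floating point on this hot path.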

rd_avg_add(
&rk->rk_telemetry.rk_avg_current.rk_avg_poll_idle_ratio,
Collaborator: rk_avg_poll_idle_ratio (rk_avg_current and rk_avg_rollover) needs to be initialized in rd_kafka_new if the type is consumer, and destroyed in rd_kafka_destroy_final under the same condition.

Contributor Author: ok

poll_idle_ratio);
}
rk->rk_telemetry.ts_fetch_last = now;
while ((
rko = rd_kafka_q_pop(rkq, rd_timeout_remains_us(abs_timeout), 0))) {
rd_kafka_op_res_t res;

res =
rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL);

@@ -3889,6 +3901,7 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,

switch ((int)rko->rko_type) {
case RD_KAFKA_OP_FETCH:
rk->rk_telemetry.ts_fetch_cb_last = rd_clock();
Collaborator: Following the comment about rk_ts_last_poll_end, this can be set only in the else (callback) block.

Contributor Author: ok.

Collaborator: This line, left with the previous name, is causing a compilation error.

if (!rk->rk_conf.consume_cb ||
cb_type == RD_KAFKA_Q_CB_RETURN ||
cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
@@ -3897,7 +3910,6 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
struct consume_ctx ctx = {.consume_cb =
rk->rk_conf.consume_cb,
.opaque = rk->rk_conf.opaque};

Collaborator: Remove this change.

Contributor Author: ok.

return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx);
}
break;
11 changes: 10 additions & 1 deletion src/rdkafka_broker.c
@@ -1873,7 +1873,16 @@ static rd_kafka_buf_t *rd_kafka_waitresp_find(rd_kafka_broker_t *rkb,
rd_avg_add(&rkb->rkb_avg_rtt, rkbuf->rkbuf_ts_sent);
rd_avg_add(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_rtt,
rkbuf->rkbuf_ts_sent);

if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_Fetch) {
rd_avg_add(&rkb->rkb_telemetry.rd_avg_current
.rkb_avg_fetch_latency,
rkbuf->rkbuf_ts_sent);
} else if (rkbuf->rkbuf_reqhdr.ApiKey ==
RD_KAFKAP_OffsetCommit) {
rd_avg_add(&rkb->rkb_telemetry.rd_avg_current
Collaborator: Change this call according to the comment below about rk_avg_commit_latency.

Contributor Author: ok.

.rkb_avg_commit_latency,
rkbuf->rkbuf_ts_sent);
}
if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING &&
rd_atomic32_sub(&rkb->rkb_blocking_request_cnt, 1) == 1)
rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
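The hunk above samples round-trip latency per request type on response. As a standalone illustration (types and names here are hypothetical, and the real code derives the elapsed time from rkbuf_ts_sent upstream), the dispatch amounts to:

```c
#include <stdint.h>

typedef int64_t rd_ts_t;

enum api_key { API_FETCH, API_OFFSET_COMMIT, API_OTHER };

struct latency_avgs {
        rd_ts_t fetch_sum;  int fetch_cnt;  /* Fetch request latency */
        rd_ts_t commit_sum; int commit_cnt; /* OffsetCommit latency */
};

/* Model of the branch in rd_kafka_waitresp_find(): on a response,
 * latency = now - send timestamp, accumulated per request type;
 * other ApiKeys only feed the general RTT average. */
static void record_latency(struct latency_avgs *a, enum api_key key,
                           rd_ts_t ts_sent, rd_ts_t now) {
        rd_ts_t latency = now - ts_sent;
        if (key == API_FETCH) {
                a->fetch_sum += latency;
                a->fetch_cnt++;
        } else if (key == API_OFFSET_COMMIT) {
                a->commit_sum += latency;
                a->commit_cnt++;
        }
}
```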
21 changes: 17 additions & 4 deletions src/rdkafka_broker.h
@@ -202,17 +202,30 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
rd_avg_t rkb_avg_rtt; /* Current RTT avg */
rd_avg_t rkb_avg_throttle; /* Current throttle avg */
rd_avg_t
rkb_avg_outbuf_latency; /**< Current latency
* between buf_enq0
* and writing to socket
*/
rkb_avg_outbuf_latency; /**< Current latency
* between buf_enq0
* and writing to socket
*/
rd_avg_t rkb_avg_rebalance_latency; /* Current rebalance
Collaborator: Rebalance latency and commit latency need to be moved to rd_kafka_s, as they're not per broker. Even for rkb_avg_commit_latency we don't need a per-broker average, since the coordinator is only one broker at a time for this client's consumer group.

Contributor Author: sure.

latency avg */
rd_avg_t rkb_avg_fetch_latency; /* Current fetch latency
avg */
rd_avg_t rkb_avg_commit_latency; /* Current commit
latency avg */
} rd_avg_current;
struct {
rd_avg_t rkb_avg_rtt; /**< Rolled over RTT avg */
rd_avg_t
rkb_avg_throttle; /**< Rolled over throttle avg */
rd_avg_t rkb_avg_outbuf_latency; /**< Rolled over outbuf
* latency avg */
rd_avg_t
rkb_avg_rebalance_latency; /* Rolled over rebalance
Collaborator: Same here: move the rebalance and commit latency to rd_kafka_s.

Contributor Author: ok

latency avg */
rd_avg_t rkb_avg_fetch_latency; /* Rolled over fetch
latency avg */
rd_avg_t rkb_avg_commit_latency; /* Rolled over commit
latency avg */
} rd_avg_rollover;
} rkb_telemetry;

15 changes: 14 additions & 1 deletion src/rdkafka_cgrp.c
@@ -359,7 +359,19 @@ static int rd_kafka_cgrp_set_state(rd_kafka_cgrp_t *rkcg, int state) {
void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state) {
if ((int)rkcg->rkcg_join_state == join_state)
return;

switch (join_state) {
case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
case RD_KAFKA_CGRP_JOIN_STATE_INIT:
rd_avg_add(&rkcg->rkcg_curr_coord->rkb_telemetry.rd_avg_current
.rkb_avg_rebalance_latency,
Collaborator: This avg is in rk now.

rd_clock() - rkcg->rkcg_ts_rebalance_start);
break;
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN:
Collaborator: This state is not reached with an 848 group, so this switch block needs to change a bit: when the current state is RD_KAFKA_CGRP_JOIN_STATE_INIT or RD_KAFKA_CGRP_JOIN_STATE_STEADY and the new state differs (guaranteed by the previous condition), set the rebalance start time; when the new state is RD_KAFKA_CGRP_JOIN_STATE_STEADY, calculate the rebalance time and add it to the avg.

Contributor Author: ok.
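The state-machine logic the reviewer describes can be sketched standalone (names and the helper signature are illustrative, not librdkafka's API): leaving INIT or STEADY starts the rebalance timer, and entering STEADY stops it and yields the latency to add to the avg, which covers both classic and KIP-848 groups.

```c
#include <stdint.h>

typedef int64_t rd_ts_t;

enum join_state { JOIN_STATE_INIT, JOIN_STATE_STEADY, JOIN_STATE_WAIT_JOIN };

/* Returns the measured rebalance latency when a rebalance completes
 * (new state is STEADY and a start time was recorded), or -1 when
 * there is nothing to record yet. */
static rd_ts_t on_join_state_change(enum join_state cur,
                                    enum join_state next,
                                    rd_ts_t now,
                                    rd_ts_t *rebalance_start) {
        if (cur == next)
                return -1; /* the real code returns early on no-op changes */
        /* Leaving a stable state: a rebalance is starting. */
        if (cur == JOIN_STATE_INIT || cur == JOIN_STATE_STEADY)
                *rebalance_start = now;
        /* Reaching STEADY: the rebalance is done, report its latency. */
        if (next == JOIN_STATE_STEADY && *rebalance_start)
                return now - *rebalance_start;
        return -1;
}
```

Keying on the current and new states, rather than on specific intermediate states like WAIT_JOIN, is what makes this work for both group protocol versions.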

rkcg->rkcg_ts_rebalance_start = rd_clock();
break;
default:
break;
}
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE",
"Group \"%.*s\" changed join state %s -> %s "
"(state %s)",
@@ -2712,6 +2724,7 @@ static rd_kafka_op_res_t rd_kafka_cgrp_consumer_handle_next_assignment(
? "cleared"
: "not cleared"));
}
rkcg->rkcg_ts_rebalance_start = rd_clock();
Collaborator: This isn't necessary with the previous changes, which will handle both types of consumer groups.

Contributor Author: ok.

Collaborator: It hasn't been removed.

Collaborator: This is still to do.

Contributor Author: ok, meaning I will make the change; I just have not raised the PR for that commit, so it's left unresolved. These will be reflected in the latest commit.

rd_kafka_cgrp_handle_assignment(rkcg,
rkcg->rkcg_target_assignment);
}
2 changes: 2 additions & 0 deletions src/rdkafka_cgrp.h
@@ -346,6 +346,8 @@ typedef struct rd_kafka_cgrp_s {
* assignment */
} rkcg_c;

rd_ts_t rkcg_ts_rebalance_start;

} rd_kafka_cgrp_t;


11 changes: 11 additions & 0 deletions src/rdkafka_int.h
@@ -692,12 +692,23 @@ struct rd_kafka_s {
int *matched_metrics;
size_t matched_metrics_cnt;

rd_ts_t ts_fetch_last;
rd_ts_t ts_fetch_cb_last;

Member: remove

struct {
rd_ts_t ts_last; /**< Timestamp of last push */
rd_ts_t ts_start; /**< Timestamp from when collection
* started */
Collaborator: There's a need to add rebalance_latency_total to keep the previous value, which is necessary to calculate delta temporality.

Contributor Author: ok.
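Delta temporality means each telemetry push reports only the increase since the previous push, which is why the previous total must be kept. A minimal sketch, with hypothetical names:

```c
#include <stdint.h>

typedef int64_t rd_ts_t;

struct historic {
        rd_ts_t rebalance_latency_total; /* total as of the previous push */
};

/* Returns the delta to emit for this push and records the new total
 * so the next push reports only its own increase. */
static rd_ts_t rebalance_latency_delta(struct historic *h, rd_ts_t total_now) {
        rd_ts_t delta = total_now - h->rebalance_latency_total;
        h->rebalance_latency_total = total_now;
        return delta;
}
```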

} rk_historic_c;

struct {
rd_avg_t rk_avg_poll_idle_ratio;
} rk_avg_current;
Collaborator: We called them rd_avg_current and rd_avg_rollover on the broker, since each is a collection of rd_avg_t; use the same names here.

Contributor Author: makes sense!


struct {
rd_avg_t rk_avg_poll_idle_ratio;
} rk_avg_rollover;

} rk_telemetry;

/* Test mocks */
1 change: 0 additions & 1 deletion src/rdkafka_op.h
@@ -29,7 +29,6 @@
#ifndef _RDKAFKA_OP_H_
#define _RDKAFKA_OP_H_


emasab marked this conversation as resolved.
#include "rdkafka_msg.h"
#include "rdkafka_timer.h"
#include "rdkafka_admin.h"
40 changes: 40 additions & 0 deletions src/rdkafka_telemetry_decode.c
@@ -461,6 +461,12 @@ bool unit_test_telemetry(rd_kafka_telemetry_producer_metric_name_t metric_name,
(rd_uclock() - 1000 * 1000) * 1000;
rk->rk_telemetry.rk_historic_c.ts_last =
(rd_uclock() - 1000 * 1000) * 1000;

Collaborator: We need to unit test the calculation -> encode -> decode path for the other metrics too, not only for connects, so you can pass a set_metric_value function that takes rk and rkb and adds to the avgs. That function is called in place of rkb->rkb_c.connects.val = 1; we can require that the calculated values equal 1, or you can pass the expected value to this function.

Add these tests for the new metrics only.

Contributor Author: Needs a discussion.
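The suggested test hook can be sketched standalone; everything here (the fake structs, the callback typedef, the harness) is hypothetical scaffolding modeling the idea, not the actual unit_test_telemetry code: one callback per metric sets the raw value, and the shared harness drives the calculate/encode/decode cycle and returns the decoded value for comparison against an expected result.

```c
#include <stdint.h>

struct fake_rk  { int64_t poll_idle_ratio_sum; }; /* other metrics get their own setters */
struct fake_rkb { int64_t connects; };

/* One setter per metric under test, injected into the harness. */
typedef void (*set_metric_value_f)(struct fake_rk *rk, struct fake_rkb *rkb);

static void set_connects(struct fake_rk *rk, struct fake_rkb *rkb) {
        (void)rk;
        rkb->connects = 1; /* replaces the hard-coded rkb->rkb_c.connects.val = 1 */
}

/* Harness: apply the setter, then (in the real test) encode the
 * metrics payload, decode it, and return the decoded value so the
 * caller can compare it with the expected one. */
static int64_t run_metric_test(set_metric_value_f set_metric_value,
                               struct fake_rk *rk, struct fake_rkb *rkb) {
        set_metric_value(rk, rkb);
        /* encode + decode elided in this sketch */
        return rkb->connects;
}
```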

rd_avg_init(&rk->rk_telemetry.rk_avg_current.rk_avg_poll_idle_ratio,
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
rd_avg_init(&rk->rk_telemetry.rk_avg_rollover.rk_avg_poll_idle_ratio,
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);

rd_strlcpy(rk->rk_name, "unittest", sizeof(rk->rk_name));
clear_unit_test_data();

@@ -483,12 +489,30 @@ bool unit_test_telemetry(rd_kafka_telemetry_producer_metric_name_t metric_name,
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
rd_avg_init(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_throttle,
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
rd_avg_init(
&rkb->rkb_telemetry.rd_avg_current.rkb_avg_rebalance_latency,
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
rd_avg_init(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_fetch_latency,
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
rd_avg_init(&rkb->rkb_telemetry.rd_avg_current.rkb_avg_commit_latency,
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);

rd_avg_init(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt,
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
rd_avg_init(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_outbuf_latency,
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
rd_avg_init(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_throttle,
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
rd_avg_init(
&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_rebalance_latency,
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
rd_avg_init(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_fetch_latency,
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);
rd_avg_init(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_commit_latency,
RD_AVG_GAUGE, 0, 500 * 1000, 2, rd_true);

TAILQ_INSERT_HEAD(&rk->rk_brokers, rkb, rkb_link);
rd_buf_t *rbuf = rd_kafka_telemetry_encode_metrics(rk);
void *metrics_payload = rbuf->rbuf_wpos->seg_p;
@@ -527,6 +551,22 @@ bool unit_test_telemetry(rd_kafka_telemetry_producer_metric_name_t metric_name,
rd_avg_destroy(
&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_outbuf_latency);
rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_throttle);

rd_avg_destroy(
&rkb->rkb_telemetry.rd_avg_current.rkb_avg_rebalance_latency);
rd_avg_destroy(
&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_rebalance_latency);

rd_avg_destroy(
&rkb->rkb_telemetry.rd_avg_current.rkb_avg_fetch_latency);
rd_avg_destroy(
&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_fetch_latency);

rd_avg_destroy(
&rkb->rkb_telemetry.rd_avg_current.rkb_avg_commit_latency);
rd_avg_destroy(
&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_commit_latency);

rd_free(rkb);
rwlock_destroy(&rk->rk_lock);
rd_free(rk);