From 7262ff8c755ae14022c110783362f183f237def1 Mon Sep 17 00:00:00 2001 From: Stephan Dollberg Date: Fri, 26 Apr 2024 11:22:53 +0100 Subject: [PATCH] Revert "kafka: remove existing fetch latency measurement" This reverts commit dd4676aa3a41e44e23d9b73c79670f85a1fe0d5d. Will be replaced with 23.3 commit --- src/v/kafka/server/handlers/fetch.cc | 12 ++++++++++-- src/v/kafka/server/handlers/fetch.h | 10 ++++++++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index b9925dd19689..3b8255aafad5 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -423,7 +423,8 @@ read_result::memory_units_t reserve_memory_units( static void fill_fetch_responses( op_context& octx, std::vector results, - const std::vector& responses) { + const std::vector& responses, + op_context::latency_point start_time) { auto range = boost::irange(0, results.size()); if (unlikely(results.size() != responses.size())) { // soft assert & recovery attempt @@ -519,6 +520,10 @@ static void fill_fetch_responses( } resp_it->set(std::move(resp)); + std::chrono::microseconds fetch_latency + = std::chrono::duration_cast( + op_context::latency_clock::now() - start_time); + octx.rctx.probe().record_fetch_latency(fetch_latency); } } @@ -1152,7 +1157,10 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { }); fill_fetch_responses( - octx, std::move(results.read_results), fetch.responses); + octx, + std::move(results.read_results), + fetch.responses, + fetch.start_time); octx.rctx.probe().record_fetch_latency( results.first_run_latency_result); diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h index 285886782541..dda2aef56ed1 100644 --- a/src/v/kafka/server/handlers/fetch.h +++ b/src/v/kafka/server/handlers/fetch.h @@ -331,6 +331,9 @@ struct read_result { // struct aggregating fetch requests and corresponding response iterators for // the same shard struct shard_fetch { + explicit shard_fetch(op_context::latency_point start_time) + : start_time{start_time} {} + void push_back( ntp_fetch_config config, op_context::response_placeholder_ptr r_ph) { requests.push_back(std::move(config)); @@ -346,6 +349,7 @@ struct shard_fetch { ss::shard_id shard; std::vector requests; std::vector responses; + op_context::latency_point start_time; friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) { fmt::print(o, "{}", sf.requests); @@ -354,8 +358,10 @@ struct shard_fetch { }; struct fetch_plan { - explicit fetch_plan(size_t shards) - : fetches_per_shard(shards, shard_fetch()) { + explicit fetch_plan( + size_t shards, + op_context::latency_point start_time = op_context::latency_clock::now()) + : fetches_per_shard(shards, shard_fetch(start_time)) { for (size_t i = 0; i < fetches_per_shard.size(); i++) { fetches_per_shard[i].shard = i; }