Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics request bytes #4982

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ partition::partition(
ss::sharded<cloud_storage::cache>& cloud_storage_cache)
: _raft(r)
, _probe(std::make_unique<replicated_partition_probe>(*this))
, _probe_v2()
, _tx_gateway_frontend(tx_gateway_frontend)
, _is_tx_enabled(config::shard_local_cfg().enable_transactions.value())
, _is_idempotence_enabled(
Expand Down Expand Up @@ -255,6 +256,7 @@ ss::future<> partition::start() {
auto ntp = _raft->ntp();

_probe.setup_metrics(ntp);
_probe_v2.setup_metrics(ntp);

auto f = _raft->start();

Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cluster/archival_metadata_stm.h"
#include "cluster/id_allocator_stm.h"
#include "cluster/partition_probe.h"
#include "cluster/partition_probe_v2.h"
#include "cluster/rm_stm.h"
#include "cluster/tm_stm.h"
#include "cluster/types.h"
Expand Down Expand Up @@ -168,6 +169,7 @@ class partition {
}

partition_probe& probe() { return _probe; }
partition_probe_v2& probe_v2() { return _probe_v2; }

model::revision_id get_revision_id() const {
return _raft->config().revision_id();
Expand Down Expand Up @@ -277,6 +279,7 @@ class partition {
private:
friend partition_manager;
friend replicated_partition_probe;
friend partition_probe_v2;

consensus_ptr raft() { return _raft; }

Expand All @@ -289,6 +292,7 @@ class partition {
ss::shared_ptr<archival_metadata_stm> _archival_meta_stm;
ss::abort_source _as;
partition_probe _probe;
partition_probe_v2 _probe_v2;
ss::sharded<cluster::tx_gateway_frontend>& _tx_gateway_frontend;
bool _is_tx_enabled{false};
bool _is_idempotence_enabled{false};
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ static ss::future<read_result> read_from_partition(
data = std::make_unique<iobuf>(std::move(result.data));
part.probe().add_records_fetched(result.record_count);
part.probe().add_bytes_fetched(data->size_bytes());
part.probe_v2().add_fetch_bytes_cluster_lvl(data->size_bytes());
if (result.record_count > 0) {
// Reader should live at least until this point to hold on to the
// segment locks so that prefix truncation doesn't happen.
Expand Down
2 changes: 2 additions & 0 deletions src/v/kafka/server/handlers/produce.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ static partition_produce_stages partition_append(
p.error_code = error_code::none;
partition->probe().add_records_produced(num_records);
partition->probe().add_bytes_produced(num_bytes);
partition->probe_v2().add_produce_bytes_cluster_lvl(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The requirements for this new metric are a combination of the old metrics generated from partition_probe::add_bytes_produced() and partition_probe::add_bytes_fetched(). But those metrics were at the cluster level. I still need to figure out where to put the topic level metrics.

num_bytes);
} else {
p.error_code = map_produce_error_code(r.error());
}
Expand Down
4 changes: 4 additions & 0 deletions src/v/kafka/server/materialized_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
#pragma once
#include "cluster/partition_probe.h"
#include "cluster/partition_probe_v2.h"
#include "coproc/partition.h"
#include "kafka/server/partition_proxy.h"
#include "kafka/types.h"
Expand All @@ -25,6 +26,7 @@ class materialized_partition final : public kafka::partition_proxy::impl {
explicit materialized_partition(
ss::lw_shared_ptr<coproc::partition> p) noexcept
: _probe(cluster::make_materialized_partition_probe())
, _probe_v2()
, _partition(p) {}

const model::ntp& ntp() const final { return _partition->ntp(); }
Expand Down Expand Up @@ -83,6 +85,7 @@ class materialized_partition final : public kafka::partition_proxy::impl {
}

cluster::partition_probe& probe() final { return _probe; }
cluster::partition_probe_v2& probe_v2() final { return _probe_v2; }

ss::future<bool> is_fetch_offset_valid(
model::offset fetch_offset, model::timeout_clock::time_point) final {
Expand All @@ -95,6 +98,7 @@ class materialized_partition final : public kafka::partition_proxy::impl {
}

cluster::partition_probe _probe;
cluster::partition_probe_v2 _probe_v2;
ss::lw_shared_ptr<coproc::partition> _partition;
};

Expand Down
2 changes: 2 additions & 0 deletions src/v/kafka/server/partition_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class partition_proxy {
is_fetch_offset_valid(model::offset, model::timeout_clock::time_point)
= 0;
virtual cluster::partition_probe& probe() = 0;
virtual cluster::partition_probe_v2& probe_v2() = 0;
virtual ~impl() noexcept = default;
};

Expand Down Expand Up @@ -96,6 +97,7 @@ class partition_proxy {
}

cluster::partition_probe& probe() { return _impl->probe(); }
cluster::partition_probe_v2& probe_v2() { return _impl->probe_v2(); }

kafka::leader_epoch leader_epoch() const { return _impl->leader_epoch(); }

Expand Down
4 changes: 4 additions & 0 deletions src/v/kafka/server/replicated_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "cluster/partition.h"
#include "cluster/partition_probe.h"
#include "cluster/partition_probe_v2.h"
#include "kafka/server/partition_proxy.h"
#include "kafka/types.h"
#include "model/fundamental.h"
Expand Down Expand Up @@ -86,6 +87,9 @@ class replicated_partition final : public kafka::partition_proxy::impl {
ss::lw_shared_ptr<const storage::offset_translator_state>) final;

cluster::partition_probe& probe() final { return _partition->probe(); }
cluster::partition_probe_v2& probe_v2() final {
return _partition->probe_v2();
}

std::optional<model::offset>
get_leader_epoch_last_offset(kafka::leader_epoch) const final;
Expand Down