Minimal primary metrics endpoint #5165

Merged (11 commits) on Jul 5, 2022
1 change: 1 addition & 0 deletions src/v/cluster/CMakeLists.txt
@@ -57,6 +57,7 @@ v_cc_library(
partition_leaders_table.cc
topics_frontend.cc
controller_backend.cc
controller_probe.cc
controller.cc
partition.cc
partition_probe.cc
3 changes: 2 additions & 1 deletion src/v/cluster/controller.cc
@@ -71,7 +71,8 @@ controller::controller(
, _data_policy_manager(data_policy_table)
, _raft_manager(raft_manager)
, _feature_table(feature_table)
, _cloud_storage_api(cloud_storage_api) {}
, _cloud_storage_api(cloud_storage_api)
, _probe(*this) {}

ss::future<> controller::wire_up() {
return _as.start()
5 changes: 5 additions & 0 deletions src/v/cluster/controller.h
@@ -11,6 +11,7 @@

#pragma once

#include "cluster/controller_probe.h"
#include "cluster/controller_stm.h"
#include "cluster/fwd.h"
#include "cluster/scheduling/leader_balancer.h"
@@ -114,6 +115,9 @@ class controller {
ss::future<> shutdown_input();
ss::future<> stop();

private:
friend controller_probe;

private:
config_manager::preload_result _config_preload;

@@ -157,6 +161,7 @@
std::unique_ptr<leader_balancer> _leader_balancer;
consensus_ptr _raft0;
ss::sharded<cloud_storage::remote>& _cloud_storage_api;
controller_probe _probe;
};

} // namespace cluster
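`controller_probe` is declared a friend so it can read the controller's private state (the raft manager, `_raft0`, the partition leaders table) without widening the controller's public interface, and the `_probe` member is declared last so it is constructed after the members it observes. A minimal sketch of that friend-probe shape, using illustrative names rather than the actual controller API:

```cpp
// Minimal sketch of the friend-probe pattern: the probe reads the owner's
// private state without the owner exposing extra accessors.
#include <iostream>

class owner;

class probe {
public:
    explicit probe(owner& o) : _owner(o) {}
    void report() const;

private:
    owner& _owner;
};

class owner {
public:
    owner() : _probe(*this) {}
    probe& get_probe() { return _probe; }

private:
    friend probe;          // lets the probe read _private_counter directly
    int _private_counter{42};
    probe _probe;          // declared last: constructed after the state it observes
};

void probe::report() const {
    std::cout << "counter=" << _owner._private_counter << '\n';
}

int main() {
    owner o;
    o.get_probe().report(); // prints counter=42
}
```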
116 changes: 116 additions & 0 deletions src/v/cluster/controller_probe.cc
@@ -0,0 +1,116 @@
/*
* Copyright 2022 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "cluster/controller_probe.h"

#include "cluster/controller.h"
#include "cluster/members_table.h"
#include "cluster/partition_leaders_table.h"
#include "prometheus/prometheus_sanitize.h"
#include "ssx/metrics.h"

#include <seastar/core/metrics.hh>

#include <absl/container/flat_hash_set.h>

namespace cluster {

controller_probe::controller_probe(controller& c) noexcept
: _controller(c) {
_controller._raft_manager.local().register_leadership_notification(
[this](
raft::group_id group,
model::term_id /*term*/,
std::optional<model::node_id> leader_id) {
// We are only interested in notifications regarding the controller
// group.
if (_controller._raft0->group() != group) {
return;
}

if (leader_id != _controller.self()) {
_public_metrics.reset();
} else {
setup_metrics();
}
});
}

void controller_probe::setup_metrics() {
namespace sm = ss::metrics;

if (config::shard_local_cfg().disable_public_metrics()) {
return;
}

_public_metrics = std::make_unique<ss::metrics::metric_groups>(
ssx::metrics::public_metrics_handle);
_public_metrics->add_group(
prometheus_sanitize::metrics_name("cluster"),
{
sm::make_gauge(
"brokers",
[this] {
const auto& members_table
= _controller.get_members_table().local();
return members_table.all_brokers_count();
},
sm::description("Number of configured brokers in the cluster"))
.aggregate({sm::shard_label}),
sm::make_gauge(
"topics",
[this] {
const auto& topic_table = _controller.get_topics_state().local();
return topic_table.all_topics_count();
},
sm::description("Number of topics in the cluster"))
.aggregate({sm::shard_label}),
sm::make_gauge(
"partitions",
[this] {
const auto& leaders_table
= _controller._partition_leaders.local();

auto partitions_count = 0;
leaders_table.for_each_leader(
[&partitions_count](auto&&...) { ++partitions_count; });

return partitions_count;
},
sm::description(
"Number of partitions in the cluster (replicas not included)"))
.aggregate({sm::shard_label}),
sm::make_gauge(
"unavailable_partitions",
[this] {
const auto& leaders_table
= _controller._partition_leaders.local();
auto unavailable_partitions_count = 0;

leaders_table.for_each_leader([&unavailable_partitions_count](
const auto& /*tp_ns*/,
auto /*pid*/,
auto leader,
auto /*term*/) {
if (!leader.has_value()) {
++unavailable_partitions_count;
}
});

return unavailable_partitions_count;
},
sm::description(
"Number of partitions that lack quorum among replicants"))
.aggregate({sm::shard_label}),
});
}

} // namespace cluster
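The constructor above ties the probe's lifetime to controller leadership: the cluster-level metric group is registered only while this node leads the controller Raft group and is dropped as soon as leadership moves away, so each cluster metric is reported by exactly one broker. A standalone sketch of that pattern, with a hypothetical `leadership_notifier` standing in for the raft manager's notification API:

```cpp
// Standalone sketch of the leader-gated registration pattern used by
// controller_probe. `leadership_notifier` is a hypothetical stand-in for the
// raft manager's leadership notifications, not part of Redpanda.
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <vector>

using node_id = int;

struct leadership_notifier {
    using callback = std::function<void(std::optional<node_id>)>;
    void subscribe(callback cb) { _subs.push_back(std::move(cb)); }
    void elect(std::optional<node_id> leader) {
        for (auto& cb : _subs) { cb(leader); }
    }
    std::vector<callback> _subs;
};

struct metric_group {
    metric_group() { std::cout << "cluster metrics registered\n"; }
    ~metric_group() { std::cout << "cluster metrics dropped\n"; }
};

class probe {
public:
    probe(node_id self, leadership_notifier& n) : _self(self) {
        n.subscribe([this](std::optional<node_id> leader) {
            if (leader == _self) {
                _metrics = std::make_unique<metric_group>(); // became leader
            } else {
                _metrics.reset(); // lost leadership: stop reporting
            }
        });
    }

private:
    node_id _self;
    std::unique_ptr<metric_group> _metrics;
};

int main() {
    leadership_notifier notifier;
    probe p{/*self=*/0, notifier};
    notifier.elect(0); // this node leads  -> metrics registered
    notifier.elect(1); // leadership moves -> metrics dropped
}
```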
32 changes: 32 additions & 0 deletions src/v/cluster/controller_probe.h
@@ -0,0 +1,32 @@
/*
* Copyright 2022 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "cluster/fwd.h"
#include "seastarx.h"

#include <seastar/core/metrics_registration.hh>

namespace cluster {

class controller_probe {
public:
explicit controller_probe(cluster::controller&) noexcept;

void setup_metrics();

private:
cluster::controller& _controller;
std::unique_ptr<ss::metrics::metric_groups> _public_metrics;
};

} // namespace cluster
8 changes: 8 additions & 0 deletions src/v/cluster/members_table.cc
@@ -32,6 +32,14 @@ std::vector<broker_ptr> members_table::all_brokers() const {

return brokers;
}

size_t members_table::all_brokers_count() const {
return std::count_if(_brokers.begin(), _brokers.end(), [](auto entry) {
return entry.second->get_membership_state()
!= model::membership_state::removed;
});
}

std::vector<model::node_id> members_table::all_broker_ids() const {
std::vector<model::node_id> ids;
ids.reserve(_brokers.size());
2 changes: 2 additions & 0 deletions src/v/cluster/members_table.h
@@ -29,6 +29,8 @@ class members_table {

std::vector<broker_ptr> all_brokers() const;

size_t all_brokers_count() const;

std::vector<model::node_id> all_broker_ids() const;

/// Returns single broker if exists in cache
95 changes: 94 additions & 1 deletion src/v/cluster/partition_probe.cc
@@ -13,16 +13,23 @@
#include "config/configuration.h"
#include "model/metadata.h"
#include "prometheus/prometheus_sanitize.h"
#include "ssx/metrics.h"

#include <seastar/core/metrics.hh>

namespace cluster {

replicated_partition_probe::replicated_partition_probe(
const partition& p) noexcept
: _partition(p) {}
: _partition(p)
, _public_metrics(ssx::metrics::public_metrics_handle) {}

void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
setup_internal_metrics(ntp);
setup_public_metrics(ntp);
}

void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
namespace sm = ss::metrics;

if (config::shard_local_cfg().disable_metrics()) {
@@ -128,6 +135,92 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
.aggregate(aggregate_labels),
});
}

void replicated_partition_probe::setup_public_metrics(const model::ntp& ntp) {
namespace sm = ss::metrics;

if (config::shard_local_cfg().disable_public_metrics()) {
return;
}

auto request_label = sm::label("request");
auto ns_label = sm::label("namespace");
auto topic_label = sm::label("topic");
auto partition_label = sm::label("partition");

const std::vector<sm::label_instance> labels = {
ns_label(ntp.ns()),
topic_label(ntp.tp.topic()),
partition_label(ntp.tp.partition()),
};

_public_metrics.add_group(
prometheus_sanitize::metrics_name("kafka"),
{
// Partition Level Metrics
sm::make_gauge(
"max_offset",
[this] {
auto log_offset = _partition.committed_offset();
auto translator = _partition.get_offset_translator_state();

try {
return translator->from_log_offset(log_offset);
} catch (std::runtime_error& e) {
// Offset translation will throw if nothing was committed
// to the partition or if the offset is outside the
// translation range for any other reason.
return model::offset(-1);
}
},
sm::description(
"Latest committed offset for the partition (i.e. the offset of the "
"last message safely persisted on most replicas)"),
labels)
.aggregate({sm::shard_label}),
sm::make_gauge(
"under_replicated_replicas",
[this] {
auto metrics = _partition._raft->get_follower_metrics();
return std::count_if(
metrics.cbegin(),
metrics.cend(),
[](const raft::follower_metrics& fm) {
return fm.under_replicated;
});
},
sm::description("Number of under replicated replicas (i.e. replicas "
"that are live, but not at the latest offest)"),
labels)
.aggregate({sm::shard_label}),
// Topic Level Metrics
sm::make_total_bytes(
"request_bytes_total",
[this] { return _bytes_produced; },
sm::description("Total number of bytes produced per topic"),
{request_label("produce"),
ns_label(ntp.ns()),
topic_label(ntp.tp.topic()),
partition_label(ntp.tp.partition())})
.aggregate({sm::shard_label, partition_label}),
sm::make_total_bytes(
"request_bytes_total",
[this] { return _bytes_fetched; },
sm::description("Total number of bytes consumed per topic"),
{request_label("consume"),
ns_label(ntp.ns()),
topic_label(ntp.tp.topic()),
partition_label(ntp.tp.partition())})
.aggregate({sm::shard_label, partition_label}),
sm::make_gauge(
"replicas",
[this] { return _partition._raft->get_follower_count(); },
sm::description("Number of replicas per topic"),
labels)
.aggregate({sm::shard_label, partition_label}),
});
}

partition_probe make_materialized_partition_probe() {
// TODO: implement partition probe for materialized partitions
class impl : public partition_probe::impl {
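The `max_offset` gauge reports a Kafka offset rather than a raw log offset: the log interleaves data batches with Raft-internal batches (configuration changes and similar) that Kafka clients never see, which is what the offset translator compensates for and why translation can fail before anything translatable has been committed. A toy model of that idea, assuming for simplicity that every batch occupies a single offset (this is not Redpanda's actual translator):

```cpp
// Toy model of log-offset -> Kafka-offset translation: Kafka offsets count
// only data batches, so Raft-internal batches before a position are skipped.
#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

enum class batch_type { data, raft_configuration };

// Kafka offset = log offset minus the number of non-data batches up to and
// including it; std::nullopt if the offset is not translatable (mirrors the
// "translation throws" case handled in the probe).
std::optional<int64_t>
from_log_offset(const std::vector<batch_type>& log, int64_t log_offset) {
    if (log_offset < 0 || log_offset >= static_cast<int64_t>(log.size())
        || log[log_offset] != batch_type::data) {
        return std::nullopt;
    }
    int64_t skipped = 0;
    for (int64_t o = 0; o <= log_offset; ++o) {
        if (log[o] != batch_type::data) { ++skipped; }
    }
    return log_offset - skipped;
}

int main() {
    std::vector<batch_type> log{
      batch_type::raft_configuration, batch_type::data, batch_type::data};
    std::cout << from_log_offset(log, 2).value_or(-1) << '\n'; // prints 1
    std::cout << from_log_offset(log, 0).value_or(-1) << '\n'; // prints -1
}
```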
5 changes: 5 additions & 0 deletions src/v/cluster/partition_probe.h
@@ -68,13 +68,18 @@ class replicated_partition_probe : public partition_probe::impl {
void add_bytes_fetched(uint64_t cnt) final { _bytes_fetched += cnt; }
void add_bytes_produced(uint64_t cnt) final { _bytes_produced += cnt; }

private:
void setup_public_metrics(const model::ntp&);
void setup_internal_metrics(const model::ntp&);

private:
const partition& _partition;
uint64_t _records_produced{0};
uint64_t _records_fetched{0};
uint64_t _bytes_produced{0};
uint64_t _bytes_fetched{0};
ss::metrics::metric_groups _metrics;
ss::metrics::metric_groups _public_metrics;
};

partition_probe make_materialized_partition_probe();
5 changes: 5 additions & 0 deletions src/v/cluster/tests/cluster_test_fixture.h
@@ -53,7 +53,12 @@ class cluster_test_fixture {
: _sgroups(create_scheduling_groups())
, _group_deleter([this] { _sgroups.destroy_groups().get(); })
, _base_dir("cluster_test." + random_generators::gen_alphanum_string(6)) {
// Disable all metrics to guard against double_registration errors
// thrown by seastar. The simulated nodes all run in the same process
// and therefore share one metrics registry, so the usual per-node
// metrics registration would collide.
set_configuration("disable_metrics", true);
set_configuration("disable_public_metrics", true);
}

virtual ~cluster_test_fixture() {
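The comment above is the key constraint: every simulated node in the fixture lives in one process and therefore shares one metrics registry, so a second node registering the same series would trip Seastar's double-registration check. A toy registry that reproduces the failure mode in plain C++ (an illustration only, not Seastar's metrics implementation; the metric name is just an example):

```cpp
// Toy illustration of the double-registration problem the fixture avoids:
// two "nodes" in one process registering the same metric key collide.
#include <iostream>
#include <set>
#include <stdexcept>
#include <string>

struct registry {
    void register_metric(const std::string& key) {
        if (!_keys.insert(key).second) {
            throw std::runtime_error("double_registration: " + key);
        }
    }
    std::set<std::string> _keys;
};

int main() {
    registry process_wide_registry; // shared by every simulated node

    auto start_node = [&](int id) {
        // Each simulated node tries to register the same series.
        process_wide_registry.register_metric("redpanda_cluster_brokers{}");
        std::cout << "node " << id << " registered metrics\n";
    };

    start_node(0);
    try {
        start_node(1); // second node in the same process -> collision
    } catch (const std::runtime_error& e) {
        std::cout << e.what() << '\n';
    }
}
```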
2 changes: 2 additions & 0 deletions src/v/cluster/topic_table.cc
@@ -628,6 +628,8 @@ std::vector<model::topic_namespace> topic_table::all_topics() const {
[](const topic_metadata& tp) { return tp.get_configuration().tp_ns; });
}

size_t topic_table::all_topics_count() const { return _topics.size(); }

std::optional<topic_metadata>
topic_table::get_topic_metadata(model::topic_namespace_view tp) const {
if (auto it = _topics.find(tp); it != _topics.end()) {
3 changes: 3 additions & 0 deletions src/v/cluster/topic_table.h
@@ -125,6 +125,9 @@ class topic_table {
/// Returns list of all topics that exists in the cluster.
std::vector<model::topic_namespace> all_topics() const;

/// Returns the number of topics that exist in the cluster.
size_t all_topics_count() const;

///\brief Returns metadata of single topic.
///
/// If topic does not exists it returns an empty optional