diff --git a/src/v/cluster/CMakeLists.txt b/src/v/cluster/CMakeLists.txt
index bda98b0e7a5cb..18991ad10066f 100644
--- a/src/v/cluster/CMakeLists.txt
+++ b/src/v/cluster/CMakeLists.txt
@@ -57,6 +57,7 @@ v_cc_library(
     partition_leaders_table.cc
     topics_frontend.cc
     controller_backend.cc
+    controller_probe.cc
     controller.cc
     partition.cc
     partition_probe.cc
diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc
index 1786d2f38192c..346c353f00a88 100644
--- a/src/v/cluster/controller.cc
+++ b/src/v/cluster/controller.cc
@@ -71,7 +71,8 @@ controller::controller(
   , _data_policy_manager(data_policy_table)
   , _raft_manager(raft_manager)
   , _feature_table(feature_table)
-  , _cloud_storage_api(cloud_storage_api) {}
+  , _cloud_storage_api(cloud_storage_api)
+  , _probe(*this) {}
 
 ss::future<> controller::wire_up() {
     return _as.start()
diff --git a/src/v/cluster/controller.h b/src/v/cluster/controller.h
index 6ae1e0ad33f73..8a285cd5c6b50 100644
--- a/src/v/cluster/controller.h
+++ b/src/v/cluster/controller.h
@@ -11,6 +11,7 @@
 
 #pragma once
 
+#include "cluster/controller_probe.h"
 #include "cluster/controller_stm.h"
 #include "cluster/fwd.h"
 #include "cluster/scheduling/leader_balancer.h"
@@ -114,6 +115,9 @@ class controller {
     ss::future<> shutdown_input();
     ss::future<> stop();
 
+private:
+    friend controller_probe;
+
 private:
     config_manager::preload_result _config_preload;
 
@@ -157,6 +161,7 @@ class controller {
     std::unique_ptr<leader_balancer> _leader_balancer;
     consensus_ptr _raft0;
     ss::sharded<cloud_storage::remote>& _cloud_storage_api;
+    controller_probe _probe;
 };
 
 } // namespace cluster
diff --git a/src/v/cluster/controller_probe.cc b/src/v/cluster/controller_probe.cc
new file mode 100644
index 0000000000000..833d76e47d22c
--- /dev/null
+++ b/src/v/cluster/controller_probe.cc
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2022 Redpanda Data, Inc.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+#include "cluster/controller_probe.h"
+
+#include "cluster/controller.h"
+#include "cluster/members_table.h"
+#include "cluster/partition_leaders_table.h"
+#include "prometheus/prometheus_sanitize.h"
+#include "ssx/metrics.h"
+
+#include <seastar/core/metrics.hh>
+
+#include <optional>
+
+namespace cluster {
+
+controller_probe::controller_probe(controller& c) noexcept
+  : _controller(c) {
+    _controller._raft_manager.local().register_leadership_notification(
+      [this](
+        raft::group_id group,
+        model::term_id /*term*/,
+        std::optional<model::node_id> leader_id) {
+          // We are only interested in notifications regarding the controller
+          // group.
+          if (_controller._raft0->group() != group) {
+              return;
+          }
+
+          if (leader_id != _controller.self()) {
+              _public_metrics.reset();
+          } else {
+              setup_metrics();
+          }
+      });
+}
+
+void controller_probe::setup_metrics() {
+    namespace sm = ss::metrics;
+
+    if (config::shard_local_cfg().disable_public_metrics()) {
+        return;
+    }
+
+    _public_metrics = std::make_unique<ss::metrics::metric_groups>(
+      ssx::metrics::public_metrics_handle);
+    _public_metrics->add_group(
+      prometheus_sanitize::metrics_name("cluster"),
+      {
+        sm::make_gauge(
+          "brokers",
+          [this] {
+              const auto& members_table
+                = _controller.get_members_table().local();
+              return members_table.all_brokers_count();
+          },
+          sm::description("Number of configured brokers in the cluster"))
+          .aggregate({sm::shard_label}),
+        sm::make_gauge(
+          "topics",
+          [this] {
+              const auto& topic_table = _controller.get_topics_state().local();
+              return topic_table.all_topics_count();
+          },
+          sm::description("Number of topics in the cluster"))
+          .aggregate({sm::shard_label}),
+        sm::make_gauge(
+          "partitions",
+          [this] {
+              const auto& leaders_table
+                = _controller._partition_leaders.local();
+
+              auto partitions_count = 0;
+              leaders_table.for_each_leader(
+                [&partitions_count](auto&&...) { ++partitions_count; });
+
+              return partitions_count;
+          },
+          sm::description(
+            "Number of partitions in the cluster (replicas not included)"))
+          .aggregate({sm::shard_label}),
+        sm::make_gauge(
+          "unavailable_partitions",
+          [this] {
+              const auto& leaders_table
+                = _controller._partition_leaders.local();
+              auto unavailable_partitions_count = 0;
+
+              leaders_table.for_each_leader([&unavailable_partitions_count](
+                                              const auto& /*tp_ns*/,
+                                              auto /*pid*/,
+                                              auto leader,
+                                              auto /*term*/) {
+                  if (!leader.has_value()) {
+                      ++unavailable_partitions_count;
+                  }
+              });
+
+              return unavailable_partitions_count;
+          },
+          sm::description(
+            "Number of partitions that lack quorum among replicas"))
+          .aggregate({sm::shard_label}),
+      });
+}
+
+} // namespace cluster
diff --git a/src/v/cluster/controller_probe.h b/src/v/cluster/controller_probe.h
new file mode 100644
index 0000000000000..781f2de1ff68c
--- /dev/null
+++ b/src/v/cluster/controller_probe.h
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2022 Redpanda Data, Inc.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+#pragma once
+
+#include "cluster/fwd.h"
+#include "seastarx.h"
+
+#include <seastar/core/metrics_registration.hh>
+
+namespace cluster {
+
+class controller_probe {
+public:
+    explicit controller_probe(cluster::controller&) noexcept;
+
+    void setup_metrics();
+
+private:
+    cluster::controller& _controller;
+    std::unique_ptr<ss::metrics::metric_groups> _public_metrics;
+};
+
+} // namespace cluster
diff --git a/src/v/cluster/members_table.cc b/src/v/cluster/members_table.cc
index dcac0cea9dab4..042af4f363bf7 100644
--- a/src/v/cluster/members_table.cc
+++ b/src/v/cluster/members_table.cc
@@ -32,6 +32,14 @@ std::vector<broker_ptr> members_table::all_brokers() const {
     return brokers;
 }
 
+size_t members_table::all_brokers_count() const {
+    return std::count_if(_brokers.begin(), _brokers.end(), [](auto entry) {
+        return entry.second->get_membership_state()
+               != model::membership_state::removed;
+    });
+}
+
 std::vector<model::node_id> members_table::all_broker_ids() const {
     std::vector<model::node_id> ids;
     ids.reserve(_brokers.size());
diff --git a/src/v/cluster/members_table.h b/src/v/cluster/members_table.h
index 57de0506afacb..3e751b5294423 100644
--- a/src/v/cluster/members_table.h
+++ b/src/v/cluster/members_table.h
@@ -29,6 +29,8 @@ class members_table {
 
     std::vector<broker_ptr> all_brokers() const;
 
+    size_t all_brokers_count() const;
+
     std::vector<model::node_id> all_broker_ids() const;
 
     /// Returns single broker if exists in cache
diff --git a/src/v/cluster/partition_probe.cc b/src/v/cluster/partition_probe.cc
index 1b92f0e57f01f..949409d0fb20d 100644
--- a/src/v/cluster/partition_probe.cc
+++ b/src/v/cluster/partition_probe.cc
@@ -13,6 +13,7 @@
 #include "config/configuration.h"
 #include "model/metadata.h"
 #include "prometheus/prometheus_sanitize.h"
+#include "ssx/metrics.h"
 
 #include <seastar/core/metrics.hh>
 
@@ -20,9 +21,15 @@ namespace cluster {
 
 replicated_partition_probe::replicated_partition_probe(
   const partition& p) noexcept
-  : _partition(p) {}
+  : _partition(p)
+  , _public_metrics(ssx::metrics::public_metrics_handle) {}
 
 void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
+    setup_internal_metrics(ntp);
+    setup_public_metrics(ntp);
+}
+
+void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
     namespace sm = ss::metrics;
 
     if (config::shard_local_cfg().disable_metrics()) {
@@ -128,6 +135,92 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
       .aggregate(aggregate_labels),
     });
 }
+
+void replicated_partition_probe::setup_public_metrics(const model::ntp& ntp) {
+    namespace sm = ss::metrics;
+
+    if (config::shard_local_cfg().disable_public_metrics()) {
+        return;
+    }
+
+    auto request_label = sm::label("request");
+    auto ns_label = sm::label("namespace");
+    auto topic_label = sm::label("topic");
+    auto partition_label = sm::label("partition");
+
+    const std::vector<sm::label_instance> labels = {
+      ns_label(ntp.ns()),
+      topic_label(ntp.tp.topic()),
+      partition_label(ntp.tp.partition()),
+    };
+
+    _public_metrics.add_group(
+      prometheus_sanitize::metrics_name("kafka"),
+      {
+        // Partition Level Metrics
+        sm::make_gauge(
+          "max_offset",
+          [this] {
+              auto log_offset = _partition.committed_offset();
+              auto translator = _partition.get_offset_translator_state();
+
+              try {
+                  return translator->from_log_offset(log_offset);
+              } catch (std::runtime_error& e) {
+                  // Offset translation will throw if nothing was committed
+                  // to the partition or if the offset is outside the
+                  // translation range for any other reason.
+                  return model::offset(-1);
+              }
+          },
+          sm::description(
+            "Latest committed offset for the partition (i.e. the offset of the "
+            "last message safely persisted on most replicas)"),
+          labels)
+          .aggregate({sm::shard_label}),
+        sm::make_gauge(
+          "under_replicated_replicas",
+          [this] {
+              auto metrics = _partition._raft->get_follower_metrics();
+              return std::count_if(
+                metrics.cbegin(),
+                metrics.cend(),
+                [](const raft::follower_metrics& fm) {
+                    return fm.under_replicated;
+                });
+          },
+          sm::description("Number of under replicated replicas (i.e. replicas "
+                          "that are live, but not at the latest offset)"),
+          labels)
+          .aggregate({sm::shard_label}),
+        // Topic Level Metrics
+        sm::make_total_bytes(
+          "request_bytes_total",
+          [this] { return _bytes_produced; },
+          sm::description("Total number of bytes produced per topic"),
+          {request_label("produce"),
+           ns_label(ntp.ns()),
+           topic_label(ntp.tp.topic()),
+           partition_label(ntp.tp.partition())})
+          .aggregate({sm::shard_label, partition_label}),
+        sm::make_total_bytes(
+          "request_bytes_total",
+          [this] { return _bytes_fetched; },
+          sm::description("Total number of bytes consumed per topic"),
+          {request_label("consume"),
+           ns_label(ntp.ns()),
+           topic_label(ntp.tp.topic()),
+           partition_label(ntp.tp.partition())})
+          .aggregate({sm::shard_label, partition_label}),
+        sm::make_gauge(
+          "replicas",
+          [this] { return _partition._raft->get_follower_count(); },
+          sm::description("Number of replicas per topic"),
+          labels)
+          .aggregate({sm::shard_label, partition_label}),
+      });
+}
+
 partition_probe make_materialized_partition_probe() {
     // TODO: implement partition probe for materialized partitions
     class impl : public partition_probe::impl {
diff --git a/src/v/cluster/partition_probe.h b/src/v/cluster/partition_probe.h
index 8f4c316700d0b..dbe2284f93e60 100644
--- a/src/v/cluster/partition_probe.h
+++ b/src/v/cluster/partition_probe.h
@@ -68,6 +68,10 @@ class replicated_partition_probe : public partition_probe::impl {
     void add_bytes_fetched(uint64_t cnt) final { _bytes_fetched += cnt; }
     void add_bytes_produced(uint64_t cnt) final { _bytes_produced += cnt; }
 
+private:
+    void setup_public_metrics(const model::ntp&);
+    void setup_internal_metrics(const model::ntp&);
+
 private:
     const partition& _partition;
     uint64_t _records_produced{0};
@@ -75,6 +79,7 @@ class replicated_partition_probe : public partition_probe::impl {
     uint64_t _bytes_produced{0};
     uint64_t _bytes_fetched{0};
     ss::metrics::metric_groups _metrics;
+    ss::metrics::metric_groups _public_metrics;
 };
 
 partition_probe make_materialized_partition_probe();
diff --git a/src/v/cluster/tests/cluster_test_fixture.h b/src/v/cluster/tests/cluster_test_fixture.h
index 77ad3f1c43b58..7c0fd22a71747 100644
--- a/src/v/cluster/tests/cluster_test_fixture.h
+++ b/src/v/cluster/tests/cluster_test_fixture.h
@@ -53,7 +53,12 @@ class cluster_test_fixture {
       : _sgroups(create_scheduling_groups())
       , _group_deleter([this] { _sgroups.destroy_groups().get(); })
      , _base_dir("cluster_test." + random_generators::gen_alphanum_string(6)) {
+        // Disable all metrics to guard against double_registration errors
+        // thrown by seastar. These are simulated nodes which use the same
+        // internal metrics implementation, so the usual metrics registration
+        // process won't work.
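+        // (In seastar, a metric name may be registered only once per
+        // process-wide handle; several simulated nodes in one process would
+        // otherwise attempt duplicate registrations.)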
         set_configuration("disable_metrics", true);
+        set_configuration("disable_public_metrics", true);
     }
 
     virtual ~cluster_test_fixture() {
diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc
index 0811b118f05b1..68beeac7e0797 100644
--- a/src/v/cluster/topic_table.cc
+++ b/src/v/cluster/topic_table.cc
@@ -628,6 +628,8 @@ std::vector<model::topic_namespace> topic_table::all_topics() const {
       [](const topic_metadata& tp) { return tp.get_configuration().tp_ns; });
 }
 
+size_t topic_table::all_topics_count() const { return _topics.size(); }
+
 std::optional<topic_metadata>
 topic_table::get_topic_metadata(model::topic_namespace_view tp) const {
     if (auto it = _topics.find(tp); it != _topics.end()) {
diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h
index 94d84c56e9bad..8daf7e294e4f3 100644
--- a/src/v/cluster/topic_table.h
+++ b/src/v/cluster/topic_table.h
@@ -125,6 +125,9 @@ class topic_table {
     /// Returns list of all topics that exists in the cluster.
     std::vector<model::topic_namespace> all_topics() const;
 
+    // Returns the number of topics that exist in the cluster.
+    size_t all_topics_count() const;
+
     ///\brief Returns metadata of single topic.
     ///
     /// If topic does not exists it returns an empty optional
diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index 1bd437885a7fc..ce43fdf0aebee 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -238,7 +238,15 @@ configuration::configuration()
   , disable_metrics(
       *this,
       "disable_metrics",
-      "Disable registering metrics",
+      "Disable registering metrics exposed on the internal metrics endpoint "
+      "(/metrics)",
+      base_property::metadata{},
+      false)
+  , disable_public_metrics(
+      *this,
+      "disable_public_metrics",
+      "Disable registering metrics exposed on the public metrics endpoint "
+      "(/public_metrics)",
       base_property::metadata{},
       false)
   , aggregate_metrics(
diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h
index f2caf529babda..83bd03a0b05d0 100644
--- a/src/v/config/configuration.h
+++ b/src/v/config/configuration.h
@@ -82,6 +82,7 @@ struct configuration final : public config_store {
     bounded_property target_quota_byte_rate;
     property<std::optional<ss::sstring>> cluster_id;
     property<bool> disable_metrics;
+    property<bool> disable_public_metrics;
     property<bool> aggregate_metrics;
     property<std::chrono::milliseconds> group_min_session_timeout_ms;
     property<std::chrono::milliseconds> group_max_session_timeout_ms;
diff --git a/src/v/kafka/latency_probe.h b/src/v/kafka/latency_probe.h
index fc7902abd3551..0e08e6fc606b5 100644
--- a/src/v/kafka/latency_probe.h
+++ b/src/v/kafka/latency_probe.h
@@ -13,6 +13,7 @@
 
 #include "config/configuration.h"
 #include "prometheus/prometheus_sanitize.h"
+#include "ssx/metrics.h"
 #include "utils/hdr_hist.h"
 
 #include <seastar/core/metrics.hh>
 
@@ -45,6 +46,28 @@ class latency_probe {
           labels,
           [this] { return _produce_latency.seastar_histogram_logform(); })
           .aggregate(aggregate_labels)});
+
+        _public_metrics.add_group(
+          prometheus_sanitize::metrics_name("kafka"),
+          {
+            sm::make_histogram(
+              "request_latency_seconds",
+              sm::description("Internal latency of kafka produce requests"),
+              {sm::label("request")("produce")},
+              [this] {
+                  return ssx::metrics::report_default_histogram(
+                    _produce_latency);
+              })
+              .aggregate({sm::shard_label}),
+            sm::make_histogram(
+              "request_latency_seconds",
+              sm::description("Internal latency of kafka consume requests"),
+              {sm::label("request")("consume")},
+              [this] {
+                  return ssx::metrics::report_default_histogram(_fetch_latency);
+              })
+              .aggregate({sm::shard_label}),
+          });
     }
 
     std::unique_ptr<hdr_hist::measurement> auto_produce_measurement() {
@@ -58,6 +81,8 @@ class latency_probe {
     hdr_hist _produce_latency;
     hdr_hist _fetch_latency;
     ss::metrics::metric_groups _metrics;
+    ss::metrics::metric_groups _public_metrics{
+      ssx::metrics::public_metrics_handle};
 };
 
 } // namespace kafka
diff --git a/src/v/pandaproxy/probe.cc b/src/v/pandaproxy/probe.cc
index 8d1227885a170..f1c1f976b1430 100644
--- a/src/v/pandaproxy/probe.cc
+++ b/src/v/pandaproxy/probe.cc
@@ -11,17 +11,19 @@
 
 #include "config/configuration.h"
 #include "prometheus/prometheus_sanitize.h"
+#include "ssx/metrics.h"
+#include "ssx/sformat.h"
 
 #include
+#include
 
 namespace pandaproxy {
 
-probe::probe(ss::httpd::path_description& path_desc)
+probe::probe(
+  ss::httpd::path_description& path_desc, const ss::sstring& group_name)
   : _request_hist()
-  , _metrics() {
-    if (config::shard_local_cfg().disable_metrics()) {
-        return;
-    }
+  , _metrics()
+  , _public_metrics(ssx::metrics::public_metrics_handle) {
     namespace sm = ss::metrics;
 
     auto operation_label = sm::label("operation");
@@ -30,19 +32,36 @@ probe::probe(ss::httpd::path_description& path_desc)
     auto aggregate_labels = std::vector<sm::label>{
       sm::shard_label, operation_label};
 
-    auto internal_aggregate_labels
-      = config::shard_local_cfg().aggregate_metrics()
-          ? aggregate_labels
-          : std::vector<sm::label>{};
-
-    _metrics.add_group(
-      "pandaproxy",
-      {sm::make_histogram(
-         "request_latency",
-         sm::description("Request latency"),
-         labels,
-         [this] { return _request_hist.seastar_histogram_logform(); })
-         .aggregate(internal_aggregate_labels)});
+
+    if (!config::shard_local_cfg().disable_metrics()) {
+        auto internal_aggregate_labels
+          = config::shard_local_cfg().aggregate_metrics()
+              ? aggregate_labels
+              : std::vector<sm::label>{};
+
+        _metrics.add_group(
+          "pandaproxy",
+          {sm::make_histogram(
+             "request_latency",
+             sm::description("Request latency"),
+             labels,
+             [this] { return _request_hist.seastar_histogram_logform(); })
+             .aggregate(internal_aggregate_labels)});
+    }
+
+    if (!config::shard_local_cfg().disable_public_metrics()) {
+        _public_metrics.add_group(
+          group_name,
+          {sm::make_histogram(
+             "request_latency_seconds",
+             sm::description(
+               ssx::sformat("Internal latency of request for {}", group_name)),
+             labels,
+             [this] {
+                 return ssx::metrics::report_default_histogram(_request_hist);
+             })
+             .aggregate(aggregate_labels)});
+    }
 }
 
 } // namespace pandaproxy
diff --git a/src/v/pandaproxy/probe.h b/src/v/pandaproxy/probe.h
index 198a4036ffc4f..22c2436bc1065 100644
--- a/src/v/pandaproxy/probe.h
+++ b/src/v/pandaproxy/probe.h
@@ -20,12 +20,14 @@ namespace pandaproxy {
 
 class probe {
 public:
-    probe(ss::httpd::path_description& path_desc);
+    probe(
+      ss::httpd::path_description& path_desc, const ss::sstring& group_name);
 
     hdr_hist& hist() { return _request_hist; }
 
 private:
     hdr_hist _request_hist;
     ss::metrics::metric_groups _metrics;
+    ss::metrics::metric_groups _public_metrics;
 };
 
 } // namespace pandaproxy
diff --git a/src/v/pandaproxy/rest/proxy.cc b/src/v/pandaproxy/rest/proxy.cc
index d277e91d506c6..acdd0cb946b96 100644
--- a/src/v/pandaproxy/rest/proxy.cc
+++ b/src/v/pandaproxy/rest/proxy.cc
@@ -70,6 +70,7 @@ proxy::proxy(
   , _ctx{{{}, _mem_sem, {}, smp_sg}, *this}
   , _server(
       "pandaproxy",
+      "rest_proxy",
       ss::api_registry_builder20(_config.api_doc_dir(), "/v1"),
       "header",
       "/definitions",
diff --git a/src/v/pandaproxy/schema_registry/service.cc b/src/v/pandaproxy/schema_registry/service.cc
index fbca7f75a4cbc..e7d8e9b4c74a8 100644
--- a/src/v/pandaproxy/schema_registry/service.cc
+++ b/src/v/pandaproxy/schema_registry/service.cc
@@ -224,7 +224,8 @@ service::service(
   , _client(client)
   , _ctx{{{}, _mem_sem, {}, smp_sg}, *this}
   , _server(
-      "schema_registry",
+      "schema_registry", // server_name
+      "schema_registry", // public_metric_group_name
       ss::api_registry_builder20(_config.api_doc_dir(), "/v1"),
       "schema_registry_header",
       "/schema_registry_definitions",
diff --git a/src/v/pandaproxy/server.cc b/src/v/pandaproxy/server.cc
index 1154864dba300..0dd564a67ccf1 100644
--- a/src/v/pandaproxy/server.cc
+++ b/src/v/pandaproxy/server.cc
@@ -69,11 +69,12 @@ struct handler_adaptor : ss::httpd::handler_base {
       ss::gate& pending_requests,
       server::context_t& ctx,
       server::function_handler&& handler,
-      ss::httpd::path_description& path_desc)
+      ss::httpd::path_description& path_desc,
+      const ss::sstring& metrics_group_name)
       : _pending_requests(pending_requests)
       , _ctx(ctx)
       , _handler(std::move(handler))
-      , _probe(path_desc) {}
+      , _probe(path_desc, metrics_group_name) {}
 
     ss::future<std::unique_ptr<ss::httpd::reply>> handle(
       const ss::sstring&,
@@ -116,11 +117,13 @@ struct handler_adaptor : ss::httpd::handler_base {
 
 server::server(
   const ss::sstring& server_name,
+  const ss::sstring& public_metrics_group_name,
   ss::api_registry_builder20&& api20,
   const ss::sstring& header,
   const ss::sstring& definitions,
   context_t& ctx)
   : _server(server_name)
+  , _public_metrics_group_name(public_metrics_group_name)
   , _pending_reqs()
   , _api20(std::move(api20))
   , _has_routes(false)
@@ -137,7 +140,11 @@ void server::route(server::route_t r) {
     // NOTE: this pointer will be owned by data member _routes of
     // ss::httpd:server. seastar didn't use any unique ptr to express that.
     auto* handler = new handler_adaptor(
-      _pending_reqs, _ctx, std::move(r.handler), r.path_desc);
+      _pending_reqs,
+      _ctx,
+      std::move(r.handler),
+      r.path_desc,
+      _public_metrics_group_name);
     r.path_desc.set(_server._routes, handler);
 }
diff --git a/src/v/pandaproxy/server.h b/src/v/pandaproxy/server.h
index 344d7b9f30325..3790c081a78a1 100644
--- a/src/v/pandaproxy/server.h
+++ b/src/v/pandaproxy/server.h
@@ -80,6 +80,7 @@ class server {
 
     server(
       const ss::sstring& server_name,
+      const ss::sstring& public_metrics_group_name,
       ss::api_registry_builder20&& api20,
       const ss::sstring& header,
       const ss::sstring& definitions,
@@ -97,6 +98,7 @@ class server {
 
 private:
     ss::httpd::http_server _server;
+    ss::sstring _public_metrics_group_name;
     ss::gate _pending_reqs;
     ss::api_registry_builder20 _api20;
     bool _has_routes;
diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc
index e9c7c4efa6ddc..877efc476769b 100644
--- a/src/v/raft/consensus.cc
+++ b/src/v/raft/consensus.cc
@@ -3182,6 +3182,10 @@ consensus::get_follower_metrics(model::node_id id) const {
       it->second);
 }
 
+size_t consensus::get_follower_count() const {
+    return is_elected_leader() ? _fstats.size() : 0;
+}
+
 ss::future<std::optional<storage::timequery_result>>
 consensus::timequery(storage::timequery_config cfg) {
     return _log.timequery(cfg).then(
diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h
index 68c94e553c1d1..87f7323d6072c 100644
--- a/src/v/raft/consensus.h
+++ b/src/v/raft/consensus.h
@@ -338,6 +338,7 @@ class consensus {
 
     std::vector<follower_metrics> get_follower_metrics() const;
     result<follower_metrics> get_follower_metrics(model::node_id) const;
+    size_t get_follower_count() const;
     bool has_followers() const { return _fstats.size() > 0; }
 
     offset_monitor& visible_offset_monitor() {
diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc
index 6917725c5c14e..3864920e4ad73 100644
--- a/src/v/redpanda/admin_server.cc
+++ b/src/v/redpanda/admin_server.cc
@@ -62,6 +62,7 @@
 #include "redpanda/request_auth.h"
 #include "security/scram_algorithm.h"
 #include "security/scram_authenticator.h"
+#include "ssx/metrics.h"
 #include "vlog.h"
 
 #include
 
@@ -269,10 +270,20 @@ get_boolean_query_param(const ss::httpd::request& req, std::string_view name) {
 }
 
 void admin_server::configure_metrics_route() {
-    ss::prometheus::config metrics_conf;
-    metrics_conf.metric_help = "redpanda metrics";
-    metrics_conf.prefix = "vectorized";
-    ss::prometheus::add_prometheus_routes(_server, metrics_conf).get();
+    ss::prometheus::add_prometheus_routes(
+      _server,
+      {.metric_help = "redpanda metrics",
+       .prefix = "vectorized",
+       .handle = ss::metrics::default_handle(),
+       .route = "/metrics"})
+      .get();
+    ss::prometheus::add_prometheus_routes(
+      _server,
+      {.metric_help = "redpanda metrics",
+       .prefix = "redpanda",
+       .handle = ssx::metrics::public_metrics_handle,
+       .route = "/public_metrics"})
+      .get();
 }
 
 ss::future<> admin_server::configure_listeners() {
diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h
index e05253a7c712c..b5986f36ecaf7 100644
--- a/src/v/redpanda/tests/fixture.h
+++ b/src/v/redpanda/tests/fixture.h
@@ -170,6 +170,7 @@ class redpanda_thread_fixture {
         config.get("join_retry_timeout_ms").set_value(100ms);
         config.get("members_backend_retry_ms").set_value(1000ms);
         config.get("disable_metrics").set_value(true);
+        config.get("disable_public_metrics").set_value(true);
 
         auto& node_config = config::node();
         node_config.get("admin").set_value(
diff --git a/src/v/ssx/CMakeLists.txt b/src/v/ssx/CMakeLists.txt
index 2591d270c034f..331cc2ae88106 100644
--- a/src/v/ssx/CMakeLists.txt
+++ b/src/v/ssx/CMakeLists.txt
@@ -3,6 +3,7 @@ v_cc_library(
   ssx
   HDRS
     "future-util.h"
+    "metrics.h"
  DEPS
     Seastar::seastar
 )
diff --git a/src/v/ssx/metrics.h b/src/v/ssx/metrics.h
new file mode 100644
index 0000000000000..3858980c00284
--- /dev/null
+++ b/src/v/ssx/metrics.h
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2022 Redpanda Data, Inc.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+#pragma once
+
+#include "utils/hdr_hist.h"
+
+#include <seastar/core/metrics.hh>
+
+namespace ssx::metrics {
+
+// The seastar metrics handle to be used for the '/public_metrics' prometheus
+// endpoint.
+const auto public_metrics_handle = seastar::metrics::default_handle() + 1;
+
+// Convert an HDR histogram to a seastar histogram for reporting.
+// Entries with values ranging from 250 microseconds to 1 minute are
+// grouped in 18 buckets of exponentially increasing size.
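+// For example (illustrative arithmetic from the values below): with
+// scale = 1000000 the microsecond samples are reported in seconds, so the
+// bucket upper bounds run 0.00025 s, 0.0005 s, 0.001 s, ... doubling each
+// step up to 250 us * 2^17 ~= 32.8 s at the 18th bucket.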
+inline ss::metrics::histogram report_default_histogram(const hdr_hist& hist) {
+    static constexpr size_t num_buckets = 18;
+    static constexpr size_t first_value = 250;
+    static constexpr double log_base = 2.0;
+    static constexpr int64_t scale = 1000000;
+
+    return hist.seastar_histogram_logform(
+      num_buckets, first_value, log_base, scale);
+}
+
+} // namespace ssx::metrics
diff --git a/src/v/utils/hdr_hist.cc b/src/v/utils/hdr_hist.cc
index 584b7601241a7..fcf01a5a2090e 100644
--- a/src/v/utils/hdr_hist.cc
+++ b/src/v/utils/hdr_hist.cc
@@ -70,7 +70,11 @@ double hdr_hist::mean() const { return ::hdr_mean(_hist.get()); }
 size_t hdr_hist::memory_size() const {
     return ::hdr_get_memory_size(_hist.get());
 }
-ss::metrics::histogram hdr_hist::seastar_histogram_logform() const {
+ss::metrics::histogram hdr_hist::seastar_histogram_logform(
+  size_t num_buckets,
+  int64_t first_value,
+  double log_base,
+  int64_t scale) const {
     // logarithmic histogram configuration. this will range from 10 microseconds
     // through around 6000 seconds with 26 buckets doubling.
     //
@@ -80,15 +84,13 @@ ss::metrics::histogram hdr_hist::seastar_histogram_logform() const {
     // double but is truncated (see the int64_t casts on log_base below which
     // is the same as in the hdr C library). this means that if we want
     // buckets with a log base of 1.5, the histogram becomes linear...
-    constexpr size_t num_buckets = 26;
-    constexpr int64_t first_value = 10;
-    constexpr double log_base = 2.0;
 
     ss::metrics::histogram sshist;
     sshist.buckets.resize(num_buckets);
 
     sshist.sample_count = _sample_count;
-    sshist.sample_sum = static_cast<double>(_sample_sum);
+    sshist.sample_sum = static_cast<double>(_sample_sum)
+                        / static_cast<double>(scale);
 
     // stack allocated; no cleanup needed
     struct hdr_iter iter;
@@ -102,7 +104,8 @@ ss::metrics::histogram hdr_hist::seastar_histogram_logform() const {
          bucket_idx++) {
         auto& bucket = sshist.buckets[bucket_idx];
         bucket.count = iter.cumulative_count;
-        bucket.upper_bound = iter.value_iterated_to;
+        bucket.upper_bound = static_cast<double>(iter.value_iterated_to)
+                             / static_cast<double>(scale);
     }
 
     if (bucket_idx == 0) {
@@ -123,7 +126,8 @@ ss::metrics::histogram hdr_hist::seastar_histogram_logform() const {
     for (; bucket_idx < sshist.buckets.size(); bucket_idx++) {
         auto& bucket = sshist.buckets[bucket_idx];
         bucket.count = iter.cumulative_count;
-        bucket.upper_bound = iter.value_iterated_to;
+        bucket.upper_bound = static_cast<double>(iter.value_iterated_to)
+                             / static_cast<double>(scale);
         iter.value_iterated_to *= static_cast<int64_t>(log->log_base);
     }
diff --git a/src/v/utils/hdr_hist.h b/src/v/utils/hdr_hist.h
index ed903fe86b14b..f756318a11f6a 100644
--- a/src/v/utils/hdr_hist.h
+++ b/src/v/utils/hdr_hist.h
@@ -141,7 +141,11 @@ class hdr_hist {
     double stddev() const;
     double mean() const;
     size_t memory_size() const;
-    ss::metrics::histogram seastar_histogram_logform() const;
+    ss::metrics::histogram seastar_histogram_logform(
+      size_t num_buckets = 26,
+      int64_t first_value = 10,
+      double log_base = 2.0,
+      int64_t scale = 1) const;
 
     std::unique_ptr<measurement> auto_measure();
diff --git a/tests/rptest/services/metrics_check.py b/tests/rptest/services/metrics_check.py
index 368e10db87be0..e775f4dd7024c 100644
--- a/tests/rptest/services/metrics_check.py
+++ b/tests/rptest/services/metrics_check.py
@@ -9,6 +9,8 @@
 
 import re
 
+from rptest.services.redpanda import MetricsEndpoint
+
 
 class MetricCheckFailed(Exception):
     def __init__(self, metric, old_value, new_value):
@@ -26,7 +28,14 @@ class MetricCheck(object):
    call `expect` or `evaluate` later to measure how your metrics of interest
    have changed over that region.
    """
-    def __init__(self, logger, redpanda, node, metrics, labels, reduce=None):
+    def __init__(self,
+                 logger,
+                 redpanda,
+                 node,
+                 metrics,
+                 labels=None,
+                 reduce=None,
+                 metrics_endpoint: MetricsEndpoint = MetricsEndpoint.METRICS):
        """
        :param redpanda: a RedpandaService
        :param logger: a Logger
@@ -34,6 +43,8 @@ def __init__(self, logger, redpanda, node, metrics, labels, reduce=None):
        :param metrics: a list of metric names, or a single compiled regex (use re.compile())
        :param labels: dict, to filter metrics as we capture and check.
        :param reduce: reduction function (e.g. sum) if multiple samples match metrics+labels
+        :param metrics_endpoint: MetricsEndpoint enum instance specifying which
+        Prometheus endpoint to query
        """
        self.redpanda = redpanda
        self.node = node
@@ -41,10 +52,11 @@ def __init__(self, logger, redpanda, node, metrics, labels, reduce=None):
        self.logger = logger
 
        self._reduce = reduce
+        self._metrics_endpoint = metrics_endpoint
        self._initial_samples = self._capture(metrics)
 
    def _capture(self, check_metrics):
-        metrics = self.redpanda.metrics(self.node)
+        metrics = self.redpanda.metrics(self.node, self._metrics_endpoint)
 
        samples = {}
        for family in metrics:
@@ -57,14 +69,15 @@ def _capture(self, check_metrics):
                if not include:
                    continue
 
-                label_mismatch = False
-                for k, v in self.labels.items():
-                    if sample.labels.get(k, None) != v:
-                        label_mismatch = True
-                        continue
+                if self.labels:
+                    label_mismatch = False
+                    for k, v in self.labels.items():
+                        if sample.labels.get(k, None) != v:
+                            label_mismatch = True
+                            continue
 
-                if label_mismatch:
-                    continue
+                    if label_mismatch:
+                        continue
 
                self.logger.info(
                    f"  Read {sample.name}={sample.value} {sample.labels}")
@@ -84,7 +97,10 @@ def _capture(self, check_metrics):
            self.logger.info(f"  Captured {k}={v}")
 
        if len(samples) == 0:
-            url = f"http://{self.node.account.hostname}:9644/metrics"
+            metrics_endpoint = ("/metrics" if self._metrics_endpoint
+                                == MetricsEndpoint.METRICS else
+                                "/public_metrics")
+            url = f"http://{self.node.account.hostname}:9644{metrics_endpoint}"
            import requests
            dump = requests.get(url).text
            self.logger.warn(f"Metrics dump: {dump}")
diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py
index c3b6aebf1abf3..fdf931ae03069 100644
--- a/tests/rptest/services/redpanda.py
+++ b/tests/rptest/services/redpanda.py
@@ -20,6 +20,7 @@
 import collections
 import re
 import uuid
+from enum import Enum
 from typing import Mapping, Optional, Union, Any
 
 import yaml
@@ -112,6 +113,11 @@ def f(sample):
        return MetricSamples([s for s in filter(f, self.samples)])
 
 
+class MetricsEndpoint(Enum):
+    METRICS = 1
+    PUBLIC_METRICS = 2
+
+
 def one_or_many(value):
    """
    Helper for reading `one_or_many_property` configs when
@@ -1519,16 +1525,24 @@ def schema_reg(self, limit=None) -> str:
        ]
        return ",".join(schema_reg)
 
-    def metrics(self, node):
+    def metrics(self,
+                node,
+                metrics_endpoint: MetricsEndpoint = MetricsEndpoint.METRICS):
        assert node in self._started
-        url = f"http://{node.account.hostname}:9644/metrics"
+
+        metrics_endpoint = ("/metrics" if metrics_endpoint
+                            == MetricsEndpoint.METRICS else "/public_metrics")
+        url = f"http://{node.account.hostname}:9644{metrics_endpoint}"
        resp = requests.get(url)
        assert resp.status_code == 200
        return text_string_to_metric_families(resp.text)
 
-    def metrics_sample(self,
-                       sample_pattern,
-                       nodes=None) -> Optional[MetricSamples]:
+    def metrics_sample(
+        self,
+        sample_pattern,
+        nodes=None,
+        metrics_endpoint: MetricsEndpoint = MetricsEndpoint.METRICS,
+    ) -> Optional[MetricSamples]:
        """
        Query metrics for a single sample using fuzzy name matching. This
        interface matches the sample pattern against sample names, and requires
@@ -1554,7 +1568,7 @@ def metrics_sample(self,
        found_sample = None
        sample_values = []
        for node in nodes:
-            metrics = self.metrics(node)
+            metrics = self.metrics(node, metrics_endpoint)
            for family in metrics:
                for sample in family.samples:
                    if sample_pattern not in sample.name:
diff --git a/tests/rptest/tests/cluster_metrics_test.py b/tests/rptest/tests/cluster_metrics_test.py
new file mode 100644
index 0000000000000..09b3278e3df8f
--- /dev/null
+++ b/tests/rptest/tests/cluster_metrics_test.py
@@ -0,0 +1,208 @@
+# Copyright 2022 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+import re
+
+from typing import Optional
+from ducktape.utils.util import wait_until
+from ducktape.cluster.cluster import ClusterNode
+
+from rptest.clients.rpk import RpkTool
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.clients.types import TopicSpec
+from rptest.services.cluster import cluster
+from rptest.services.metrics_check import MetricCheck
+from rptest.services.redpanda import MetricSamples, MetricsEndpoint
+
+
+class ClusterMetricsTest(RedpandaTest):
+    cluster_level_metrics: list[str] = [
+        "cluster_brokers",
+        "cluster_topics",
+        "cluster_partitions",
+        "cluster_unavailable_partitions",
+    ]
+
+    def _stop_controller_node(self) -> ClusterNode:
+        """
+        Stop the current controller node
+        """
+        prev = self.redpanda.controller()
+        self.redpanda.stop_node(prev)
+
+        return prev
+
+    def _wait_until_controller_leader_is_stable(self):
+        """
+        Wait for the controller leader to stabilise.
+        This helper considers the leader stable if the same node
+        is reported by two consecutive admin API queries.
+        """
+        prev = None
+
+        def controller_stable():
+            nonlocal prev
+            curr = self.redpanda.controller()
+
+            if prev != curr:
+                prev = curr
+                return False
+            else:
+                return True
+
+        wait_until(
+            controller_stable,
+            timeout_sec=10,
+            backoff_sec=2,
+            err_msg="Controller leader did not stabilise",
+        )
+
+    def _failover(self):
+        """
+        Stop current controller node and wait for failover
+        """
+        prev = self._stop_controller_node()
+
+        def new_controller_elected():
+            curr = self.redpanda.controller()
+            return curr and curr != prev
+
+        wait_until(
+            new_controller_elected,
+            timeout_sec=20,
+            backoff_sec=1,
+            err_msg="Controller did not failover",
+        )
+
+        return prev
+
+    def _get_value_from_samples(self, samples: MetricSamples):
+        """
+        Extract the metric value from the samples.
+        Only one sample is expected as cluster level metrics have no labels.
+        """
+        assert len(samples.samples) == 1
+        return samples.samples[0].value
+
+    def _get_cluster_metrics(
+            self, node: ClusterNode) -> Optional[dict[str, MetricSamples]]:
+        """
+        Get all the cluster level metrics exposed by a specified
+        node in the cluster.
+ """ + def get_metrics_from_node(pattern: str): + return self.redpanda.metrics_sample(pattern, [node], + MetricsEndpoint.PUBLIC_METRICS) + + metrics_samples = {} + for name in ClusterMetricsTest.cluster_level_metrics: + samples = get_metrics_from_node(name) + if samples is not None: + metrics_samples[name] = samples + + if not metrics_samples: + return None + else: + return metrics_samples + + def _assert_reported_by_controller(self): + """ + Enforce the fact that only the controller leader should + report cluster level metrics. If there's no leader, no + node should report these metrics. + """ + current_controller = self.redpanda.controller() + for node in self.redpanda.started_nodes(): + metrics = self._get_cluster_metrics(node) + + if current_controller is None: + assert ( + metrics is None + ), f"Node {node.name} reported cluster metrics, but the cluster has no leader" + elif current_controller == node: + assert ( + metrics is not None + ), f"Node {node.name} is controller leader, but did not report cluster metrics" + else: + assert ( + metrics is None + ), f"Node {node.name} is not controller leader, but it reported cluster metrics" + + @cluster(num_nodes=3) + def cluster_metrics_reported_only_by_leader_test(self): + """ + Test that only the controller leader reports the cluster + level metrics at any given time. + """ + # Assert metrics are reported once in a fresh, three node cluster + self._assert_reported_by_controller() + + # Restart the controller node and assert. + controller = self.redpanda.controller() + self.redpanda.restart_nodes([controller], + start_timeout=10, + stop_timeout=10) + self._wait_until_controller_leader_is_stable() + self._assert_reported_by_controller() + + # Stop the controller node and assert. + self._failover() + self._assert_reported_by_controller() + + # Stop the controller node and assert again. + # This time the metrics should not be reported as a controller + # couldn't be elected due to lack of quorum. + self._stop_controller_node() + self._assert_reported_by_controller() + + @cluster(num_nodes=3) + def cluster_metrics_correctness_test(self): + """ + Test that the cluster level metrics move in the expected way + after creating a topic. + """ + self._assert_reported_by_controller() + + controller_node = self.redpanda.controller() + cluster_metrics = MetricCheck( + self.logger, + self.redpanda, + controller_node, + re.compile("redpanda_cluster_.*"), + metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS) + + RpkTool(self.redpanda).create_topic("test-topic", partitions=3) + + # Check that the metrics have moved in the expected way by the creation + # of one topic with three partitions. + cluster_metrics.expect([ + ("redpanda_cluster_brokers", lambda a, b: a == b == 3), + ("redpanda_cluster_topics", lambda a, b: b - a == 1), + ("redpanda_cluster_partitions", lambda a, b: b - a == 3), + ("redpanda_cluster_unavailable_partitions", + lambda a, b: a == b == 0) + ]) + + @cluster(num_nodes=3) + def cluster_metrics_disabled_by_config_test(self): + """ + Test that the cluster level metrics have the expected values + before and after creating a topic. 
+ """ + # 'disable_public_metrics' defaults to false so cluster metrics + # are expected + self._assert_reported_by_controller() + + self.redpanda.set_cluster_config({"disable_public_metrics": "true"}, + expect_restart=True) + + # The 'public_metrics' endpoint that serves cluster level + # metrics should not return anything when + # 'disable_public_metrics' == true + cluster_metrics = self._get_cluster_metrics(self.redpanda.controller()) + assert cluster_metrics is None