Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.3.x] Added vectorized_raft_learners_gap_bytes metric #19874

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3979,4 +3979,18 @@ std::optional<model::offset> consensus::get_learner_start_offset() const {
return std::nullopt;
}

/**
 * Returns the total number of bytes that still have to be delivered to the
 * learner replicas of this group, i.e. the sum over all learners of the log
 * bytes past each learner's match offset.
 *
 * Only the leader tracks follower state in _fstats, so this returns 0 when
 * the current node is not the leader of the group.
 */
size_t consensus::bytes_to_deliver_to_learners() const {
    if (!is_leader()) {
        return 0;
    }

    size_t total = 0;
    // read-only scan; _fstats is bounded by the replica count of the group
    for (const auto& [id, f_meta] : _fstats) {
        if (f_meta.is_learner) {
            // everything after the learner's match offset is undelivered
            total += _log->size_bytes_after_offset(f_meta.match_index);
        }
    }
    return total;
}

} // namespace raft
5 changes: 5 additions & 0 deletions src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,11 @@ class consensus {
bool has_configuration_override() const {
return _configuration_manager.has_configuration_override();
}
/**
 * Returns the total number of bytes that still need to be delivered to
 * all learners that are being recovered. Returns 0 when this node is not
 * the leader of the group.
 */
size_t bytes_to_deliver_to_learners() const;

private:
friend replicate_entries_stm;
Expand Down
44 changes: 41 additions & 3 deletions src/v/raft/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ group_manager::group_manager(
_configuration.heartbeat_interval)
, _feature_table(feature_table.local())
, _flush_timer_jitter(_configuration.flush_timer_interval_ms)
// we use a reasonable default not to bloat the configuration properties
, _metric_collection_interval(5s)
, _metrics_timer([this] {
try {
collect_learner_metrics();
} catch (...) {
vlog(
raftlog.error,
"failed to collect learner metrics - {}",
std::current_exception());
}
})
, _is_ready(false) {
_flush_timer.set_callback([this] {
ssx::spawn_with_gate(_gate, [this] {
Expand All @@ -70,9 +82,13 @@ ss::future<> group_manager::start() {
co_await _heartbeats.start();
co_await _recovery_scheduler.start();
_flush_timer.arm(_flush_timer_jitter());
_metrics_timer.arm_periodic(_metric_collection_interval);
}

ss::future<> group_manager::stop() {
_metrics.clear();
_public_metrics.clear();
_metrics_timer.cancel();
auto f = _gate.close();
_flush_timer.cancel();

Expand Down Expand Up @@ -225,9 +241,22 @@ void group_manager::setup_metrics() {
_metrics.add_group(
prometheus_sanitize::metrics_name("raft"),
{sm::make_gauge(
"group_count",
[this] { return _groups.size(); },
sm::description("Number of raft groups"))});
"group_count",
[this] { return _groups.size(); },
sm::description("Number of raft groups")),
sm::make_gauge(
"learners_gap_bytes",
[this] { return _learners_gap_bytes; },
sm::description(
"Total numbers of bytes that must be delivered to learners"))});

_public_metrics.add_group(
prometheus_sanitize::metrics_name("raft"),
{sm::make_gauge(
"learners_gap_bytes",
[this] { return _learners_gap_bytes; },
sm::description(
"Total numbers of bytes that must be delivered to learners"))});
}

ss::future<> group_manager::flush_groups() {
Expand All @@ -248,4 +277,13 @@ ss::future<> group_manager::flush_groups() {
return ss::now();
});
}
/**
 * Refreshes the cached learners gap value by summing, over all raft groups
 * on this shard, the bytes that still have to be delivered to learners.
 * Invoked periodically from _metrics_timer; the cached value is what the
 * learners_gap_bytes gauges report.
 */
void group_manager::collect_learner_metrics() {
    // we can use a synchronous loop here as the number of raft groups per core
    // is limited
    size_t total = 0;
    for (const auto& group : _groups) {
        total += group->bytes_to_deliver_to_learners();
    }
    // publish the value only once the sum is complete so a metrics scrape
    // never observes a zeroed or partially accumulated gap
    _learners_gap_bytes = total;
}

} // namespace raft
6 changes: 5 additions & 1 deletion src/v/raft/group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class group_manager {
void setup_metrics();

ss::future<> flush_groups();
void collect_learner_metrics();

raft::group_configuration create_initial_configuration(
std::vector<model::broker>, model::revision_id) const;
Expand All @@ -114,14 +115,17 @@ class group_manager {
notification_list<leader_cb_t, cluster::notification_id_type>
_notifications;
metrics::internal_metric_groups _metrics;
metrics::public_metric_groups _public_metrics;
storage::api& _storage;
coordinated_recovery_throttle& _recovery_throttle;
recovery_memory_quota _recovery_mem_quota;
recovery_scheduler _recovery_scheduler;
features::feature_table& _feature_table;
ss::timer<clock_type> _flush_timer;
timeout_jitter _flush_timer_jitter;

std::chrono::milliseconds _metric_collection_interval;
ss::timer<> _metrics_timer;
size_t _learners_gap_bytes{0};
bool _is_ready;
};

Expand Down
63 changes: 62 additions & 1 deletion tests/rptest/tests/nodes_decommissioning_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from rptest.clients.types import TopicSpec
from rptest.tests.end_to_end import EndToEndTest
from rptest.services.admin import Admin
from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, RESTART_LOG_ALLOW_LIST, RedpandaService, make_redpanda_service, SISettings
from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, RESTART_LOG_ALLOW_LIST, RedpandaService, SISettings
from rptest.utils.node_operations import NodeDecommissionWaiter


Expand Down Expand Up @@ -422,6 +422,67 @@ def test_decommissioning_cancel_ongoing_movements(self):

self.verify()

@skip_debug_mode
@cluster(num_nodes=6)
def test_learner_gap_metrics(self):
    """
    Verify that the learners_gap_bytes metric tracks the total number of
    bytes that still have to be delivered to learner replicas: zero with
    no pending movements, and roughly the decommissioned node's partition
    size while its replicas are being moved away.
    """
    self.start_redpanda()
    # set small segment size to calculate the gap correctly
    self.redpanda.set_cluster_config({"log_segment_size": 1024 * 1024})
    self._create_topics()

    self.start_producer()
    self.start_consumer()
    # set recovery rate to small value to stop moves
    self._set_recovery_rate(1)

    def calculate_total_learners_gap() -> int | None:
        # None means the metric is not available (yet)
        gap = self.redpanda.metrics_sample("learners_gap_bytes")
        if gap is None:
            return None
        return sum(g.value for g in gap.samples)

    # capture the value so an unavailable metric (None) produces a clear
    # failure message instead of a bare `None == 0` assertion error
    initial_gap = calculate_total_learners_gap()
    assert initial_gap == 0, (
        "when there are no pending partition movements the reported gap "
        f"should be equal to 0, got: {initial_gap}")

    to_decommission = random.choice(self.redpanda.nodes)
    to_decommission_id = self.redpanda.node_id(to_decommission)

    self.logger.info(f"decommissioning node: {to_decommission_id}")
    self._decommission(to_decommission_id)

    def learner_gap_reported(decommissioned_node_id: int):
        total_gap = calculate_total_learners_gap()
        p_size = self.redpanda.metrics_sample("partition_size")
        # retry until both metrics are available and the gap is non-zero
        if not total_gap or not p_size:
            return False
        total_size = sum(
            ps.value for ps in p_size.samples
            if self.redpanda.node_id(ps.node) == decommissioned_node_id)

        self.logger.info(
            f"decommissioned node total size: {total_size}, total_gap: {total_gap}"
        )
        assert total_gap < total_size, "gap can not be larger than the size of partitions"
        # assume that the total gap is equal to the total size of
        # decommissioned node with the tolerance of 5%
        return (total_size - total_gap) < total_size * 0.05

    wait_until(lambda: learner_gap_reported(to_decommission_id),
               timeout_sec=60,
               backoff_sec=1)
    # restore a high recovery rate so the decommission can finish
    self._set_recovery_rate(100 * 1024 * 1024)
    # wait for decommissioned node to be removed
    self._wait_for_node_removed(to_decommission_id)

    # Stop the decommissioned node, because redpanda internally does not
    # fence it, it is the responsibility of external orchestrator to
    # stop the node they intend to remove.
    # This can be removed when we change redpanda to prevent decommissioned
    # nodes from responding to client Kafka requests.
    self.redpanda.stop_node(to_decommission)

    self.verify()

@cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
def test_recommissioning_node(self):
self.start_redpanda()
Expand Down
Loading