c/shard_placement_table: add metrics

Add metrics tracking the number of partition replicas assigned to a
shard, hosted on a shard, and the number of partitions needing
reconciliation of shard-local state.
ztlpn committed Jul 29, 2024
1 parent 7b0f1bf commit ee3aefc
Showing 2 changed files with 154 additions and 49 deletions.
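The heart of the change is a probe object owned by each shard_placement_table instance. Rather than rescanning the placement map, every mutation of a partition's placement_state brackets the update: it samples the state's contribution to each counter before the mutation, samples it again after, and pushes only the signed difference into the probe's gauges. Below is a minimal, self-contained sketch of that delta-bracketing pattern; the type and member names are simplified stand-ins for illustration, not the actual Redpanda types in the diff.

    #include <cstdint>
    #include <optional>

    // Simplified stand-in for shard_placement_table::probe: plain counters
    // that would back the gauges.
    struct probe {
        int64_t assigned = 0;     // -> "assigned_partitions"
        int64_t to_reconcile = 0; // -> "partitions_to_reconcile"
    };

    struct placement_state {
        // Assignment and shard-local state, reduced to revisions for brevity.
        std::optional<int> assigned_rev;
        std::optional<int> current_rev;

        bool is_reconciled() const {
            // Reconciled when shard-local state matches the assignment
            // (or both are absent).
            return assigned_rev ? current_rev == assigned_rev : !current_rev;
        }

        void set_assigned(std::optional<int> new_assigned, probe& p) {
            // Sample this partition's contributions before the mutation...
            int64_t assigned_delta = -int64_t(assigned_rev.has_value());
            int64_t reconcile_delta = int64_t(is_reconciled());

            assigned_rev = new_assigned;

            // ...then fold in the contributions after it, so only the net
            // change reaches the gauges and they stay exact without a rescan.
            assigned_delta += int64_t(assigned_rev.has_value());
            reconcile_delta -= int64_t(is_reconciled());
            p.assigned += assigned_delta;
            p.to_reconcile += reconcile_delta;
        }
    };

Two smaller details from the diff are worth noting: setup_metrics() is called only when ss::this_shard_id() == _shard, so the gauges are registered once per owning shard, and the probe is held through std::unique_ptr with only a forward declaration in the header, which is why the commit also adds the out-of-line defaulted destructor ~shard_placement_table().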
186 changes: 140 additions & 46 deletions src/v/cluster/shard_placement_table.cc
@@ -14,13 +14,59 @@
 #include "cluster/cluster_utils.h"
 #include "cluster/logger.h"
 #include "cluster/topic_table.h"
+#include "metrics/prometheus_sanitize.h"
 #include "ssx/async_algorithm.h"
 #include "types.h"
 
 #include <seastar/util/defer.hh>
 
 namespace cluster {
 
+class shard_placement_table::probe {
+public:
+    probe() = default;
+    probe(const probe&) = delete;
+    probe& operator=(const probe&) = delete;
+    probe(probe&&) = delete;
+    probe& operator=(probe&&) = delete;
+
+    void update_assigned(int64_t delta) { _total_assigned += delta; }
+    void update_hosted(int64_t delta) { _total_hosted += delta; }
+    void update_to_reconcile(int64_t delta) { _to_reconcile += delta; }
+
+    void setup_metrics() {
+        if (config::shard_local_cfg().disable_metrics()) {
+            return;
+        }
+
+        namespace sm = ss::metrics;
+        _metrics.add_group(
+          prometheus_sanitize::metrics_name("cluster:shard_placement"),
+          {
+            sm::make_gauge(
+              "assigned_partitions",
+              [this] { return _total_assigned; },
+              sm::description("Number of partitions assigned to this shard")),
+            sm::make_gauge(
+              "hosted_partitions",
+              [this] { return _total_hosted; },
+              sm::description("Number of partitions hosted on this shard")),
+            sm::make_gauge(
+              "partitions_to_reconcile",
+              [this] { return _to_reconcile; },
+              sm::description("Number of partitions needing reconciliation of "
+                              "shard-local state")),
+          });
+    }
+
+private:
+    int64_t _total_assigned = 0;
+    int64_t _total_hosted = 0;
+    int64_t _to_reconcile = 0;
+
+    metrics::internal_metric_groups _metrics;
+};
+
 std::ostream& operator<<(
   std::ostream& o, const shard_placement_table::shard_local_assignment& as) {
     fmt::print(
@@ -94,21 +140,50 @@ shard_placement_table::placement_state::get_reconciliation_action(
     }
 }
 
+bool shard_placement_table::placement_state::is_reconciled() const {
+    if (!_assigned) {
+        return !_current;
+    }
+    return _current && _current->log_revision == _assigned->log_revision
+           && _current->status == hosted_status::hosted;
+}
+
 void shard_placement_table::placement_state::set_assigned(
-  std::optional<shard_local_assignment> new_assigned) {
+  std::optional<shard_local_assignment> new_assigned,
+  shard_placement_table::probe& probe) {
+    int64_t assigned_delta = -(int64_t)_assigned.has_value();
+    int64_t to_reconcile_delta = (int64_t)is_reconciled();
+
     _assigned = std::move(new_assigned);
+
+    assigned_delta += _assigned.has_value();
+    to_reconcile_delta -= is_reconciled();
+    probe.update_assigned(assigned_delta);
+    probe.update_to_reconcile(to_reconcile_delta);
 }
 
 void shard_placement_table::placement_state::set_current(
-  std::optional<shard_local_state> new_current) {
+  std::optional<shard_local_state> new_current,
+  shard_placement_table::probe& probe) {
+    auto is_hosted = [this] {
+        return _current && _current->status == hosted_status::hosted;
+    };
+    int64_t hosted_delta = -(int64_t)is_hosted();
+    int64_t to_reconcile_delta = (int64_t)is_reconciled();
+
     _current = std::move(new_current);
+
+    hosted_delta += is_hosted();
+    to_reconcile_delta -= is_reconciled();
+    probe.update_hosted(hosted_delta);
+    probe.update_to_reconcile(to_reconcile_delta);
 }
 
 void shard_placement_table::placement_state::set_hosted_status(
-  hosted_status new_status) {
+  hosted_status new_status, shard_placement_table::probe& probe) {
     auto new_current = _current;
     new_current.value().status = new_status;
-    set_current(std::move(new_current));
+    set_current(std::move(new_current), probe);
 }
 
 std::ostream&
@@ -188,7 +263,14 @@ bytes current_state_kvstore_key(const raft::group_id group) {
 shard_placement_table::shard_placement_table(
   ss::shard_id shard, storage::kvstore& kvstore)
   : _shard(shard)
-  , _kvstore(kvstore) {}
+  , _kvstore(kvstore)
+  , _probe(std::make_unique<probe>()) {
+    if (ss::this_shard_id() == _shard) {
+        _probe->setup_metrics();
+    }
+}
+
+shard_placement_table::~shard_placement_table() = default;
 
 bool shard_placement_table::is_persistence_enabled() const {
     assert_is_assignment_shard();
@@ -470,10 +552,12 @@ shard_placement_table::gather_init_states(
               marker.log_revision,
               marker.shard_revision);
 
-            _states[ntp_it->second].set_assigned(shard_local_assignment{
-              .group = group,
-              .log_revision = marker.log_revision,
-              .shard_revision = marker.shard_revision});
+            _states[ntp_it->second].set_assigned(
+              shard_local_assignment{
+                .group = group,
+                .log_revision = marker.log_revision,
+                .shard_revision = marker.shard_revision},
+              *_probe);
         }
         break;
     }
@@ -498,12 +582,14 @@
                   marker.ntp,
                   _shard));
             }
-            state.set_current(shard_local_state(
-              group,
-              marker.log_revision,
-              marker.is_complete ? hosted_status::hosted
-                                 : hosted_status::receiving,
-              marker.shard_revision));
+            state.set_current(
+              shard_local_state(
+                group,
+                marker.log_revision,
+                marker.is_complete ? hosted_status::hosted
+                                   : hosted_status::receiving,
+                marker.shard_revision),
+              *_probe);
             break;
         }
     }
@@ -548,7 +634,7 @@ ss::future<> shard_placement_table::scatter_init_data(
         } else if (
           _shard != init_data.receiving.shard || !init_data.hosted.shard
           || _shard >= ss::smp::count) {
-            state.set_hosted_status(hosted_status::obsolete);
+            state.set_hosted_status(hosted_status::obsolete, *_probe);
         }
     }
 
@@ -558,14 +644,14 @@ ss::future<> shard_placement_table::scatter_init_data(
         fut = _kvstore.remove(
           kvstore_key_space,
           assignment_kvstore_key(state.assigned()->group));
-        state.set_assigned(std::nullopt);
+        state.set_assigned(std::nullopt, *_probe);
     } else if (!init_data.hosted.shard) {
         state._is_initial_for = init_data.log_revision;
    }
 
     if (_shard >= ss::smp::count) {
         // mark states on extra shards as ready to transfer
-        state.set_assigned(std::nullopt);
+        state.set_assigned(std::nullopt, *_probe);
     }
 }
 
@@ -674,17 +760,19 @@ ss::future<> shard_placement_table::do_initialize_from_topic_table(
                   // cross-shard transfer, orig_shard gets the hosted marker
                   if (ss::this_shard_id() == orig_shard) {
                       placement.set_current(
-                        shard_local_state(assigned, hosted_status::hosted));
+                        shard_local_state(assigned, hosted_status::hosted),
+                        *_probe);
                       _states.emplace(ntp, placement);
                   } else if (ss::this_shard_id() == target->shard) {
-                      placement.set_assigned(assigned);
+                      placement.set_assigned(assigned, *_probe);
                       _states.emplace(ntp, placement);
                   }
               } else if (ss::this_shard_id() == target->shard) {
                   // in other cases target shard gets the hosted marker
                   placement.set_current(
-                    shard_local_state(assigned, hosted_status::hosted));
-                  placement.set_assigned(assigned);
+                    shard_local_state(assigned, hosted_status::hosted),
+                    *_probe);
+                  placement.set_assigned(assigned, *_probe);
                   _states.emplace(ntp, placement);
               }
           });
@@ -805,7 +893,7 @@ ss::future<> shard_placement_table::set_target(
           state.assigned(),
           is_initial);
 
-        state.set_assigned(as);
+        state.set_assigned(as, *spt._probe);
         if (is_initial) {
             state._is_initial_for = as.log_revision;
         }
@@ -832,7 +920,7 @@ ss::future<> shard_placement_table::set_target(
               ntp,
               it->second.assigned());
 
-            it->second.set_assigned(std::nullopt);
+            it->second.set_assigned(std::nullopt, *other._probe);
             if (it->second.is_empty()) {
                 // We are on a shard that was previously a target, but didn't
                 // get to starting the transfer.
@@ -953,7 +1041,7 @@ ss::future<std::error_code> shard_placement_table::prepare_create(
     }
 
     state.set_current(
-      shard_local_state(assigned, hosted_status::hosted));
+      shard_local_state(assigned, hosted_status::hosted), *_probe);
     if (state._is_initial_for == expected_log_rev) {
         // could have changed while we were updating kvstore.
         state._is_initial_for = std::nullopt;
@@ -1093,8 +1181,10 @@ shard_placement_table::prepare_transfer(
               // at this point we commit to the transfer on the
               // destination shard
               shard_rev = dest_state.assigned().value().shard_revision;
-              dest_state.set_current(shard_local_state(
-                dest_state.assigned().value(), hosted_status::receiving));
+              dest_state.set_current(
+                shard_local_state(
+                  dest_state.assigned().value(), hosted_status::receiving),
+                *dest._probe);
 
               if (dest._persistence_enabled) {
                   auto marker_buf = serde::to_iobuf(current_state_marker{
@@ -1114,12 +1204,13 @@
                       kvstore_key_space,
                       current_state_kvstore_key(dest_state.current()->group),
                       std::move(marker_buf))
-                    .handle_exception([&dest_state](std::exception_ptr ex) {
-                        // "unlock" destination in case of kvstore errors so
-                        // that we can retry later.
-                        dest_state.set_current(std::nullopt);
-                        return ss::make_exception_future(std::move(ex));
-                    });
+                    .handle_exception(
+                      [&dest, &dest_state](std::exception_ptr ex) {
+                          // "unlock" destination in case of kvstore errors so
+                          // that we can retry later.
+                          dest_state.set_current(std::nullopt, *dest._probe);
+                          return ss::make_exception_future(std::move(ex));
+                      });
               } else {
                   return ss::now();
               }
@@ -1198,20 +1289,22 @@ ss::future<> shard_placement_table::finish_transfer(
                     std::move(marker_buf));
               }
 
-              return std::move(fut).then([&ntp, &dest_state, shard_callback] {
-                  dest_state.set_hosted_status(hosted_status::hosted);
-                  vlog(
-                    clusterlog.trace,
-                    "[{}] finished transfer on destination, placement: {}",
-                    ntp,
-                    dest_state);
-                  shard_callback(ntp);
-              });
+              return std::move(fut).then(
+                [&dest, &ntp, &dest_state, shard_callback] {
+                    dest_state.set_hosted_status(
+                      hosted_status::hosted, *dest._probe);
+                    vlog(
+                      clusterlog.trace,
+                      "[{}] finished transfer on destination, placement: {}",
+                      ntp,
+                      dest_state);
+                    shard_callback(ntp);
+                });
           });
     state._next = std::nullopt;
 
     if (state.current() && state.current()->log_revision == expected_log_rev) {
-        state.set_hosted_status(hosted_status::obsolete);
+        state.set_hosted_status(hosted_status::obsolete, *_probe);
     }
 
     if (state._is_initial_for == expected_log_rev) {
@@ -1245,7 +1338,7 @@ ss::future<std::error_code> shard_placement_table::prepare_delete(
             co_return errc::waiting_for_partition_shutdown;
         }
 
-        state.set_hosted_status(hosted_status::obsolete);
+        state.set_hosted_status(hosted_status::obsolete, *_probe);
     }
 
     if (state._next) {
@@ -1259,7 +1352,8 @@
           it != dest._states.end() && it->second.current()
           && it->second.current()->shard_revision == expected_shard_rev
           && it->second.current()->status == hosted_status::receiving) {
-            it->second.set_hosted_status(hosted_status::obsolete);
+            it->second.set_hosted_status(
+              hosted_status::obsolete, *dest._probe);
         }
 
         // TODO: notify reconciliation fiber
@@ -1309,7 +1403,7 @@ ss::future<> shard_placement_table::do_delete(
               kvstore_key_space,
               current_state_kvstore_key(state.current()->group));
         }
-        state.set_current(std::nullopt);
+        state.set_current(std::nullopt, *_probe);
     }
 
     if (state.is_empty()) {
17 changes: 14 additions & 3 deletions src/v/cluster/shard_placement_table.h
@@ -41,6 +41,9 @@ namespace cluster {
 /// reconciliation process.
 class shard_placement_table
   : public ss::peering_sharded_service<shard_placement_table> {
+private:
+    class probe;
+
 public:
     // assignment modification methods must be called on this shard
     static constexpr ss::shard_id assignment_shard_id = 0;
@@ -133,9 +136,14 @@ class shard_placement_table
             return !_current && !_is_initial_for && !_assigned;
         }
 
-        void set_assigned(std::optional<shard_local_assignment>);
-        void set_current(std::optional<shard_local_state>);
-        void set_hosted_status(hosted_status);
+        /// True if shard-local state for the partition is reconciled.
+        bool is_reconciled() const;
+
+        void set_assigned(
+          std::optional<shard_local_assignment>, shard_placement_table::probe&);
+        void set_current(
+          std::optional<shard_local_state>, shard_placement_table::probe&);
+        void set_hosted_status(hosted_status, shard_placement_table::probe&);
 
         struct versioned_shard {
             ss::shard_id shard;
@@ -161,6 +169,7 @@
     using ntp2state_t = absl::node_hash_map<model::ntp, placement_state>;
 
     explicit shard_placement_table(ss::shard_id, storage::kvstore&);
+    ~shard_placement_table();
 
     /// Must be called on assignment_shard_id.
     bool is_persistence_enabled() const;
@@ -284,6 +293,8 @@
 
     chunked_hash_map<model::ntp, std::unique_ptr<entry_t>> _ntp2entry;
     model::shard_revision_id _cur_shard_revision{0};
+
+    std::unique_ptr<probe> _probe;
 };
 
 std::ostream& operator<<(std::ostream&, shard_placement_table::hosted_status);
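One usage note, hedged: assuming these gauges are exported like Redpanda's other internal metric groups (suggested by the metrics::internal_metric_groups member, but not stated in this diff), prometheus_sanitize::metrics_name("cluster:shard_placement") yields the group name cluster_shard_placement, so the gauges would surface on the internal /metrics endpoint as vectorized_cluster_shard_placement_assigned_partitions, vectorized_cluster_shard_placement_hosted_partitions, and vectorized_cluster_shard_placement_partitions_to_reconcile, with one time series per shard.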
