Node-local core assignment: rebalancing #19864

Merged · 14 commits · Jun 26, 2024
7 changes: 6 additions & 1 deletion src/v/cluster/controller.cc
@@ -465,8 +465,13 @@ ss::future<> controller::start(
return _shard_balancer.start_single(
std::ref(_shard_placement),
std::ref(_feature_table),
std::ref(_storage),
std::ref(_tp_state),
std::ref(_backend));
std::ref(_backend),
config::shard_local_cfg()
.core_balancing_on_core_count_change.bind(),
config::shard_local_cfg().core_balancing_continuous.bind(),
config::shard_local_cfg().core_balancing_debounce_timeout.bind());
})
.then(
[this] { return _drain_manager.invoke_on_all(&drain_manager::start); })
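The bindings passed to `start_single` above give the shard balancer live views of the three new cluster properties instead of one-time snapshots. Below is a minimal, self-contained sketch of that pattern; `binding` is a hypothetical stand-in for `config::binding<T>` and models only the two operations this PR relies on: `operator()` to read the current value and `watch()` to run a callback when the value changes (as the shard_balancer constructor does for `core_balancing_debounce_timeout`).

```cpp
#include <chrono>
#include <functional>
#include <iostream>
#include <vector>

// Hypothetical stand-in for config::binding<T>: readable via operator(),
// with observers registered through watch() run whenever the value changes.
template <typename T>
class binding {
public:
    explicit binding(T initial) : _value(std::move(initial)) {}

    const T& operator()() const { return _value; }

    void watch(std::function<void()> cb) { _watchers.push_back(std::move(cb)); }

    // In the real config subsystem this is driven by cluster config updates.
    void update(T v) {
        _value = std::move(v);
        for (auto& cb : _watchers) { cb(); }
    }

private:
    T _value;
    std::vector<std::function<void()>> _watchers;
};

int main() {
    using namespace std::chrono;
    binding<milliseconds> debounce_timeout{10s};

    // Mirrors the shard_balancer constructor: cache a derived value and
    // refresh it whenever the underlying property changes.
    milliseconds cached = debounce_timeout();
    debounce_timeout.watch([&] { cached = debounce_timeout(); });

    debounce_timeout.update(30s);
    std::cout << "debounce timeout: " << cached.count() << " ms\n"; // 30000 ms
}
```

Because the boolean properties are read through `operator()` at each decision point and the timeout is refreshed in a `watch()` callback, configuration changes take effect without restarting the balancer.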
4 changes: 2 additions & 2 deletions src/v/cluster/feature_manager.cc
@@ -237,8 +237,8 @@ ss::future<> feature_manager::maybe_log_license_check_info() {
cfg.audit_enabled || cfg.cloud_storage_enabled
|| cfg.partition_autobalancing_mode
== model::partition_autobalancing_mode::continuous
|| has_gssapi() || has_oidc() || has_schma_id_validation()
|| has_non_default_roles) {
|| cfg.core_balancing_continuous() || has_gssapi() || has_oidc()
|| has_schma_id_validation() || has_non_default_roles) {
const auto& license = _feature_table.local().get_license();
if (!license || license->is_expired()) {
vlog(
181 changes: 175 additions & 6 deletions src/v/cluster/shard_balancer.cc
@@ -19,18 +19,55 @@

namespace cluster {

namespace {

const bytes& state_kvstore_key() {
static thread_local bytes key = []() {
iobuf buf;
serde::write(buf, shard_placement_kvstore_key_type::balancer_state);
return iobuf_to_bytes(buf);
}();
return key;
}

struct persisted_state
: serde::
envelope<persisted_state, serde::version<0>, serde::compat_version<0>> {
uint32_t last_rebalance_core_count = 0;

bool operator==(const persisted_state&) const = default;
auto serde_fields() { return std::tie(last_rebalance_core_count); }
};

} // namespace

shard_balancer::shard_balancer(
ss::sharded<shard_placement_table>& spt,
ss::sharded<features::feature_table>& features,
ss::sharded<storage::api>& storage,
ss::sharded<topic_table>& topics,
ss::sharded<controller_backend>& cb)
ss::sharded<controller_backend>& cb,
config::binding<bool> balancing_on_core_count_change,
config::binding<bool> balancing_continuous,
config::binding<std::chrono::milliseconds> debounce_timeout)
: _shard_placement(spt.local())
, _features(features.local())
, _kvstore(storage.local().kvs())
, _topics(topics)
, _controller_backend(cb)
, _self(*config::node().node_id())
, _balancing_on_core_count_change(std::move(balancing_on_core_count_change))
, _balancing_continuous(std::move(balancing_continuous))
, _debounce_timeout(std::move(debounce_timeout))
, _debounce_jitter(_debounce_timeout())
, _balance_timer([this] { balance_timer_callback(); })
, _total_counts(ss::smp::count, 0) {
_total_counts.at(0) += 1; // controller partition

_debounce_timeout.watch([this] {
_debounce_jitter = simple_time_jitter<ss::lowres_clock>(
_debounce_timeout());
});
}

ss::future<> shard_balancer::start() {
Expand Down Expand Up @@ -125,8 +162,15 @@ ss::future<> shard_balancer::start() {
_to_assign.insert(ntp);
}
});

co_await do_assign_ntps(lock);

if (
_balancing_on_core_count_change()
&& _features.is_active(features::feature::node_local_core_assignment)) {
co_await balance_on_core_count_change(lock);
}

vassert(
tt_version == _topics.local().topics_map_revision(),
"topic_table unexpectedly changed");
@@ -161,6 +205,7 @@ ss::future<> shard_balancer::stop() {
shard_id);

_topics.local().unregister_delta_notification(_topic_table_notify_handle);
_balance_timer.cancel();
_wakeup_event.set();
return _gate.close();
}
@@ -218,6 +263,20 @@ shard_balancer::reassign_shard(model::ntp ntp, ss::shard_id shard) {
co_return errc::success;
}

errc shard_balancer::trigger_rebalance() {
if (_gate.is_closed()) {
return errc::shutting_down;
}

if (!_features.is_active(features::feature::node_local_core_assignment)) {
return errc::feature_disabled;
}

vlog(clusterlog.info, "triggering manual rebalancing");
_balance_timer.rearm(ss::lowres_clock::now());
return errc::success;
}

ss::future<> shard_balancer::assign_fiber() {
if (_gate.is_closed()) {
co_return;
@@ -243,7 +302,7 @@ ss::future<> shard_balancer::do_assign_ntps(mutex::units& lock) {
auto to_assign = std::exchange(_to_assign, {});
co_await ssx::async_for_each(
to_assign.begin(), to_assign.end(), [&](const model::ntp& ntp) {
maybe_assign(ntp, new_targets);
maybe_assign(ntp, /*can_reassign=*/false, new_targets);
});

co_await ss::max_concurrent_for_each(
Expand All @@ -260,7 +319,7 @@ ss::future<> shard_balancer::do_assign_ntps(mutex::units& lock) {
}

void shard_balancer::maybe_assign(
const model::ntp& ntp, ntp2target_t& new_targets) {
const model::ntp& ntp, bool can_reassign, ntp2target_t& new_targets) {
std::optional<shard_placement_target> prev_target
= _shard_placement.get_target(ntp);

@@ -286,20 +345,44 @@ void shard_balancer::maybe_assign(
_shard_placement.is_persistence_enabled(),
"expected persistence to be enabled");

std::optional<ss::shard_id> prev_shard;
if (prev_target && prev_target->log_revision == log_revision) {
prev_shard = prev_target->shard;
}

if (prev_shard && !can_reassign) {
// partition already assigned, keep current shard.
return;
}

auto new_shard = choose_shard(ntp, topic_data, prev_shard);
if (new_shard == prev_shard) {
return;
}

target.emplace(
replicas_view->assignment.group,
log_revision.value(),
choose_shard(ntp, topic_data, std::nullopt));
replicas_view->assignment.group, log_revision.value(), new_shard);
} else {
// node-local shard placement not enabled yet, get target from
// topic_table.
target = placement_target_on_node(replicas_view.value(), _self);
}
} else {
// partition is removed from this node, this will likely disrupt the
// counts balance, so we set up the balancing timer.

if (
_features.is_active(features::feature::node_local_core_assignment)
&& _balancing_continuous() && !_balance_timer.armed()) {
// Add jitter so that different nodes don't move replicas of the
// same partition in unison.
auto debounce_interval = _debounce_jitter.next_duration();
vlog(
clusterlog.info,
"scheduling balancing in {}s.",
debounce_interval / 1s);
_balance_timer.arm(debounce_interval);
}
}

vlog(
@@ -315,6 +398,92 @@ void shard_balancer::maybe_assign(
new_targets.emplace(ntp, target);
}

ss::future<> shard_balancer::balance_on_core_count_change(mutex::units& lock) {
uint32_t last_rebalance_core_count = 0;
auto state_buf = _kvstore.get(
storage::kvstore::key_space::shard_placement, state_kvstore_key());
if (state_buf) {
last_rebalance_core_count = serde::from_iobuf<persisted_state>(
std::move(*state_buf))
.last_rebalance_core_count;
}

// If there is no state in kvstore, this means that we are restarting with
// shard balancing enabled for the first time, and this is a good time to
// rebalance as well.

if (last_rebalance_core_count == ss::smp::count) {
co_return;
}

vlog(
clusterlog.info, "detected core count change, triggering rebalance...");
co_await do_balance(lock);
}

void shard_balancer::balance_timer_callback() {
ssx::spawn_with_gate(_gate, [this] {
return _mtx.get_units()
.then([this](mutex::units lock) {
return ss::do_with(std::move(lock), [this](mutex::units& lock) {
return do_balance(lock);
});
})
.handle_exception([this](const std::exception_ptr& e) {
if (ssx::is_shutdown_exception(e)) {
return;
}

// Retry balancing after some time.
if (!_balance_timer.armed()) {
_balance_timer.arm(_debounce_jitter.next_duration());
}
vlog(
clusterlog.warn,
"failed to balance: {}, retrying after {}s.",
e,
(_balance_timer.get_timeout() - ss::lowres_clock::now()) / 1s);
});
});
}

ss::future<> shard_balancer::do_balance(mutex::units& lock) {
// Go over all node-local ntps in random order and try to find a more
// optimal core for them.
chunked_vector<model::ntp> ntps;
co_await _shard_placement.for_each_ntp(
[&](const model::ntp& ntp, const shard_placement_target&) {
ntps.push_back(ntp);
});
std::shuffle(ntps.begin(), ntps.end(), random_generators::internal::gen);

ntp2target_t new_targets;
co_await ssx::async_for_each(
ntps.begin(), ntps.end(), [&](const model::ntp& ntp) {
maybe_assign(ntp, /*can_reassign=*/true, new_targets);
});

vlog(
clusterlog.info,
"after balancing {} ntps were reassigned",
new_targets.size());

co_await ss::max_concurrent_for_each(
new_targets,
128,
[this, &lock](const decltype(new_targets)::value_type& kv) {
const auto& [ntp, target] = kv;
return set_target(ntp, target, lock);
});

co_await _kvstore.put(
storage::kvstore::key_space::shard_placement,
state_kvstore_key(),
serde::to_iobuf(persisted_state{
.last_rebalance_core_count = ss::smp::count,
}));
}

ss::future<> shard_balancer::set_target(
const model::ntp& ntp,
const std::optional<shard_placement_target>& target,
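`balance_on_core_count_change()` persists the core count that the last rebalance ran with and compares it against `ss::smp::count` at startup; a missing kvstore record means node-local balancing was just enabled, which the code above treats as another good moment to rebalance. The standalone sketch below models just that decision; `kv_store` is a hypothetical in-memory stand-in for the `storage::kvstore` + serde round trip used in the real code.

```cpp
#include <cstdint>
#include <iostream>
#include <optional>

// Hypothetical stand-in for the kvstore record written by do_balance().
struct persisted_state {
    uint32_t last_rebalance_core_count = 0;
};

// Hypothetical stand-in for storage::kvstore holding at most one record.
struct kv_store {
    std::optional<persisted_state> state;
};

// Mirrors shard_balancer::balance_on_core_count_change(): rebalance when the
// stored core count differs from the current one (or when no record exists,
// i.e. node-local balancing was just enabled), then record the new count.
bool maybe_rebalance(kv_store& kvs, uint32_t current_core_count) {
    uint32_t last = kvs.state ? kvs.state->last_rebalance_core_count : 0;
    if (last == current_core_count) {
        return false; // nothing to do, core count unchanged
    }
    // ... move partition replicas to even out per-core counts ...
    kvs.state = persisted_state{.last_rebalance_core_count = current_core_count};
    return true;
}

int main() {
    kv_store kvs;
    std::cout << maybe_rebalance(kvs, 8) << '\n';  // 1: first start, rebalances
    std::cout << maybe_rebalance(kvs, 8) << '\n';  // 0: same core count, skipped
    std::cout << maybe_rebalance(kvs, 12) << '\n'; // 1: restarted with more cores
}
```

Since the record is written only after `do_balance()` completes, an interrupted rebalance leaves the old core count in the kvstore and is retried on the next startup.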
23 changes: 22 additions & 1 deletion src/v/cluster/shard_balancer.h
@@ -14,6 +14,7 @@
#include "cluster/controller_backend.h"
#include "cluster/shard_placement_table.h"
#include "container/chunked_hash_map.h"
#include "random/simple_time_jitter.h"
#include "ssx/event.h"
#include "utils/mutex.h"

Expand All @@ -36,8 +37,12 @@ class shard_balancer {
shard_balancer(
ss::sharded<shard_placement_table>&,
ss::sharded<features::feature_table>&,
ss::sharded<storage::api>&,
ss::sharded<topic_table>&,
ss::sharded<controller_backend>&);
ss::sharded<controller_backend>&,
config::binding<bool> balancing_on_core_count_change,
config::binding<bool> balancing_continuous,
config::binding<std::chrono::milliseconds> debounce_timeout);

ss::future<> start();
ss::future<> stop();
@@ -50,14 +55,23 @@ class shard_balancer {
/// Manually set shard placement for an ntp that has a replica on this node.
ss::future<errc> reassign_shard(model::ntp, ss::shard_id);

/// Manually trigger shard placement rebalancing for partitions in this
/// node.
errc trigger_rebalance();

private:
void process_delta(const topic_table::delta&);

ss::future<> assign_fiber();
ss::future<> do_assign_ntps(mutex::units& lock);

ss::future<> balance_on_core_count_change(mutex::units& lock);
void balance_timer_callback();
ss::future<> do_balance(mutex::units& lock);

void maybe_assign(
const model::ntp&,
bool can_reassign,
chunked_hash_map<model::ntp, std::optional<shard_placement_target>>&);

ss::future<> set_target(
@@ -88,11 +102,18 @@ class shard_balancer {
private:
shard_placement_table& _shard_placement;
features::feature_table& _features;
storage::kvstore& _kvstore;
ss::sharded<topic_table>& _topics;
ss::sharded<controller_backend>& _controller_backend;
model::node_id _self;

config::binding<bool> _balancing_on_core_count_change;
config::binding<bool> _balancing_continuous;
config::binding<std::chrono::milliseconds> _debounce_timeout;
simple_time_jitter<ss::lowres_clock> _debounce_jitter;

cluster::notification_id_type _topic_table_notify_handle;
ss::timer<ss::lowres_clock> _balance_timer;
ssx::event _wakeup_event{"shard_balancer"};
mutex _mtx{"shard_balancer"};
ss::gate _gate;
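The `_balance_timer` and `_debounce_jitter` members declared here drive the continuous-balancing path: when replicas leave the node, `maybe_assign()` arms the timer for the configured debounce timeout plus jitter so that nodes holding replicas of the same partitions do not all start moving them in unison, and a failed balancing pass simply re-arms the timer. A small self-contained sketch of that scheduling idea follows; `next_duration()` is a hypothetical replacement for `simple_time_jitter`, assuming jitter means adding a random extra delay on top of the base timeout.

```cpp
#include <chrono>
#include <iostream>
#include <random>

using namespace std::chrono;

// Hypothetical jitter: the configured debounce timeout plus a random extra of
// up to half the timeout, so that nodes observing the same topic change do not
// all start balancing at the same instant.
milliseconds next_duration(milliseconds debounce_timeout, std::mt19937& rng) {
    std::uniform_int_distribution<int64_t> extra(0, debounce_timeout.count() / 2);
    return debounce_timeout + milliseconds(extra(rng));
}

int main() {
    std::mt19937 rng{std::random_device{}()};
    const milliseconds debounce_timeout = 10s;

    // Mirrors maybe_assign(): when a replica is removed from this node and the
    // timer is not already armed, schedule a balancing pass after the jittered
    // debounce interval instead of reacting immediately.
    bool timer_armed = false;
    if (!timer_armed) {
        auto delay = next_duration(debounce_timeout, rng);
        timer_armed = true;
        std::cout << "scheduling balancing in " << delay.count() / 1000 << "s\n";
    }
}
```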
27 changes: 21 additions & 6 deletions src/v/cluster/shard_placement_table.cc
@@ -101,12 +101,7 @@ namespace {
static constexpr auto kvstore_key_space
= storage::kvstore::key_space::shard_placement;

// enum type is irrelevant, serde will serialize to 32 bit anyway
enum class kvstore_key_type {
persistence_enabled = 0,
assignment = 1,
current_state = 2,
};
using kvstore_key_type = shard_placement_kvstore_key_type;

bytes persistence_enabled_kvstore_key() {
iobuf buf;
@@ -831,6 +826,26 @@ shard_placement_table::get_target(const model::ntp& ntp) const {
return std::nullopt;
}

ss::future<> shard_placement_table::for_each_ntp(
ss::noncopyable_function<
void(const model::ntp&, const shard_placement_target&)> func) const {
vassert(
ss::this_shard_id() == assignment_shard_id,
"method can only be invoked on shard {}",
assignment_shard_id);
return ssx::async_for_each(
_ntp2entry.begin(),
_ntp2entry.end(),
[&func](const decltype(_ntp2entry)::value_type& kv) {
const auto& [ntp, entry] = kv;
vassert(
entry && entry->target && entry->mtx.ready(),
"[{}]: unexpected concurrent set_target()",
ntp);
func(ntp, *entry->target);
});
}

ss::future<std::error_code> shard_placement_table::prepare_create(
const model::ntp& ntp, model::revision_id expected_log_rev) {
// ensure that there is no concurrent enable_persistence() call