diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc
index 07b38adab7224..cc4f93e079c8c 100644
--- a/src/v/cluster/controller.cc
+++ b/src/v/cluster/controller.cc
@@ -465,8 +465,13 @@ ss::future<> controller::start(
           return _shard_balancer.start_single(
             std::ref(_shard_placement),
             std::ref(_feature_table),
+            std::ref(_storage),
             std::ref(_tp_state),
-            std::ref(_backend));
+            std::ref(_backend),
+            config::shard_local_cfg()
+              .core_balancing_on_core_count_change.bind(),
+            config::shard_local_cfg().core_balancing_continuous.bind(),
+            config::shard_local_cfg().core_balancing_debounce_timeout.bind());
       })
       .then(
         [this] { return _drain_manager.invoke_on_all(&drain_manager::start); })
diff --git a/src/v/cluster/feature_manager.cc b/src/v/cluster/feature_manager.cc
index cb79bbec30028..12f7411249bff 100644
--- a/src/v/cluster/feature_manager.cc
+++ b/src/v/cluster/feature_manager.cc
@@ -237,8 +237,8 @@ ss::future<> feature_manager::maybe_log_license_check_info() {
       cfg.audit_enabled || cfg.cloud_storage_enabled
      || cfg.partition_autobalancing_mode
           == model::partition_autobalancing_mode::continuous
-      || has_gssapi() || has_oidc() || has_schma_id_validation()
-      || has_non_default_roles) {
+      || cfg.core_balancing_continuous() || has_gssapi() || has_oidc()
+      || has_schma_id_validation() || has_non_default_roles) {
         const auto& license = _feature_table.local().get_license();
         if (!license || license->is_expired()) {
             vlog(
diff --git a/src/v/cluster/shard_balancer.cc b/src/v/cluster/shard_balancer.cc
index b52601c63442c..bc8cd8a6a86c3 100644
--- a/src/v/cluster/shard_balancer.cc
+++ b/src/v/cluster/shard_balancer.cc
@@ -19,18 +19,55 @@

 namespace cluster {

+namespace {
+
+const bytes& state_kvstore_key() {
+    static thread_local bytes key = []() {
+        iobuf buf;
+        serde::write(buf, shard_placement_kvstore_key_type::balancer_state);
+        return iobuf_to_bytes(buf);
+    }();
+    return key;
+}
+
+struct persisted_state
+  : serde::
+      envelope<persisted_state, serde::version<0>, serde::compat_version<0>> {
+    uint32_t last_rebalance_core_count = 0;
+
+    bool operator==(const persisted_state&) const = default;
+    auto serde_fields() { return std::tie(last_rebalance_core_count); }
+};
+
+} // namespace
+
 shard_balancer::shard_balancer(
   ss::sharded<shard_placement_table>& spt,
   ss::sharded<features::feature_table>& features,
+  ss::sharded<storage::api>& storage,
   ss::sharded<topic_table>& topics,
-  ss::sharded<controller_backend>& cb)
+  ss::sharded<controller_backend>& cb,
+  config::binding<bool> balancing_on_core_count_change,
+  config::binding<bool> balancing_continuous,
+  config::binding<std::chrono::milliseconds> debounce_timeout)
   : _shard_placement(spt.local())
   , _features(features.local())
+  , _kvstore(storage.local().kvs())
   , _topics(topics)
   , _controller_backend(cb)
   , _self(*config::node().node_id())
+  , _balancing_on_core_count_change(std::move(balancing_on_core_count_change))
+  , _balancing_continuous(std::move(balancing_continuous))
+  , _debounce_timeout(std::move(debounce_timeout))
+  , _debounce_jitter(_debounce_timeout())
+  , _balance_timer([this] { balance_timer_callback(); })
   , _total_counts(ss::smp::count, 0) {
     _total_counts.at(0) += 1; // controller partition
+
+    _debounce_timeout.watch([this] {
+        _debounce_jitter = simple_time_jitter<ss::lowres_clock>(
+          _debounce_timeout());
+    });
 }

 ss::future<> shard_balancer::start() {
@@ -125,8 +162,15 @@ ss::future<> shard_balancer::start() {
               _to_assign.insert(ntp);
           }
       });
+
     co_await do_assign_ntps(lock);

+    if (
+      _balancing_on_core_count_change()
+      && _features.is_active(features::feature::node_local_core_assignment)) {
+        co_await balance_on_core_count_change(lock);
+    }
+
     vassert(
       tt_version == _topics.local().topics_map_revision(),
       "topic_table unexpectedly changed");
changed"); @@ -161,6 +205,7 @@ ss::future<> shard_balancer::stop() { shard_id); _topics.local().unregister_delta_notification(_topic_table_notify_handle); + _balance_timer.cancel(); _wakeup_event.set(); return _gate.close(); } @@ -218,6 +263,20 @@ shard_balancer::reassign_shard(model::ntp ntp, ss::shard_id shard) { co_return errc::success; } +errc shard_balancer::trigger_rebalance() { + if (_gate.is_closed()) { + return errc::shutting_down; + } + + if (!_features.is_active(features::feature::node_local_core_assignment)) { + return errc::feature_disabled; + } + + vlog(clusterlog.info, "triggering manual rebalancing"); + _balance_timer.rearm(ss::lowres_clock::now()); + return errc::success; +} + ss::future<> shard_balancer::assign_fiber() { if (_gate.is_closed()) { co_return; @@ -243,7 +302,7 @@ ss::future<> shard_balancer::do_assign_ntps(mutex::units& lock) { auto to_assign = std::exchange(_to_assign, {}); co_await ssx::async_for_each( to_assign.begin(), to_assign.end(), [&](const model::ntp& ntp) { - maybe_assign(ntp, new_targets); + maybe_assign(ntp, /*can_reassign=*/false, new_targets); }); co_await ss::max_concurrent_for_each( @@ -260,7 +319,7 @@ ss::future<> shard_balancer::do_assign_ntps(mutex::units& lock) { } void shard_balancer::maybe_assign( - const model::ntp& ntp, ntp2target_t& new_targets) { + const model::ntp& ntp, bool can_reassign, ntp2target_t& new_targets) { std::optional prev_target = _shard_placement.get_target(ntp); @@ -286,20 +345,44 @@ void shard_balancer::maybe_assign( _shard_placement.is_persistence_enabled(), "expected persistence to be enabled"); + std::optional prev_shard; if (prev_target && prev_target->log_revision == log_revision) { + prev_shard = prev_target->shard; + } + + if (prev_shard && !can_reassign) { // partition already assigned, keep current shard. return; } + auto new_shard = choose_shard(ntp, topic_data, prev_shard); + if (new_shard == prev_shard) { + return; + } + target.emplace( - replicas_view->assignment.group, - log_revision.value(), - choose_shard(ntp, topic_data, std::nullopt)); + replicas_view->assignment.group, log_revision.value(), new_shard); } else { // node-local shard placement not enabled yet, get target from // topic_table. target = placement_target_on_node(replicas_view.value(), _self); } + } else { + // partition is removed from this node, this will likely disrupt the + // counts balance, so we set up the balancing timer. + + if ( + _features.is_active(features::feature::node_local_core_assignment) + && _balancing_continuous() && !_balance_timer.armed()) { + // Add jitter so that different nodes don't move replicas of the + // same partition in unison. + auto debounce_interval = _debounce_jitter.next_duration(); + vlog( + clusterlog.info, + "scheduling balancing in {}s.", + debounce_interval / 1s); + _balance_timer.arm(debounce_interval); + } } vlog( @@ -315,6 +398,92 @@ void shard_balancer::maybe_assign( new_targets.emplace(ntp, target); } +ss::future<> shard_balancer::balance_on_core_count_change(mutex::units& lock) { + uint32_t last_rebalance_core_count = 0; + auto state_buf = _kvstore.get( + storage::kvstore::key_space::shard_placement, state_kvstore_key()); + if (state_buf) { + last_rebalance_core_count = serde::from_iobuf( + std::move(*state_buf)) + .last_rebalance_core_count; + } + + // If there is no state in kvstore, this means that we are restarting with + // shard balancing enabled for the first time, and this is a good time to + // rebalance as well. 
+
+    if (last_rebalance_core_count == ss::smp::count) {
+        co_return;
+    }
+
+    vlog(
+      clusterlog.info, "detected core count change, triggering rebalance...");
+    co_await do_balance(lock);
+}
+
+void shard_balancer::balance_timer_callback() {
+    ssx::spawn_with_gate(_gate, [this] {
+        return _mtx.get_units()
+          .then([this](mutex::units lock) {
+              return ss::do_with(std::move(lock), [this](mutex::units& lock) {
+                  return do_balance(lock);
+              });
+          })
+          .handle_exception([this](const std::exception_ptr& e) {
+              if (ssx::is_shutdown_exception(e)) {
+                  return;
+              }
+
+              // Retry balancing after some time.
+              if (!_balance_timer.armed()) {
+                  _balance_timer.arm(_debounce_jitter.next_duration());
+              }
+              vlog(
+                clusterlog.warn,
+                "failed to balance: {}, retrying after {}s.",
+                e,
+                (_balance_timer.get_timeout() - ss::lowres_clock::now()) / 1s);
+          });
+    });
+}
+
+ss::future<> shard_balancer::do_balance(mutex::units& lock) {
+    // Go over all node-local ntps in random order and try to find a more
+    // optimal core for them.
+    chunked_vector<model::ntp> ntps;
+    co_await _shard_placement.for_each_ntp(
+      [&](const model::ntp& ntp, const shard_placement_target&) {
+          ntps.push_back(ntp);
+      });
+    std::shuffle(ntps.begin(), ntps.end(), random_generators::internal::gen);
+
+    ntp2target_t new_targets;
+    co_await ssx::async_for_each(
+      ntps.begin(), ntps.end(), [&](const model::ntp& ntp) {
+          maybe_assign(ntp, /*can_reassign=*/true, new_targets);
+      });
+
+    vlog(
+      clusterlog.info,
+      "after balancing {} ntps were reassigned",
+      new_targets.size());
+
+    co_await ss::max_concurrent_for_each(
+      new_targets,
+      128,
+      [this, &lock](const decltype(new_targets)::value_type& kv) {
+          const auto& [ntp, target] = kv;
+          return set_target(ntp, target, lock);
+      });
+
+    co_await _kvstore.put(
+      storage::kvstore::key_space::shard_placement,
+      state_kvstore_key(),
+      serde::to_iobuf(persisted_state{
+        .last_rebalance_core_count = ss::smp::count,
+      }));
+}
+
 ss::future<> shard_balancer::set_target(
   const model::ntp& ntp,
   const std::optional<shard_placement_target>& target,
diff --git a/src/v/cluster/shard_balancer.h b/src/v/cluster/shard_balancer.h
index 635228a6b57ab..80b6a8d0c5af7 100644
--- a/src/v/cluster/shard_balancer.h
+++ b/src/v/cluster/shard_balancer.h
@@ -14,6 +14,7 @@
 #include "cluster/controller_backend.h"
 #include "cluster/shard_placement_table.h"
 #include "container/chunked_hash_map.h"
+#include "random/simple_time_jitter.h"
 #include "ssx/event.h"
 #include "utils/mutex.h"

@@ -36,8 +37,12 @@ class shard_balancer {
     shard_balancer(
       ss::sharded<shard_placement_table>&,
       ss::sharded<features::feature_table>&,
+      ss::sharded<storage::api>&,
       ss::sharded<topic_table>&,
-      ss::sharded<controller_backend>&);
+      ss::sharded<controller_backend>&,
+      config::binding<bool> balancing_on_core_count_change,
+      config::binding<bool> balancing_continuous,
+      config::binding<std::chrono::milliseconds> debounce_timeout);

     ss::future<> start();
     ss::future<> stop();
@@ -50,14 +55,23 @@ class shard_balancer {
     /// Manually set shard placement for an ntp that has a replica on this node.
     ss::future<errc> reassign_shard(model::ntp, ss::shard_id);

+    /// Manually trigger shard placement rebalancing for partitions on this
+    /// node.
+    errc trigger_rebalance();
+
 private:
     void process_delta(const topic_table::delta&);
     ss::future<> assign_fiber();
     ss::future<> do_assign_ntps(mutex::units& lock);

+    ss::future<> balance_on_core_count_change(mutex::units& lock);
+    void balance_timer_callback();
+    ss::future<> do_balance(mutex::units& lock);
+
     void maybe_assign(
       const model::ntp&,
+      bool can_reassign,
       chunked_hash_map<model::ntp, std::optional<shard_placement_target>>&);

     ss::future<> set_target(
@@ -88,11 +102,18 @@ class shard_balancer {
 private:
     shard_placement_table& _shard_placement;
     features::feature_table& _features;
+    storage::kvstore& _kvstore;
     ss::sharded<topic_table>& _topics;
     ss::sharded<controller_backend>& _controller_backend;
     model::node_id _self;

+    config::binding<bool> _balancing_on_core_count_change;
+    config::binding<bool> _balancing_continuous;
+    config::binding<std::chrono::milliseconds> _debounce_timeout;
+    simple_time_jitter<ss::lowres_clock> _debounce_jitter;
+
     cluster::notification_id_type _topic_table_notify_handle;
+    ss::timer<ss::lowres_clock> _balance_timer;
     ssx::event _wakeup_event{"shard_balancer"};
     mutex _mtx{"shard_balancer"};
     ss::gate _gate;
diff --git a/src/v/cluster/shard_placement_table.cc b/src/v/cluster/shard_placement_table.cc
index 8517c2d251a0c..bddc210e99170 100644
--- a/src/v/cluster/shard_placement_table.cc
+++ b/src/v/cluster/shard_placement_table.cc
@@ -101,12 +101,7 @@ namespace {
 static constexpr auto kvstore_key_space
   = storage::kvstore::key_space::shard_placement;

-// enum type is irrelevant, serde will serialize to 32 bit anyway
-enum class kvstore_key_type {
-    persistence_enabled = 0,
-    assignment = 1,
-    current_state = 2,
-};
+using kvstore_key_type = shard_placement_kvstore_key_type;

 bytes persistence_enabled_kvstore_key() {
     iobuf buf;
@@ -831,6 +826,26 @@ shard_placement_table::get_target(const model::ntp& ntp) const {
     return std::nullopt;
 }

+ss::future<> shard_placement_table::for_each_ntp(
+  ss::noncopyable_function<
+    void(const model::ntp&, const shard_placement_target&)> func) const {
+    vassert(
+      ss::this_shard_id() == assignment_shard_id,
+      "method can only be invoked on shard {}",
+      assignment_shard_id);
+    return ssx::async_for_each(
+      _ntp2entry.begin(),
+      _ntp2entry.end(),
+      [&func](const decltype(_ntp2entry)::value_type& kv) {
+          const auto& [ntp, entry] = kv;
+          vassert(
+            entry && entry->target && entry->mtx.ready(),
+            "[{}]: unexpected concurrent set_target()",
+            ntp);
+          func(ntp, *entry->target);
+      });
+}
+
 ss::future<std::error_code> shard_placement_table::prepare_create(
   const model::ntp& ntp, model::revision_id expected_log_rev) {
     // ensure that there is no concurrent enable_persistence() call
diff --git a/src/v/cluster/shard_placement_table.h b/src/v/cluster/shard_placement_table.h
index 9ae2f36b4bcac..10b21d7f4bde4 100644
--- a/src/v/cluster/shard_placement_table.h
+++ b/src/v/cluster/shard_placement_table.h
@@ -172,6 +172,12 @@ class shard_placement_table
     /// Must be called on assignment_shard_id.
     std::optional<shard_placement_target> get_target(const model::ntp&) const;

+    /// Must be called on assignment_shard_id. Requires external
+    /// synchronization, i.e. the caller must ensure that there are no
+    /// concurrent set_target() calls.
+    ss::future<> for_each_ntp(ss::noncopyable_function<
+      void(const model::ntp&, const shard_placement_target&)>) const;
+
     std::optional<placement_state> state_on_this_shard(const model::ntp&) const;

     const ntp2state_t& shard_local_states() const { return _states; }
@@ -248,4 +254,14 @@ class shard_placement_table

 std::ostream& operator<<(std::ostream&, shard_placement_table::hosted_status);

+/// Enum with all key types in the shard_placement kvstore key space. All keys
+/// in this key space must be prefixed with the serialized key type. The enum's
+/// underlying type is irrelevant; serde serializes it to 32 bits anyway.
+enum class shard_placement_kvstore_key_type {
+    persistence_enabled = 0,
+    assignment = 1,
+    current_state = 2,
+    balancer_state = 3,
+};
+
 } // namespace cluster
diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc
index 06172b8c450d6..e19738a5fc548 100644
--- a/src/v/cluster/topics_frontend.cc
+++ b/src/v/cluster/topics_frontend.cc
@@ -2035,4 +2035,10 @@ topics_frontend::set_local_partition_shard(model::ntp ntp, ss::shard_id shard) {
       });
 }

+ss::future<errc> topics_frontend::trigger_local_partition_shard_rebalance() {
+    return _shard_balancer.invoke_on(
+      shard_balancer::shard_id,
+      [](shard_balancer& sb) { return sb.trigger_rebalance(); });
+}
+
 } // namespace cluster
diff --git a/src/v/cluster/topics_frontend.h b/src/v/cluster/topics_frontend.h
index aed246c241625..0f86d78f4bde0 100644
--- a/src/v/cluster/topics_frontend.h
+++ b/src/v/cluster/topics_frontend.h
@@ -220,6 +220,9 @@ class topics_frontend {
     /// shard.
     ss::future<errc> set_local_partition_shard(model::ntp, ss::shard_id);

+    /// Trigger shard placement rebalancing for partitions on this node.
+    ss::future<errc> trigger_local_partition_shard_rebalance();
+
 private:
     using ntp_leader = std::pair<model::ntp, model::node_id>;

diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index a81fc6311a266..1176d5ccf625e 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -2776,6 +2776,28 @@ configuration::configuration()
       {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
       512,
       {.min = 1, .max = 2048})
+  , core_balancing_on_core_count_change(
+      *this,
+      "core_balancing_on_core_count_change",
+      "If set to 'true', and the number of cores changes after a restart, "
+      "Redpanda will move partitions between cores to maintain a balanced "
+      "partition distribution.",
+      {.needs_restart = needs_restart::no, .visibility = visibility::user},
+      true)
+  , core_balancing_continuous(
+      *this,
+      "core_balancing_continuous",
+      "If set to 'true', move partitions between cores at runtime to "
+      "maintain a balanced partition distribution.",
+      {.needs_restart = needs_restart::no, .visibility = visibility::user},
+      false)
+  , core_balancing_debounce_timeout(
+      *this,
+      "core_balancing_debounce_timeout",
+      "Interval, in milliseconds, between a trigger and the invocation of "
+      "core balancing.",
+      {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
+      10s)
   , internal_topic_replication_factor(
       *this,
       "internal_topic_replication_factor",
diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h
index 9a252ccc93dc0..15923d6b0d1ab 100644
--- a/src/v/config/configuration.h
+++ b/src/v/config/configuration.h
@@ -498,6 +498,11 @@ struct configuration final : public config_store {
     property<std::chrono::milliseconds> leader_balancer_mute_timeout;
     property<std::chrono::milliseconds> leader_balancer_node_mute_timeout;
     bounded_property<size_t> leader_balancer_transfer_limit_per_shard;
+
+    property<bool> core_balancing_on_core_count_change;
+    property<bool> core_balancing_continuous;
+    property<std::chrono::milliseconds> core_balancing_debounce_timeout;
+
     property<int16_t> internal_topic_replication_factor;
     property<std::chrono::milliseconds> health_manager_tick_interval;

diff --git a/src/v/redpanda/admin/api-doc/partition.json b/src/v/redpanda/admin/api-doc/partition.json
index cd6377a7d0d98..d8e83f8393d8c 100644
--- a/src/v/redpanda/admin/api-doc/partition.json
+++ b/src/v/redpanda/admin/api-doc/partition.json
@@ -299,6 +299,21 @@
                 }
             ]
         },
+        {
+            "path": "/v1/partitions/rebalance_cores",
+            "operations": [
+                {
+                    "method": "POST",
+                    "summary": "Trigger core placement rebalancing for partitions on this broker.",
+                    "type": "void",
+ "nickname": "trigger_partitions_shard_rebalance", + "produces": [ + "application/json" + ], + "parameters": [] + } + ] + }, { "path": "/v1/partitions/{namespace}/{topic}/{partition}/unclean_abort_reconfiguration", "operations": [ diff --git a/src/v/redpanda/admin/partition.cc b/src/v/redpanda/admin/partition.cc index 6c0d392ae961d..21a29d5114140 100644 --- a/src/v/redpanda/admin/partition.cc +++ b/src/v/redpanda/admin/partition.cc @@ -807,6 +807,12 @@ void admin_server::register_partition_routes() { return trigger_on_demand_rebalance_handler(std::move(req)); }); + register_route( + ss::httpd::partition_json::trigger_partitions_shard_rebalance, + [this](std::unique_ptr req) { + return trigger_shard_rebalance_handler(std::move(req)); + }); + register_route( ss::httpd::partition_json::get_partition_reconfigurations, [this](std::unique_ptr req) { @@ -1324,3 +1330,14 @@ admin_server::trigger_on_demand_rebalance_handler( co_await throw_on_error(*req, ec, model::controller_ntp); co_return ss::json::json_return_type(ss::json::json_void()); } + +ss::future +admin_server::trigger_shard_rebalance_handler( + std::unique_ptr req) { + auto ec = co_await _controller->get_topics_frontend() + .local() + .trigger_local_partition_shard_rebalance(); + + co_await throw_on_error(*req, ec, model::ntp{}); + co_return ss::json::json_return_type(ss::json::json_void()); +} diff --git a/src/v/redpanda/admin/server.h b/src/v/redpanda/admin/server.h index 06809880a3ad6..2ab8e913d7422 100644 --- a/src/v/redpanda/admin/server.h +++ b/src/v/redpanda/admin/server.h @@ -532,6 +532,8 @@ class admin_server { ss::future trigger_on_demand_rebalance_handler(std::unique_ptr); + ss::future + trigger_shard_rebalance_handler(std::unique_ptr); ss::future get_reconfigurations_handler(std::unique_ptr); diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index 5c6919663400f..d3d80d9ef0aa4 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -813,6 +813,14 @@ def trigger_rebalance(self, node=None): return self._request('post', path, node=node) + def trigger_cores_rebalance(self, node): + """ + Trigger core placement rebalancing for partitions in this node. 
+ """ + path = f"partitions/rebalance_cores" + + return self._request('post', path, node=node) + def list_reconfigurations(self, node=None): """ List pending reconfigurations diff --git a/tests/rptest/tests/shard_placement_test.py b/tests/rptest/tests/shard_placement_test.py index 93dc6d2958325..687d465ea979b 100644 --- a/tests/rptest/tests/shard_placement_test.py +++ b/tests/rptest/tests/shard_placement_test.py @@ -10,20 +10,70 @@ from ducktape.utils.util import wait_until from rptest.services.cluster import cluster +from rptest.services.redpanda import ResourceSettings from rptest.services.admin import Admin +import rptest.services.kgo_verifier_services as kgo from rptest.clients.rpk import RpkTool -from rptest.tests.redpanda_test import RedpandaTest +from rptest.tests.prealloc_nodes import PreallocNodesTest from rptest.services.redpanda_installer import RedpandaInstaller +from rptest.util import wait_until_result -class ShardPlacementTest(RedpandaTest): +class ShardPlacementTest(PreallocNodesTest): def __init__(self, *args, **kwargs): - super().__init__(*args, num_brokers=5, **kwargs) + super().__init__(*args, num_brokers=5, node_prealloc_count=1, **kwargs) def setUp(self): # start the nodes manually pass + def start_client_load(self, topic_name): + msg_size = 4096 + + if self.redpanda.dedicated_nodes: + rate_limit_bps = 100 * 2**20 + elif not self.debug_mode: + rate_limit_bps = 10 * 2**20 + else: + rate_limit_bps = 100 * 2**10 + + self.producer = kgo.KgoVerifierProducer( + self.test_context, + self.redpanda, + topic=topic_name, + msg_size=msg_size, + # some large number to get produce load till the end of test + msg_count=2**30, + rate_limit_bps=rate_limit_bps, + custom_node=self.preallocated_nodes) + self.producer.start(clean=False) + self.producer.wait_for_acks(10, timeout_sec=30, backoff_sec=1) + + self.consumer = kgo.KgoVerifierConsumerGroupConsumer( + self.test_context, + self.redpanda, + topic=topic_name, + msg_size=msg_size, + readers=5, + loop=True, + nodes=self.preallocated_nodes, + debug_logs=True) + self.consumer.start(clean=False) + self.consumer.wait_total_reads(10, timeout_sec=30, backoff_sec=1) + + def stop_client_load(self): + self.producer.stop() + self.consumer.wait_total_reads(self.producer.produce_status.acked, + timeout_sec=60, + backoff_sec=1) + self.consumer.stop() + + self.logger.info( + f"produced {self.producer.produce_status.acked} msgs, " + f"consumed {self.consumer.consumer_status.validator.valid_reads}") + assert self.consumer.consumer_status.validator.invalid_reads == 0 + assert self.consumer.consumer_status.validator.out_of_scope_invalid_reads == 0 + def get_replica_shard_map(self, nodes, admin=None): """Return map of topic -> partition -> [(node_id, core)]""" @@ -44,6 +94,10 @@ def get_replica_shard_map(self, nodes, admin=None): # sort replicas for the ease of comparison replicas.sort() + for topic, partitions in sorted(topic2partition2shard.items()): + for p, replicas in sorted(partitions.items()): + self.logger.debug(f"ntp: {topic}/{p} replicas: {replicas}") + return topic2partition2shard def get_shard_counts_by_topic(self, shard_map, node_id): @@ -58,27 +112,23 @@ def get_shard_counts_by_topic(self, shard_map, node_id): list(0 for _ in range(core_count)))[core] += 1 return topic2shard2count - def debug_print_shard_map(self, shard_map, with_counts=True): + def print_shard_stats(self, shard_map): node_ids = set() - for topic, partitions in sorted(shard_map.items()): - for p, replicas in sorted(partitions.items()): - self.logger.debug(f"ntp: {topic}/{p} 
replicas: {replicas}") + for partitions in shard_map.values(): + for replicas in partitions.values(): for n_id, _ in replicas: node_ids.add(n_id) - if not with_counts: - return - core_count = self.redpanda.get_node_cpu_count() for node_id in sorted(node_ids): shard_counts = self.get_shard_counts_by_topic(shard_map, node_id) total_counts = list(0 for _ in range(core_count)) - self.logger.debug(f"shard replica counts on node {node_id}:") + self.logger.info(f"shard replica counts on node {node_id}:") for t, counts in sorted(shard_counts.items()): - self.logger.debug(f"topic {t}: {counts}") + self.logger.info(f"topic {t}: {counts}") for i, c in enumerate(counts): total_counts[i] += c - self.logger.debug(f"total: {total_counts}") + self.logger.info(f"total: {total_counts}") def wait_shard_map_stationary(self, nodes, @@ -90,7 +140,6 @@ def wait_shard_map_stationary(self, def is_stationary(): nonlocal shard_map new_map = self.get_replica_shard_map(nodes, admin) - self.debug_print_shard_map(new_map, with_counts=False) if new_map == shard_map: return True else: @@ -101,7 +150,7 @@ def is_stationary(): backoff_sec=backoff_sec) return shard_map - @cluster(num_nodes=5) + @cluster(num_nodes=6) def test_upgrade(self): # Disable partition balancer in this test, as we need partitions # to remain stationary. @@ -126,9 +175,11 @@ def test_upgrade(self): for topic in ["foo", "bar"]: rpk.create_topic(topic, partitions=n_partitions, replicas=3) + self.start_client_load("foo") + self.logger.info("created cluster and topics.") initial_map = self.get_replica_shard_map(seed_nodes, admin) - self.debug_print_shard_map(initial_map) + self.print_shard_stats(initial_map) # Upgrade the cluster and enable the feature. @@ -149,7 +200,7 @@ def test_upgrade(self): self.logger.info( "feature enabled, checking that shard map is stable...") map_after_upgrade = self.get_replica_shard_map(seed_nodes, admin) - self.debug_print_shard_map(map_after_upgrade) + self.print_shard_stats(map_after_upgrade) assert map_after_upgrade == initial_map # Manually move replicas of one topic on one node to shard 0 @@ -168,7 +219,7 @@ def test_upgrade(self): "checking shard map...") map_after_manual_move = self.wait_shard_map_stationary( seed_nodes, admin) - self.debug_print_shard_map(map_after_manual_move) + self.print_shard_stats(map_after_manual_move) foo_shard_counts = self.get_shard_counts_by_topic( map_after_manual_move, moved_replica_id)["foo"] assert foo_shard_counts[0] == n_partitions @@ -184,7 +235,7 @@ def test_upgrade(self): # check that shard counts are balanced self.logger.info(f"added 2 nodes and a topic, checking shard map...") map_after_join = self.get_replica_shard_map(joiner_nodes, admin) - self.debug_print_shard_map(map_after_join) + self.print_shard_stats(map_after_join) for joiner in joiner_nodes: joiner_id = self.redpanda.node_id(joiner) shard_counts = self.get_shard_counts_by_topic( @@ -210,16 +261,21 @@ def test_upgrade(self): "checking shard map...") map_after_manual_move2 = self.wait_shard_map_stationary( joiner_nodes, admin) - self.debug_print_shard_map(map_after_manual_move2) + self.print_shard_stats(map_after_manual_move2) quux_shard_counts = self.get_shard_counts_by_topic( map_after_manual_move2, joiner_id)["quux"] assert quux_shard_counts[0] > 0 assert sum(quux_shard_counts) == quux_shard_counts[0] # Restart and check that the shard map remains stable + # (if we turn off rebalancing on startup) + + self.redpanda.set_cluster_config( + {"core_balancing_on_core_count_change": False}) map_before_restart = 
             self.redpanda.nodes, admin)
+        self.print_shard_stats(map_before_restart)

         self.redpanda.restart_nodes(self.redpanda.nodes)
         self.redpanda.wait_for_membership(first_start=False)
@@ -227,5 +283,191 @@ def test_upgrade(self):
         self.logger.info("restarted cluster, checking shard map...")
         map_after_restart = self.get_replica_shard_map(self.redpanda.nodes,
                                                        admin)
-        self.debug_print_shard_map(map_after_restart)
+        self.print_shard_stats(map_after_restart)
         assert map_after_restart == map_before_restart
+
+        self.stop_client_load()
+
+    @cluster(num_nodes=6)
+    def test_manual_rebalance(self):
+        self.redpanda.start()
+
+        admin = Admin(self.redpanda)
+        rpk = RpkTool(self.redpanda)
+
+        n_partitions = 10
+
+        for topic in ["foo", "bar"]:
+            rpk.create_topic(topic, partitions=n_partitions, replicas=5)
+
+        self.start_client_load("foo")
+
+        # Manually move some partitions to create an artificial imbalance.
+
+        node = self.redpanda.nodes[0]
+        moved_replica_id = self.redpanda.node_id(node)
+
+        core_count = self.redpanda.get_node_cpu_count()
+        for p in range(n_partitions):
+            admin.set_partition_replica_core(topic="foo",
+                                             partition=p,
+                                             replica=moved_replica_id,
+                                             core=0)
+            admin.set_partition_replica_core(topic="bar",
+                                             partition=p,
+                                             replica=moved_replica_id,
+                                             core=core_count - 1)
+
+        self.logger.info(
+            f"manually moved some replicas on node {moved_replica_id}, "
+            "checking shard map...")
+        shard_map = self.wait_shard_map_stationary(self.redpanda.nodes, admin)
+        self.print_shard_stats(shard_map)
+        counts_by_topic = self.get_shard_counts_by_topic(
+            shard_map, moved_replica_id)
+        assert counts_by_topic["foo"][0] == n_partitions
+        assert sum(counts_by_topic["foo"]) == n_partitions
+        assert counts_by_topic["bar"][core_count - 1] == n_partitions
+        assert sum(counts_by_topic["bar"]) == n_partitions
+
+        admin.trigger_cores_rebalance(node)
+        self.logger.info(
+            f"triggered manual shard rebalance on node {node.name} (id: {moved_replica_id})"
+            ", checking shard map...")
+        shard_map = self.wait_shard_map_stationary(self.redpanda.nodes, admin)
+        self.print_shard_stats(shard_map)
+        counts_by_topic = self.get_shard_counts_by_topic(
+            shard_map, moved_replica_id)
+        for topic, shard_counts in counts_by_topic.items():
+            assert max(shard_counts) - min(shard_counts) <= 1
+
+        self.stop_client_load()
+
+    @cluster(num_nodes=6)
+    def test_core_count_change(self):
+        self.redpanda.set_resource_settings(ResourceSettings(num_cpus=1))
+        self.redpanda.start()
+
+        admin = Admin(self.redpanda)
+        rpk = RpkTool(self.redpanda)
+
+        n_partitions = 10
+
+        for topic in ["foo", "bar"]:
+            # create topics with rf=5 for ease of accounting
+            rpk.create_topic(topic, partitions=n_partitions, replicas=5)
+
+        self.start_client_load("foo")
+
+        # Increase the cpu count on one node, restart it and
+        # check that the new shards are in use.
+ self.logger.info("increasing cpu count and restarting...") + + node = self.redpanda.nodes[0] + node_id = self.redpanda.node_id(node) + self.redpanda.stop_node(node) + self.redpanda.set_resource_settings(ResourceSettings(num_cpus=2)) + self.redpanda.start_node(node) + self.redpanda.wait_for_membership(first_start=False) + + # check that the node moved partitions to the new core + shard_map = self.get_replica_shard_map([node], admin) + self.print_shard_stats(shard_map) + counts_by_topic = self.get_shard_counts_by_topic(shard_map, node_id) + assert len(counts_by_topic) > 0 + for topic, shard_counts in counts_by_topic.items(): + assert max(shard_counts) - min(shard_counts) <= 1 + + # do some manual moves and check that their effects remain + # if the core count doesn't change. + self.logger.info("doing some manual moves...") + + foo_partitions_on_node = [ + p for p, rs in shard_map["foo"].items() + if any(n == node_id for n, _ in rs) + ] + for p in foo_partitions_on_node: + admin.set_partition_replica_core(topic="foo", + partition=p, + replica=node_id, + core=0) + shard_map = self.wait_shard_map_stationary([node], admin) + self.print_shard_stats(shard_map) + + self.logger.info( + "restarting and checking manual assignments are still there...") + + self.redpanda.restart_nodes([node]) + self.redpanda.wait_for_membership(first_start=False) + + map_after_restart = self.get_replica_shard_map([node], admin) + self.print_shard_stats(map_after_restart) + assert map_after_restart == shard_map + + self.stop_client_load() + + # TODO: core count decrease (not supported yet) + + @cluster(num_nodes=6) + def test_node_join(self): + self.redpanda.add_extra_rp_conf({ + "core_balancing_continuous": True, + }) + seed_nodes = self.redpanda.nodes[0:3] + joiner_nodes = self.redpanda.nodes[3:] + self.redpanda.start(nodes=seed_nodes) + + admin = Admin(self.redpanda, default_node=seed_nodes[0]) + rpk = RpkTool(self.redpanda) + + n_partitions = 10 + + topics = ["foo", "bar", "quux"] + for topic in topics: + rpk.create_topic(topic, partitions=n_partitions, replicas=3) + + self.start_client_load("foo") + + self.logger.info(f"created topics: {topics}") + initial_shard_map = self.wait_shard_map_stationary(seed_nodes, admin) + self.print_shard_stats(initial_shard_map) + + self.redpanda.start(nodes=joiner_nodes) + + def node_rebalance_finished(): + in_progress = admin.list_reconfigurations(node=seed_nodes[0]) + if len(in_progress) > 0: + return False + + for n in joiner_nodes: + num_partitions = len(admin.get_partitions(node=n)) + if num_partitions < 5: + return False + + return True + + wait_until(node_rebalance_finished, timeout_sec=60, backoff_sec=2) + self.logger.info("node rebalance finished") + + def shard_rebalance_finished(): + nodes = self.redpanda.nodes + shard_map = self.get_replica_shard_map(nodes, admin) + self.print_shard_stats(shard_map) + for n in nodes: + node_id = self.redpanda.node_id(n) + shard_counts = self.get_shard_counts_by_topic( + shard_map, node_id) + for topic in topics: + topic_counts = shard_counts[topic] + if max(topic_counts) - min(topic_counts) > 1: + return False + + return (True, shard_map) + + shard_map_after_balance = wait_until_result(shard_rebalance_finished, + timeout_sec=60, + backoff_sec=2) + self.logger.info("shard rebalance finished") + self.print_shard_stats(shard_map_after_balance) + + self.stop_client_load() diff --git a/tests/rptest/tests/topic_delete_test.py b/tests/rptest/tests/topic_delete_test.py index fe545fc8be0b0..7ddf0b7cf0949 100644 --- 
+++ b/tests/rptest/tests/topic_delete_test.py
@@ -73,7 +73,9 @@ def get_kvstore_topic_key_counts(redpanda):
         keys = [i['key'] for i in items]

         for k in keys:
-            if k['keyspace'] == "cluster" or k['keyspace'] == "usage":
+            if (k['keyspace'] == "cluster" or k['keyspace'] == "usage"
+                    or (k['keyspace'] == "shard_placement"
+                        and k['data']['type'] in (0, 3))):
                 # Not a per-partition key
                 continue

diff --git a/tools/offline_log_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py
index 5fe53bd63b62d..0c98d5ce68e53 100644
--- a/tools/offline_log_viewer/kvstore.py
+++ b/tools/offline_log_viewer/kvstore.py
@@ -273,6 +273,8 @@ def decode_shard_placement_key(k):
     elif ret['type'] == 2:
         ret['name'] = "current_state"
         ret['group'] = rdr.read_int64()
+    elif ret['type'] == 3:
+        ret['name'] = "balancer_state"
    else:
         ret['name'] = "unknown"
     return ret
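
A minimal usage sketch (illustrative, not part of the patch): the endpoint registered in partition.json above can be exercised directly against a broker running this change. The localhost:9644 address below assumes the default admin API listener; any HTTP client works.

import requests

# POST /v1/partitions/rebalance_cores takes no parameters and rebalances
# partition replicas across cores on the broker that receives the request.
resp = requests.post("http://localhost:9644/v1/partitions/rebalance_cores")
resp.raise_for_status()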