From 6bcdd94c48bcc98c8241c53844419331710dcdb7 Mon Sep 17 00:00:00 2001
From: Alexey Zatelepin
Date: Wed, 31 May 2023 16:35:52 +0300
Subject: [PATCH 1/5] c/allocation_state: maintain final partition counts

When moving partitions, partition allocations on old nodes are not removed
until the move is finished. This makes sense because partitions remain
physically present on old nodes until the very end. But we also use these
allocation counts for count-based rebalancing. This is incorrect because
after all moves are finished, the distribution won't necessarily be uniform.

To fix this, introduce "final counts" - the number of partitions on a node
after all currently in-progress moves are finished - and maintain them when
(re)allocating partitions and in topic_updates_dispatcher.
---
 src/v/cluster/scheduling/allocation_node.cc   |  2 +
 src/v/cluster/scheduling/allocation_node.h    | 17 +++++
 src/v/cluster/scheduling/allocation_state.cc  | 21 ++++++
 src/v/cluster/scheduling/allocation_state.h   |  4 ++
 .../cluster/scheduling/partition_allocator.cc | 36 ++++++++++
 .../cluster/scheduling/partition_allocator.h  |  5 ++
 src/v/cluster/scheduling/types.cc             | 19 ++++-
 .../backend_reallocation_strategy_test.cc     |  4 ++
 src/v/cluster/topic_updates_dispatcher.cc     | 70 ++++++++++++++++---
 src/v/cluster/topic_updates_dispatcher.h      |  5 ++
 10 files changed, 172 insertions(+), 11 deletions(-)

diff --git a/src/v/cluster/scheduling/allocation_node.cc b/src/v/cluster/scheduling/allocation_node.cc
index 505058caeeb8..63283d677fa3 100644
--- a/src/v/cluster/scheduling/allocation_node.cc
+++ b/src/v/cluster/scheduling/allocation_node.cc
@@ -54,6 +54,8 @@ allocation_node::allocate(const partition_allocation_domain domain) {
     (*it)++; // increment the weights
     _allocated_partitions++;
     ++_allocated_domain_partitions[domain];
+    _final_partitions++;
+    ++_final_domain_partitions[domain];
     const ss::shard_id core = std::distance(_weights.begin(), it);
     vlog(
       clusterlog.trace,
diff --git a/src/v/cluster/scheduling/allocation_node.h b/src/v/cluster/scheduling/allocation_node.h
index 82351bd45eef..7e783d58a12a 100644
--- a/src/v/cluster/scheduling/allocation_node.h
+++ b/src/v/cluster/scheduling/allocation_node.h
@@ -99,6 +99,9 @@ class allocation_node {
         return _allocated_partitions;
     }
 
+    // number of partitions after all in-progress movements are finished
+    allocation_capacity final_partitions() const { return _final_partitions; }
+
     allocation_capacity
     domain_allocated_partitions(partition_allocation_domain domain) const {
         if (auto it = _allocated_domain_partitions.find(domain);
@@ -108,6 +111,15 @@ class allocation_node {
         return allocation_capacity{0};
     }
 
+    allocation_capacity
+    domain_final_partitions(partition_allocation_domain domain) const {
+        if (auto it = _final_domain_partitions.find(domain);
+            it != _final_domain_partitions.end()) {
+            return it->second;
+        }
+        return allocation_capacity{0};
+    }
+
     bool empty() const {
         return _allocated_partitions == allocation_capacity{0};
     }
@@ -129,6 +141,11 @@ class allocation_node {
     // number of partitions allocated in a specific allocation domain
     absl::flat_hash_map<partition_allocation_domain, allocation_capacity>
       _allocated_domain_partitions;
+    // number of partitions after all movements are finished
+    allocation_capacity _final_partitions{0};
+    absl::flat_hash_map<partition_allocation_domain, allocation_capacity>
+      _final_domain_partitions;
+
     state _state = state::active;
 
     config::binding<uint32_t> _partitions_per_shard;
diff --git a/src/v/cluster/scheduling/allocation_state.cc b/src/v/cluster/scheduling/allocation_state.cc
index 7d94ae261d3f..30f84bebc330 100644
--- a/src/v/cluster/scheduling/allocation_state.cc
+++ b/src/v/cluster/scheduling/allocation_state.cc @@ -22,6 +22,7 @@ void allocation_state::rollback( for (auto& as : v) { for (auto& bs : as.replicas) { remove_allocation(bs, domain); + remove_final_count(bs, domain); } // rollback for each assignment as the groups are distinct _highest_group = raft::group_id(_highest_group() - 1); @@ -190,6 +191,26 @@ uint32_t allocation_state::allocate( return it->second->allocate(domain); } +void allocation_state::add_final_count( + const model::broker_shard& replica, + const partition_allocation_domain domain) { + verify_shard(); + if (auto it = _nodes.find(replica.node_id); it != _nodes.end()) { + ++it->second->_final_partitions; + ++it->second->_final_domain_partitions[domain]; + } +} + +void allocation_state::remove_final_count( + const model::broker_shard& replica, + const partition_allocation_domain domain) { + verify_shard(); + if (auto it = _nodes.find(replica.node_id); it != _nodes.end()) { + --it->second->_final_partitions; + --it->second->_final_domain_partitions[domain]; + } +} + void allocation_state::verify_shard() const { /* This is a consistency check on the use of the allocation state: * it checks that the caller is on the same shard the state was originally diff --git a/src/v/cluster/scheduling/allocation_state.h b/src/v/cluster/scheduling/allocation_state.h index d9f683472b17..5d54df1a7a11 100644 --- a/src/v/cluster/scheduling/allocation_state.h +++ b/src/v/cluster/scheduling/allocation_state.h @@ -57,6 +57,10 @@ class allocation_state : public ss::weakly_referencable { add_allocation(const model::broker_shard&, partition_allocation_domain); void remove_allocation(const model::broker_shard&, partition_allocation_domain); + void + add_final_count(const model::broker_shard&, partition_allocation_domain); + void + remove_final_count(const model::broker_shard&, partition_allocation_domain); void rollback( const ss::chunked_fifo& pa, diff --git a/src/v/cluster/scheduling/partition_allocator.cc b/src/v/cluster/scheduling/partition_allocator.cc index c23b4c9048f4..f431713904ff 100644 --- a/src/v/cluster/scheduling/partition_allocator.cc +++ b/src/v/cluster/scheduling/partition_allocator.cc @@ -406,6 +406,22 @@ void partition_allocator::remove_allocations( } } +void partition_allocator::add_final_counts( + const std::vector& to_add, + const partition_allocation_domain domain) { + for (const auto& bs : to_add) { + _state->add_final_count(bs, domain); + } +} + +void partition_allocator::remove_final_counts( + const std::vector& to_remove, + const partition_allocation_domain domain) { + for (const auto& bs : to_remove) { + _state->remove_final_count(bs, domain); + } +} + ss::future<> partition_allocator::apply_snapshot(const controller_snapshot& snap) { auto new_state = std::make_unique( @@ -439,6 +455,8 @@ partition_allocator::apply_snapshot(const controller_snapshot& snap) { new_state->add_allocation(bs, domain); } + const std::vector* final_replicas = nullptr; + if (auto it = topic.updates.find(p_id); it != topic.updates.end()) { const auto& update = it->second; // Both old and new replicas contribute to allocator weights @@ -448,6 +466,24 @@ partition_allocator::apply_snapshot(const controller_snapshot& snap) { for (const auto& bs : additional_replicas) { new_state->add_allocation(bs, domain); } + + // final counts depend on the update state + switch (update.state) { + case reconfiguration_state::in_progress: + case reconfiguration_state::force_update: + final_replicas = &update.target_assignment; + break; + case 
reconfiguration_state::cancelled: + case reconfiguration_state::force_cancelled: + final_replicas = &partition.replicas; + break; + } + } else { + final_replicas = &partition.replicas; + } + + for (const auto& bs : *final_replicas) { + new_state->add_final_count(bs, domain); } co_await ss::coroutine::maybe_yield(); diff --git a/src/v/cluster/scheduling/partition_allocator.h b/src/v/cluster/scheduling/partition_allocator.h index 175fb9036eb5..56ff72337ebc 100644 --- a/src/v/cluster/scheduling/partition_allocator.h +++ b/src/v/cluster/scheduling/partition_allocator.h @@ -112,12 +112,17 @@ class partition_allocator { const std::vector&, partition_allocation_domain); void remove_allocations( const std::vector&, partition_allocation_domain); + void add_final_counts( + const std::vector&, partition_allocation_domain); + void remove_final_counts( + const std::vector&, partition_allocation_domain); void add_allocations_for_new_partition( const std::vector& replicas, raft::group_id group_id, partition_allocation_domain domain) { add_allocations(replicas, domain); + add_final_counts(replicas, domain); _state->update_highest_group_id(group_id); } diff --git a/src/v/cluster/scheduling/types.cc b/src/v/cluster/scheduling/types.cc index 151a082e981e..377287152b97 100644 --- a/src/v/cluster/scheduling/types.cc +++ b/src/v/cluster/scheduling/types.cc @@ -79,6 +79,7 @@ allocation_units::~allocation_units() { for (auto& pas : _assignments) { for (auto& replica : pas.replicas) { _state->remove_allocation(replica, _domain); + _state->remove_final_count(replica, _domain); } } } @@ -136,9 +137,11 @@ allocated_partition::prepare_move(model::node_id prev_node) { std::swap(_replicas[prev.idx], _replicas.back()); _replicas.pop_back(); _state->remove_allocation(prev.bs, _domain); + _state->remove_final_count(prev.bs, _domain); if (prev.original) { _state->remove_allocation(*prev.original, _domain); } + return prev; } @@ -164,6 +167,7 @@ model::broker_shard allocated_partition::add_replica( it != _original_node2shard->end()) { // this is an original replica, preserve the shard replica.shard = it->second; + _state->add_final_count(replica, _domain); } else { // the replica is new, choose the shard and add allocation replica.shard = _state->allocate(node, _domain); @@ -178,6 +182,7 @@ void allocated_partition::cancel_move(const previous_replica& prev) { _replicas.push_back(prev.bs); std::swap(_replicas[prev.idx], _replicas.back()); _state->add_allocation(prev.bs, _domain); + _state->add_final_count(prev.bs, _domain); if (prev.original) { _state->add_allocation(*prev.original, _domain); } @@ -226,10 +231,22 @@ allocated_partition::~allocated_partition() { } for (const auto& bs : _replicas) { - if (!_original_node2shard->contains(bs.node_id)) { + auto orig_it = _original_node2shard->find(bs.node_id); + if (orig_it == _original_node2shard->end()) { + // new replica _state->remove_allocation(bs, _domain); + _state->remove_final_count(bs, _domain); + } else { + // original replica that didn't change, erase from the map in + // preparation for the loop below + _original_node2shard->erase(orig_it); } } + + for (const auto& kv : *_original_node2shard) { + model::broker_shard bs{kv.first, kv.second}; + _state->add_final_count(bs, _domain); + } } partition_constraints::partition_constraints( diff --git a/src/v/cluster/tests/backend_reallocation_strategy_test.cc b/src/v/cluster/tests/backend_reallocation_strategy_test.cc index 3303008ea743..a50564d766c1 100644 --- a/src/v/cluster/tests/backend_reallocation_strategy_test.cc +++ 
b/src/v/cluster/tests/backend_reallocation_strategy_test.cc @@ -217,8 +217,12 @@ struct strategy_test_fixture { // update allocator allocator.add_allocations( added, cluster::partition_allocation_domains::common); + allocator.add_final_counts( + added, cluster::partition_allocation_domains::common); allocator.remove_allocations( removed, cluster::partition_allocation_domains::common); + allocator.remove_final_counts( + removed, cluster::partition_allocation_domains::common); auto ec = co_await topics.apply( cluster::move_partition_replicas_cmd(ntp, pr.new_replica_set), diff --git a/src/v/cluster/topic_updates_dispatcher.cc b/src/v/cluster/topic_updates_dispatcher.cc index 7aecf06da894..25c02ed3cd4f 100644 --- a/src/v/cluster/topic_updates_dispatcher.cc +++ b/src/v/cluster/topic_updates_dispatcher.cc @@ -170,9 +170,9 @@ ss::future topic_updates_dispatcher::apply( "Partition {} have to exist before successful " "partition reallocation", ntp); - auto to_add = subtract_replica_sets(cmd.value, p_as->replicas); - _partition_allocator.local().add_allocations( - to_add, get_allocation_domain(ntp)); + + update_allocations_for_reconfiguration( + p_as->replicas, cmd.value, get_allocation_domain(ntp)); _partition_balancer_state.local().handle_ntp_update( ntp.ns, ntp.tp.topic, ntp.tp.partition, p_as->replicas, cmd.value); @@ -202,6 +202,16 @@ ss::future topic_updates_dispatcher::apply( "currently being updated", ntp); + auto to_add = subtract_replica_sets( + *new_target_replicas, current_assignment->replicas); + _partition_allocator.local().add_final_counts( + to_add, get_allocation_domain(ntp)); + + auto to_remove = subtract_replica_sets( + current_assignment->replicas, *new_target_replicas); + _partition_allocator.local().remove_final_counts( + to_remove, get_allocation_domain(ntp)); + _partition_balancer_state.local().handle_ntp_update( ntp.ns, ntp.tp.topic, @@ -345,10 +355,10 @@ ss::future topic_updates_dispatcher::apply( if (assigment_it == assignments.value().end()) { co_return std::error_code(errc::partition_not_exists); } - auto to_add = subtract_replica_sets( - replicas, assigment_it->replicas); - _partition_allocator.local().add_allocations( - to_add, get_allocation_domain(ntp)); + + update_allocations_for_reconfiguration( + assigment_it->replicas, replicas, get_allocation_domain(ntp)); + _partition_balancer_state.local().handle_ntp_update( ntp.ns, ntp.tp.topic, @@ -406,10 +416,17 @@ ss::future topic_updates_dispatcher::apply( "currently being cancelled", ntp); + auto to_add = subtract_replica_sets( + *target_replicas, *previous_replicas); + _partition_allocator.local().add_final_counts( + to_add, get_allocation_domain(ntp)); + auto to_delete = subtract_replica_sets( *previous_replicas, *target_replicas); _partition_allocator.local().remove_allocations( to_delete, get_allocation_domain(ntp)); + _partition_allocator.local().remove_final_counts( + to_delete, get_allocation_domain(ntp)); _partition_balancer_state.local().handle_ntp_update( ntp.ns, @@ -423,9 +440,29 @@ ss::future topic_updates_dispatcher::apply( ss::future topic_updates_dispatcher::apply( force_partition_reconfiguration_cmd cmd, model::offset base_offset) { - // Post dispatch, allocator updates are skipped because the target - // replica set is a subset of the original replica set. 
- return dispatch_updates_to_cores(std::move(cmd), base_offset); + auto p_as = _topic_table.local().get_partition_assignment(cmd.key); + auto ec = co_await dispatch_updates_to_cores(cmd, base_offset); + if (ec) { + co_return ec; + } + + const auto& ntp = cmd.key; + vassert( + p_as.has_value(), + "Partition {} have to exist before successful force-reconfiguration", + ntp); + + update_allocations_for_reconfiguration( + p_as->replicas, cmd.value.replicas, get_allocation_domain(ntp)); + + _partition_balancer_state.local().handle_ntp_update( + ntp.ns, + ntp.tp.topic, + ntp.tp.partition, + p_as->replicas, + cmd.value.replicas); + + co_return ec; } topic_updates_dispatcher::in_progress_map @@ -520,6 +557,7 @@ void topic_updates_dispatcher::deallocate_topic( ? p_as.replicas : union_replica_sets(it->second, p_as.replicas); _partition_allocator.local().remove_allocations(to_delete, domain); + _partition_allocator.local().remove_final_counts(p_as.replicas, domain); if (unlikely(clusterlog.is_enabled(ss::log_level::trace))) { vlog( clusterlog.trace, @@ -541,6 +579,18 @@ void topic_updates_dispatcher::add_allocations_for_new_partitions( } } +void topic_updates_dispatcher::update_allocations_for_reconfiguration( + const std::vector& previous, + const std::vector& target, + partition_allocation_domain domain) { + auto to_add = subtract_replica_sets(target, previous); + _partition_allocator.local().add_allocations(to_add, domain); + _partition_allocator.local().add_final_counts(to_add, domain); + + auto to_remove = subtract_replica_sets(previous, target); + _partition_allocator.local().remove_final_counts(to_remove, domain); +} + ss::future<> topic_updates_dispatcher::fill_snapshot(controller_snapshot& snap) const { co_await _topic_table.local().fill_snapshot(snap); diff --git a/src/v/cluster/topic_updates_dispatcher.h b/src/v/cluster/topic_updates_dispatcher.h index 3f59e4aa1ac2..4b7fc0089c93 100644 --- a/src/v/cluster/topic_updates_dispatcher.h +++ b/src/v/cluster/topic_updates_dispatcher.h @@ -115,6 +115,11 @@ class topic_updates_dispatcher { void add_allocations_for_new_partitions(const T&, partition_allocation_domain); + void update_allocations_for_reconfiguration( + const std::vector& previous, + const std::vector& target, + partition_allocation_domain); + void deallocate_topic( const model::topic_namespace&, const assignments_set&, From 992a365be54a051e0701fe9e9603ada8ebaa43d4 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 1 Jun 2023 16:37:48 +0300 Subject: [PATCH 2/5] c/topic_updates_dispatcher: add allocator partition counts utest --- .../tests/topic_updates_dispatcher_test.cc | 212 ++++++++++++++++-- 1 file changed, 199 insertions(+), 13 deletions(-) diff --git a/src/v/cluster/tests/topic_updates_dispatcher_test.cc b/src/v/cluster/tests/topic_updates_dispatcher_test.cc index ebe72f06b9f5..73055a72f9ca 100644 --- a/src/v/cluster/tests/topic_updates_dispatcher_test.cc +++ b/src/v/cluster/tests/topic_updates_dispatcher_test.cc @@ -19,10 +19,19 @@ #include using namespace std::chrono_literals; +ss::logger logger{"dispatcher_test"}; + struct topic_table_updates_dispatcher_fixture : topic_table_fixture { topic_table_updates_dispatcher_fixture() : dispatcher(allocator, table, leaders, pb_state) {} + template + void dispatch_command(Cmd cmd) { + auto res + = dispatcher.apply_update(serde_serialize_cmd(std::move(cmd))).get(); + BOOST_REQUIRE_EQUAL(res, cluster::errc::success); + } + void create_topics() { auto cmd_1 = make_create_topic_cmd("test_tp_1", 1, 3); 
cmd_1.value.cfg.properties.compaction_strategy @@ -37,19 +46,9 @@ struct topic_table_updates_dispatcher_fixture : topic_table_fixture { auto cmd_2 = make_create_topic_cmd("test_tp_2", 12, 3); auto cmd_3 = make_create_topic_cmd("test_tp_3", 8, 1); - auto res_1 = dispatcher - .apply_update(serialize_cmd(std::move(cmd_1)).get0()) - .get0(); - auto res_2 = dispatcher - .apply_update(serialize_cmd(std::move(cmd_2)).get0()) - .get0(); - auto res_3 = dispatcher - .apply_update(serialize_cmd(std::move(cmd_3)).get0()) - .get0(); - - BOOST_REQUIRE_EQUAL(res_1, cluster::errc::success); - BOOST_REQUIRE_EQUAL(res_2, cluster::errc::success); - BOOST_REQUIRE_EQUAL(res_3, cluster::errc::success); + dispatch_command(std::move(cmd_1)); + dispatch_command(std::move(cmd_2)); + dispatch_command(std::move(cmd_3)); } cluster::topic_updates_dispatcher dispatcher; @@ -165,3 +164,190 @@ FIXTURE_TEST( current_cluster_capacity(allocator.local().state().allocation_nodes()), max_cluster_capacity() - (1 * 3 + 12 * 3 + 8 * 1)); } + +FIXTURE_TEST( + allocator_partition_counts, topic_table_updates_dispatcher_fixture) { + const auto& allocation_nodes = allocator.local().state().allocation_nodes(); + + auto check_allocated_counts = [&](std::vector expected) { + std::vector counts; + for (const auto& [id, node] : allocation_nodes) { + BOOST_REQUIRE(id() == counts.size() + 1); // 1-based node ids + counts.push_back(node->allocated_partitions()); + } + logger.debug("allocated counts: {}, expected: {}", counts, expected); + BOOST_CHECK_EQUAL(counts, expected); + }; + + auto check_final_counts = [&](std::vector expected) { + std::vector counts; + for (const auto& [id, node] : allocation_nodes) { + BOOST_REQUIRE(id() == counts.size() + 1); // 1-based node ids + counts.push_back(node->final_partitions()); + } + logger.debug("final counts: {}, expected: {}", counts, expected); + BOOST_CHECK_EQUAL(counts, expected); + }; + + auto create_topic_cmd = make_create_topic_cmd("test_tp_1", 4, 3); + logger.info("create topic {}", create_topic_cmd.key); + dispatch_command(create_topic_cmd); + + // create a node to move replicas to + allocator.local().register_node( + create_allocation_node(model::node_id(4), 4)); + + check_allocated_counts({4, 4, 4, 0}); + check_final_counts({4, 4, 4, 0}); + + // get data needed to move a partition + auto get_partition = [&](size_t id) { + model::ntp ntp{ + create_topic_cmd.key.ns, + create_topic_cmd.key.tp, + model::partition_id{id}}; + auto assignment_it = std::next( + create_topic_cmd.value.assignments.begin(), id); + BOOST_REQUIRE(assignment_it->id() == id); + + auto old_replicas = assignment_it->replicas; + + auto new_replicas = old_replicas; + auto it = std::find_if( + new_replicas.begin(), new_replicas.end(), [](const auto& bs) { + return bs.node_id() == 1; + }); + BOOST_REQUIRE(it != new_replicas.end()); + it->node_id = model::node_id{4}; + + return std::tuple{ + ntp, + old_replicas, + new_replicas, + }; + }; + + // move + finish + { + auto [ntp, old_replicas, new_replicas] = get_partition(0); + + logger.info("move ntp {}", ntp); + dispatch_command( + cluster::move_partition_replicas_cmd{ntp, new_replicas}); + check_allocated_counts({4, 4, 4, 1}); + check_final_counts({3, 4, 4, 1}); + + logger.info("finish move"); + dispatch_command( + cluster::finish_moving_partition_replicas_cmd{ntp, new_replicas}); + check_allocated_counts({3, 4, 4, 1}); + check_final_counts({3, 4, 4, 1}); + } + + // move + cancel + force_cancel + finish + { + auto [ntp, old_replicas, new_replicas] = get_partition(1); + + 
logger.info("move ntp {}", ntp); + dispatch_command( + cluster::move_partition_replicas_cmd{ntp, new_replicas}); + check_allocated_counts({3, 4, 4, 2}); + check_final_counts({2, 4, 4, 2}); + + logger.info("cancel move"); + dispatch_command(cluster::cancel_moving_partition_replicas_cmd{ + ntp, + cluster::cancel_moving_partition_replicas_cmd_data{ + cluster::force_abort_update{false}}}); + check_allocated_counts({3, 4, 4, 2}); + check_final_counts({3, 4, 4, 1}); + + logger.info("force-cancel move"); + dispatch_command(cluster::cancel_moving_partition_replicas_cmd{ + ntp, + cluster::cancel_moving_partition_replicas_cmd_data{ + cluster::force_abort_update{true}}}); + check_allocated_counts({3, 4, 4, 2}); + check_final_counts({3, 4, 4, 1}); + + logger.info("finish move"); + dispatch_command( + cluster::finish_moving_partition_replicas_cmd{ntp, old_replicas}); + check_allocated_counts({3, 4, 4, 1}); + check_final_counts({3, 4, 4, 1}); + } + + // move + cancel + revert_cancel + { + auto [ntp, old_replicas, new_replicas] = get_partition(2); + + logger.info("move ntp {}", ntp); + dispatch_command( + cluster::move_partition_replicas_cmd{ntp, new_replicas}); + check_allocated_counts({3, 4, 4, 2}); + check_final_counts({2, 4, 4, 2}); + + logger.info("cancel move"); + dispatch_command(cluster::cancel_moving_partition_replicas_cmd{ + ntp, + cluster::cancel_moving_partition_replicas_cmd_data{ + cluster::force_abort_update{false}}}); + check_allocated_counts({3, 4, 4, 2}); + check_final_counts({3, 4, 4, 1}); + + logger.info("revert_cancel move"); + dispatch_command(cluster::revert_cancel_partition_move_cmd( + int8_t{0}, + cluster::revert_cancel_partition_move_cmd_data{.ntp = ntp})); + check_allocated_counts({2, 4, 4, 2}); + check_final_counts({2, 4, 4, 2}); + } + + // force_move + { + auto [ntp, old_replicas, new_replicas] = get_partition(3); + + // for new_replicas choose a proper subset of old replicas, as required + // by force_partition_reconfiguration. 
+ auto repl_it = std::find_if( + old_replicas.begin(), old_replicas.end(), [](const auto& bs) { + return bs.node_id() == 1; + }); + BOOST_REQUIRE(repl_it != old_replicas.end()); + new_replicas = std::vector({*repl_it}); + + logger.info( + "force_partition_reconfiguration ntp {} to {}", ntp, new_replicas); + dispatch_command(cluster::force_partition_reconfiguration_cmd{ + ntp, + cluster::force_partition_reconfiguration_cmd_data(new_replicas)}); + check_allocated_counts({2, 4, 4, 2}); + check_final_counts({2, 3, 3, 2}); + + logger.info("finish move"); + dispatch_command( + cluster::finish_moving_partition_replicas_cmd{ntp, new_replicas}); + check_allocated_counts({2, 3, 3, 2}); + check_final_counts({2, 3, 3, 2}); + } + + // move topic + topic delete + { + // move everything back + logger.info("move topic"); + std::vector cmd_data; + for (const auto& p_as : create_topic_cmd.value.assignments) { + cmd_data.emplace_back(p_as.id, p_as.replicas); + } + dispatch_command( + cluster::move_topic_replicas_cmd(create_topic_cmd.key, cmd_data)); + check_allocated_counts({4, 4, 4, 2}); + check_final_counts({4, 4, 4, 0}); + + logger.info("delete topic"); + dispatch_command(cluster::delete_topic_cmd( + create_topic_cmd.key, create_topic_cmd.key)); + check_allocated_counts({0, 0, 0, 0}); + check_final_counts({0, 0, 0, 0}); + } +} From c70ab93350e2a661c8fe67ebdd006daaab454dd0 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 1 Jun 2023 20:16:11 +0300 Subject: [PATCH 3/5] c/partition_allocator: add final counts assertions to tests --- .../tests/partition_allocator_tests.cc | 64 ++++++++++++++----- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/src/v/cluster/tests/partition_allocator_tests.cc b/src/v/cluster/tests/partition_allocator_tests.cc index 69162bf719c5..6d35d88c5301 100644 --- a/src/v/cluster/tests/partition_allocator_tests.cc +++ b/src/v/cluster/tests/partition_allocator_tests.cc @@ -23,6 +23,8 @@ #include #include +ss::logger logger{"allocator_test"}; + static void validate_replica_set_diversity( const std::vector& replicas) { if (replicas.size() > 1) { @@ -568,16 +570,32 @@ FIXTURE_TEST(even_distribution_pri_allocation, partition_allocator_fixture) { } } -void check_partition_counts( +void check_allocated_counts( + const cluster::partition_allocator& allocator, + const std::vector& expected, + cluster::partition_allocation_domain domain + = cluster::partition_allocation_domains::common) { + std::vector counts; + for (const auto& [id, node] : allocator.state().allocation_nodes()) { + BOOST_REQUIRE(id() == counts.size()); + counts.push_back(node->domain_allocated_partitions(domain)); + } + logger.debug("allocated counts: {}, expected: {}", counts, expected); + BOOST_CHECK_EQUAL(counts, expected); +}; + +void check_final_counts( const cluster::partition_allocator& allocator, const std::vector& expected, cluster::partition_allocation_domain domain = cluster::partition_allocation_domains::common) { - for (const auto& node : allocator.state().allocation_nodes()) { - model::node_id node_id = node.first; - auto count = node.second->domain_allocated_partitions(domain); - BOOST_REQUIRE_EQUAL(count(), expected.at(node_id())); + std::vector counts; + for (const auto& [id, node] : allocator.state().allocation_nodes()) { + BOOST_REQUIRE(id() == counts.size()); + counts.push_back(node->domain_final_partitions(domain)); } + logger.debug("final counts: {}, expected: {}", counts, expected); + BOOST_CHECK_EQUAL(counts, expected); }; FIXTURE_TEST(incrementally_reallocate_replicas, 
partition_allocator_fixture) { @@ -594,7 +612,8 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { // add another node to move replicas to and from register_node(3, 1); - check_partition_counts(allocator, {1, 1, 1, 0}); + check_allocated_counts(allocator, {1, 1, 1, 0}); + check_final_counts(allocator, {1, 1, 1, 0}); { cluster::allocated_partition reallocated @@ -612,7 +631,8 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL( reallocated.replicas().at(2).node_id, model::node_id{3}); - check_partition_counts(allocator, {1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1}); + check_final_counts(allocator, {0, 1, 1, 1}); // there is no replica on node 0 any more auto moved2 = allocator.reallocate_replica( @@ -629,7 +649,8 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL( moved3.error(), cluster::errc::no_eligible_allocation_nodes); - check_partition_counts(allocator, {1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1}); + check_final_counts(allocator, {0, 1, 1, 1}); // replicas can move to the same place auto moved4 = allocator.reallocate_replica( @@ -639,7 +660,8 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL( reallocated.replicas().at(2).node_id, model::node_id{3}); - check_partition_counts(allocator, {1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1}); + check_final_counts(allocator, {0, 1, 1, 1}); auto moved5 = allocator.reallocate_replica( reallocated, model::node_id{2}, cluster::allocation_constraints{}); @@ -648,7 +670,8 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL( reallocated.replicas().at(2).node_id, model::node_id{2}); - check_partition_counts(allocator, {1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1}); + check_final_counts(allocator, {0, 1, 1, 1}); std::vector new_replicas(reallocated.replicas()); cluster::allocation_constraints not_on_new_nodes; @@ -663,7 +686,8 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL( reallocated.replicas().at(2).node_id, model::node_id{0}); - check_partition_counts(allocator, {1, 1, 1, 0}); + check_allocated_counts(allocator, {1, 1, 1, 0}); + check_final_counts(allocator, {1, 1, 1, 0}); // do another move so that we have something to revert auto moved7 = allocator.reallocate_replica( @@ -674,10 +698,12 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL( reallocated.replicas().at(2).node_id, model::node_id{3}); - check_partition_counts(allocator, {1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1}); + check_final_counts(allocator, {1, 0, 1, 1}); } - check_partition_counts(allocator, {1, 1, 1, 0}); + check_allocated_counts(allocator, {1, 1, 1, 0}); + check_final_counts(allocator, {1, 1, 1, 0}); } FIXTURE_TEST(reallocate_partition_with_move, partition_allocator_fixture) { @@ -695,7 +721,8 @@ FIXTURE_TEST(reallocate_partition_with_move, partition_allocator_fixture) { register_node(3, 1); register_node(4, 1); - check_partition_counts(allocator, {1, 1, 1, 0, 0}); + check_allocated_counts(allocator, {1, 1, 1, 0, 0}); + check_final_counts(allocator, {1, 1, 1, 0, 0}); cluster::allocation_constraints not_on_old_nodes; not_on_old_nodes.add(cluster::distinct_from(original_assignment.replicas)); @@ -717,7 +744,8 @@ 
FIXTURE_TEST(reallocate_partition_with_move, partition_allocator_fixture) { BOOST_REQUIRE_EQUAL(replicas_set.size(), 4); BOOST_REQUIRE(!replicas_set.contains(model::node_id{0})); - check_partition_counts(allocator, {1, 1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1, 1}); + check_final_counts(allocator, {0, 1, 1, 1, 1}); } { @@ -784,7 +812,8 @@ FIXTURE_TEST( replica2shard.emplace(bs.node_id, bs.shard); } - check_partition_counts(allocator, {2, 2, 2, 1}); + check_allocated_counts(allocator, {2, 2, 2, 1}); + check_final_counts(allocator, {2, 2, 2, 1}); cluster::allocated_partition reallocated = allocator.make_allocated_partition( @@ -800,7 +829,8 @@ FIXTURE_TEST( // more attractive. But replicas on nodes 0, 1, and 2 should still end up on // shard 2 partition_1.reset(); - check_partition_counts(allocator, {1, 1, 1, 1}); + check_allocated_counts(allocator, {1, 1, 1, 1}); + check_final_counts(allocator, {0, 1, 1, 1}); // Reallocate replica on node 1 to itself. moved = allocator.reallocate_replica( From d7a98e381c20442e93021a9205a1bf3494bf330c Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 1 Jun 2023 21:10:10 +0300 Subject: [PATCH 4/5] c/scheduling/constraints: use final counts in the counts soft constraint Choose a node for the replica based on final partition counts, not on currently allocated partitions (this should give better results for counts-based rebalancing). Additionally, rename these constraints to better reflect what they are doing. --- src/v/cluster/scheduling/constraints.cc | 16 +++++++++++----- src/v/cluster/scheduling/constraints.h | 14 +++++++------- src/v/cluster/scheduling/partition_allocator.cc | 4 ++-- src/v/cluster/tests/partition_allocator_tests.cc | 7 ++++++- 4 files changed, 26 insertions(+), 15 deletions(-) diff --git a/src/v/cluster/scheduling/constraints.cc b/src/v/cluster/scheduling/constraints.cc index aa32b34482d8..f69930d9b023 100644 --- a/src/v/cluster/scheduling/constraints.cc +++ b/src/v/cluster/scheduling/constraints.cc @@ -219,7 +219,7 @@ hard_constraint disk_not_overflowed_by_partition( max_disk_usage_ratio, partition_size, node_disk_reports)); } -soft_constraint least_allocated() { +soft_constraint max_final_capacity() { class impl : public soft_constraint::impl { public: soft_constraint_evaluator @@ -227,7 +227,10 @@ soft_constraint least_allocated() { return [](const allocation_node& node) { // we return 0 for fully allocated node and 10'000'000 for nodes // with maximum capacity available - return (soft_constraint::max_score * node.partition_capacity()) + auto final_capacity + = node.max_capacity() + - std::min(node.max_capacity(), node.final_partitions()); + return (soft_constraint::max_score * final_capacity) / node.max_capacity(); }; } @@ -239,7 +242,7 @@ soft_constraint least_allocated() { } soft_constraint -least_allocated_in_domain(const partition_allocation_domain domain) { +max_final_capacity_in_domain(const partition_allocation_domain domain) { struct impl : soft_constraint::impl { explicit impl(partition_allocation_domain domain_) : domain(domain_) {} @@ -247,8 +250,11 @@ least_allocated_in_domain(const partition_allocation_domain domain) { soft_constraint_evaluator make_evaluator(const replicas_t&) const final { return [this](const allocation_node& node) { - return (soft_constraint::max_score - * node.domain_partition_capacity(domain)) + auto final_capacity = node.max_capacity() + - std::min( + node.max_capacity(), + node.domain_final_partitions(domain)); + return (soft_constraint::max_score * final_capacity) / 
node.max_capacity(); }; } diff --git a/src/v/cluster/scheduling/constraints.h b/src/v/cluster/scheduling/constraints.h index 0fa73c12d0f1..ca89b8b0ed28 100644 --- a/src/v/cluster/scheduling/constraints.h +++ b/src/v/cluster/scheduling/constraints.h @@ -54,18 +54,18 @@ hard_constraint disk_not_overflowed_by_partition( node_disk_reports); /* - * scores nodes based on free overall allocation capacity left + * scores nodes based on partition count after all moves have been finished * returning `0` for fully allocated nodes and `max_capacity` for empty nodes */ -soft_constraint least_allocated(); +soft_constraint max_final_capacity(); /* - * scores nodes based on allocation capacity used by priority partitions - * returning `0` for nodes fully allocated for priority partitions - * and `max_capacity` for nodes without any priority partitions - * non-priority partition allocations are ignored + * scores nodes based on partition counts of priority partitions after all moves + * have been finished, returning `0` for nodes fully allocated for priority + * partitions and `max_capacity` for nodes without any priority partitions. + * non-priority partition allocations are ignored. */ -soft_constraint least_allocated_in_domain(partition_allocation_domain); +soft_constraint max_final_capacity_in_domain(partition_allocation_domain); /* * constraint scores nodes on free disk space diff --git a/src/v/cluster/scheduling/partition_allocator.cc b/src/v/cluster/scheduling/partition_allocator.cc index f431713904ff..cc2460a13d16 100644 --- a/src/v/cluster/scheduling/partition_allocator.cc +++ b/src/v/cluster/scheduling/partition_allocator.cc @@ -66,9 +66,9 @@ allocation_constraints partition_allocator::default_constraints( req.add(is_active()); if (domain == partition_allocation_domains::common) { - req.add(least_allocated()); + req.add(max_final_capacity()); } else { - req.add(least_allocated_in_domain(domain)); + req.add(max_final_capacity_in_domain(domain)); } if (_enable_rack_awareness()) { req.add(distinct_rack_preferred(_members.local())); diff --git a/src/v/cluster/tests/partition_allocator_tests.cc b/src/v/cluster/tests/partition_allocator_tests.cc index 6d35d88c5301..b1f99de3481d 100644 --- a/src/v/cluster/tests/partition_allocator_tests.cc +++ b/src/v/cluster/tests/partition_allocator_tests.cc @@ -663,8 +663,13 @@ FIXTURE_TEST(incrementally_reallocate_replicas, partition_allocator_fixture) { check_allocated_counts(allocator, {1, 1, 1, 1}); check_final_counts(allocator, {0, 1, 1, 1}); + std::vector node_0( + {model::broker_shard{.node_id = model::node_id{0}, .shard = 0}}); + cluster::allocation_constraints not_on_node0; + not_on_node0.add(cluster::distinct_from(node_0)); + auto moved5 = allocator.reallocate_replica( - reallocated, model::node_id{2}, cluster::allocation_constraints{}); + reallocated, model::node_id{2}, not_on_node0); BOOST_REQUIRE(moved5.has_value()); BOOST_REQUIRE_EQUAL(moved5.value().node_id, model::node_id{2}); BOOST_REQUIRE_EQUAL( From 6ee77dac7cbac95e4a67cb832adef0a467f8cd98 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 1 Jun 2023 21:41:39 +0300 Subject: [PATCH 5/5] c/members_backend: use final counts in partition count rebalancing --- src/v/cluster/members_backend.cc | 78 ++++++-------------------------- 1 file changed, 13 insertions(+), 65 deletions(-) diff --git a/src/v/cluster/members_backend.cc b/src/v/cluster/members_backend.cc index 60d73a08e4d6..6419af8f7a80 100644 --- a/src/v/cluster/members_backend.cc +++ b/src/v/cluster/members_backend.cc @@ -25,7 +25,7 @@ 
namespace cluster { namespace { struct node_replicas { - size_t allocated_replicas; + size_t final_count; size_t max_capacity; }; @@ -40,15 +40,13 @@ calculate_replicas_per_node( if (!n->is_active()) { continue; } - auto [it, _] = ret.try_emplace( + auto [it, inserted] = ret.try_emplace( id, node_replicas{ - .allocated_replicas = 0, - .max_capacity = n->domain_partition_capacity(domain), + .final_count = n->domain_final_partitions(domain), + .max_capacity = n->max_capacity(), }); - - const auto domain_allocated = n->domain_allocated_partitions(domain); - it->second.allocated_replicas += domain_allocated; + vassert(inserted, "node {} inserted twice", id); } return ret; } @@ -57,7 +55,7 @@ static size_t calculate_total_replicas(const node_replicas_map_t& node_replicas) { size_t total_replicas = 0; for (auto& [_, replicas] : node_replicas) { - total_replicas += replicas.allocated_replicas; + total_replicas += replicas.final_count; } return total_replicas; } @@ -137,10 +135,7 @@ void reassign_replicas( * **/ double calculate_unevenness_error( - const partition_allocator& allocator, - const members_backend::update_meta& update, - const topic_table& topics, - partition_allocation_domain domain) { + const partition_allocator& allocator, partition_allocation_domain domain) { static const std::vector domains{ partition_allocation_domains::consumer_offsets, partition_allocation_domains::common}; @@ -148,53 +143,8 @@ double calculate_unevenness_error( const auto node_cnt = allocator.state().available_nodes(); auto node_replicas = calculate_replicas_per_node(allocator, domain); - /** - * adjust per node replicas with the replicas that are going to be removed - * from the node after successful reallocation - * based on the state of reallocation the following adjustments are made: - * - * reallocation_state::initial - no adjustment required - * reallocation_state::reassigned - allocator already updated, adjusting - * reallocation_state::requested - allocator already updated, adjusting - * reallocation_state::finished - no adjustment required - * - * Do not need to care about the cancel related state here as no - * cancellations are requested when node is added to the cluster. - */ - for (const auto& [ntp, r] : update.partition_reallocations) { - using state = members_backend::reallocation_state; - /** - * In the initial or finished state the adjustment doesn't have - * to be taken into account as partition balancer is already updated. - */ - if ( - r.state == state::initial || r.state == state::finished - || r.state == state::cancelled || r.state == state::request_cancel) { - continue; - } - /** - * if a partition move was already requested it might have already been - * finished, consult topic table to check if the update is still in - * progress. If no move is in progress the adjustment must be skipped as - * allocator state is already up to date. Reallocation will be marked as - * finished in reconciliation loop pass. 
- */ - if (r.state == state::requested && !topics.is_update_in_progress(ntp)) { - continue; - } - - if (get_allocation_domain(ntp) == domain) { - for (const auto& to_remove : r.replicas_to_remove) { - auto it = node_replicas.find(to_remove); - if (it != node_replicas.end()) { - it->second.allocated_replicas--; - } - } - } - } const auto total_replicas = calculate_total_replicas(node_replicas); - if (total_replicas == 0) { return 0.0; } @@ -213,14 +163,14 @@ double calculate_unevenness_error( double err = 0; for (auto& [id, allocation_info] : node_replicas) { double diff = static_cast(target_replicas_per_node) - - static_cast(allocation_info.allocated_replicas); + - static_cast(allocation_info.final_count); vlog( clusterlog.trace, "node {} has {} replicas allocated in domain {}, requested replicas " "per node {}, difference: {}", id, - allocation_info.allocated_replicas, + allocation_info.final_count, domain, target_replicas_per_node, diff); @@ -485,8 +435,7 @@ void members_backend::default_reallocation_strategy:: } calculate_reallocations_batch( max_batch_size, allocator, topics, meta, domain); - auto current_error = calculate_unevenness_error( - allocator, meta, topics, domain); + auto current_error = calculate_unevenness_error(allocator, domain); auto [it, _] = meta.last_unevenness_error.try_emplace(domain, 1.0); auto improvement = it->second - current_error; @@ -564,9 +513,8 @@ void members_backend::default_reallocation_strategy:: absl::flat_hash_map to_move_from_node; for (auto& [id, info] : node_replicas) { - auto to_move = info.allocated_replicas - - std::min( - target_replicas_per_node, info.allocated_replicas); + auto to_move = info.final_count + - std::min(target_replicas_per_node, info.final_count); if (to_move > 0) { to_move_from_node.emplace(id, to_move); } @@ -582,7 +530,7 @@ void members_backend::default_reallocation_strategy:: cnt, id, domain, - node_replicas[id].allocated_replicas); + node_replicas[id].final_count); } }
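
Taken together, the series separates two per-node counters: "allocated" partitions (replicas physically present on the node, including the source replicas of in-progress moves) and "final" partitions (what the node will host once every in-progress move completes). Below is a minimal, self-contained sketch of that accounting using plain integers and hypothetical names; it only illustrates the idea and is not the real allocation_node/allocation_state code, which additionally tracks these counts per allocation domain.

#include <cstdio>
#include <map>

// Hypothetical simplified counters for a single allocation domain.
struct node_counts {
    int allocated = 0;   // replicas physically on the node (old + new while a move runs)
    int final_count = 0; // replicas once all in-progress moves have finished
};

int main() {
    std::map<int, node_counts> nodes;
    nodes[1] = {.allocated = 4, .final_count = 4}; // node 1 currently hosts 4 replicas
    nodes[4] = {};                                 // freshly added, empty node

    // Start moving one replica from node 1 to node 4: the old replica stays
    // allocated on node 1 until the move finishes, but it no longer counts
    // towards node 1's final total - the number count-based rebalancing
    // should look at.
    nodes[4].allocated++;
    nodes[4].final_count++;
    nodes[1].final_count--;
    std::printf("in progress: n1=%d/%d n4=%d/%d\n",
      nodes[1].allocated, nodes[1].final_count,
      nodes[4].allocated, nodes[4].final_count); // n1=4/3 n4=1/1

    // Finish the move: only now is the old allocation on node 1 released,
    // and the two counters converge again.
    nodes[1].allocated--;
    std::printf("finished:    n1=%d/%d n4=%d/%d\n",
      nodes[1].allocated, nodes[1].final_count,
      nodes[4].allocated, nodes[4].final_count); // n1=3/3 n4=1/1
}

The numbers mirror what the new allocator_partition_counts test asserts for partition 0: allocated counts {4, 4, 4, 1} versus final counts {3, 4, 4, 1} while the move is in flight, and {3, 4, 4, 1} for both once the move is finished.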