From 06db9209a4c242238a3621d1752b0af6b0cc9120 Mon Sep 17 00:00:00 2001 From: Ioannis Kavvadias Date: Mon, 18 Nov 2024 12:43:29 +0000 Subject: [PATCH 1/3] cluster: refactor topics_frontend move static make_allocation_request helper functions into an anonymous namespace (cherry picked from commit ef6a6ac0ee0d28ac1a8282f83873b2fc04d10c4e) --- src/v/cluster/topics_frontend.cc | 119 ++++++++++++++++--------------- 1 file changed, 62 insertions(+), 57 deletions(-) diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc index e62f457521949..ce373a88814fd 100644 --- a/src/v/cluster/topics_frontend.cc +++ b/src/v/cluster/topics_frontend.cc @@ -62,6 +62,68 @@ #include #include +namespace { + +cluster::allocation_request make_allocation_request( + const cluster::custom_assignable_topic_configuration& ca_cfg, + bool topic_aware) { + // no custom assignments, lets allocator decide based on partition count + const auto& tp_ns = ca_cfg.cfg.tp_ns; + cluster::allocation_request req(tp_ns, cluster::get_allocation_domain(tp_ns)); + if (!ca_cfg.has_custom_assignment()) { + req.partitions.reserve(ca_cfg.cfg.partition_count); + for (auto p = 0; p < ca_cfg.cfg.partition_count; ++p) { + req.partitions.emplace_back( + model::partition_id(p), ca_cfg.cfg.replication_factor); + } + } else { + req.partitions.reserve(ca_cfg.custom_assignments.size()); + for (auto& cas : ca_cfg.custom_assignments) { + cluster::allocation_constraints constraints; + constraints.add(cluster::on_nodes(cas.replicas)); + + req.partitions.emplace_back( + cas.id, cas.replicas.size(), std::move(constraints)); + } + } + if (topic_aware) { + req.existing_replica_counts = cluster::node2count_t{}; + } + return req; +} + +cluster::allocation_request make_allocation_request( + int16_t replication_factor, + const int32_t current_partitions_count, + std::optional existing_replica_counts, + const cluster::create_partitions_configuration& cfg) { + const auto new_partitions_cnt = cfg.new_total_partition_count + - current_partitions_count; + cluster::allocation_request req(cfg.tp_ns, cluster::get_allocation_domain(cfg.tp_ns)); + req.existing_replica_counts = std::move(existing_replica_counts); + req.partitions.reserve(new_partitions_cnt); + for (auto p = 0; p < new_partitions_cnt; ++p) { + req.partitions.emplace_back(model::partition_id(p), replication_factor); + } + return req; +} + +cluster::allocation_request make_allocation_request( + model::ntp ntp, + cluster::replication_factor tp_replication_factor, + const std::vector& new_replicas) { + auto nt = model::topic_namespace(ntp.ns, ntp.tp.topic); + cluster::allocation_request req(nt, cluster::get_allocation_domain(nt)); + req.partitions.reserve(1); + cluster::allocation_constraints constraints; + constraints.add(cluster::on_nodes(new_replicas)); + req.partitions.emplace_back( + ntp.tp.partition, tp_replication_factor, std::move(constraints)); + return req; +} + +} // namespace + namespace cluster { topics_frontend::topics_frontend( @@ -365,33 +427,6 @@ make_error_result(const model::topic_namespace& tp_ns, std::error_code ec) { return topic_result(tp_ns, errc::topic_operation_error); } -static allocation_request make_allocation_request( - const custom_assignable_topic_configuration& ca_cfg, bool topic_aware) { - // no custom assignments, lets allocator decide based on partition count - const auto& tp_ns = ca_cfg.cfg.tp_ns; - allocation_request req(tp_ns, get_allocation_domain(tp_ns)); - if (!ca_cfg.has_custom_assignment()) { - req.partitions.reserve(ca_cfg.cfg.partition_count); - for (auto p = 0; p < ca_cfg.cfg.partition_count; ++p) { - req.partitions.emplace_back( - model::partition_id(p), ca_cfg.cfg.replication_factor); - } - } else { - req.partitions.reserve(ca_cfg.custom_assignments.size()); - for (auto& cas : ca_cfg.custom_assignments) { - allocation_constraints constraints; - constraints.add(on_nodes(cas.replicas)); - - req.partitions.emplace_back( - cas.id, cas.replicas.size(), std::move(constraints)); - } - } - if (topic_aware) { - req.existing_replica_counts = node2count_t{}; - } - return req; -} - errc topics_frontend::validate_topic_configuration( const custom_assignable_topic_configuration& assignable_config) { if (!validate_topic_name(assignable_config.cfg.tp_ns)) { @@ -1400,22 +1435,6 @@ topics_frontend::validate_shard(model::node_id node, uint32_t shard) const { }); } -static allocation_request make_allocation_request( - int16_t replication_factor, - const int32_t current_partitions_count, - std::optional existing_replica_counts, - const create_partitions_configuration& cfg) { - const auto new_partitions_cnt = cfg.new_total_partition_count - - current_partitions_count; - allocation_request req(cfg.tp_ns, get_allocation_domain(cfg.tp_ns)); - req.existing_replica_counts = std::move(existing_replica_counts); - req.partitions.reserve(new_partitions_cnt); - for (auto p = 0; p < new_partitions_cnt; ++p) { - req.partitions.emplace_back(model::partition_id(p), replication_factor); - } - return req; -} - ss::future topics_frontend::do_create_partition( create_partitions_configuration p_cfg, model::timeout_clock::time_point timeout) { @@ -1859,20 +1878,6 @@ ss::future topics_frontend::decrease_replication_factor( co_return co_await replicate_and_wait(_stm, _as, std::move(cmd), timeout); } -allocation_request make_allocation_request( - model::ntp ntp, - replication_factor tp_replication_factor, - const std::vector& new_replicas) { - auto nt = model::topic_namespace(ntp.ns, ntp.tp.topic); - allocation_request req(nt, get_allocation_domain(nt)); - req.partitions.reserve(1); - allocation_constraints constraints; - constraints.add(on_nodes(new_replicas)); - req.partitions.emplace_back( - ntp.tp.partition, tp_replication_factor, std::move(constraints)); - return req; -} - ss::future>> topics_frontend::generate_reassignments( model::ntp ntp, std::vector new_replicas) { From c66456455cd4ef1c88df6de1c0bfef6dc461b87f Mon Sep 17 00:00:00 2001 From: Ioannis Kavvadias Date: Fri, 15 Nov 2024 12:04:55 +0000 Subject: [PATCH 2/3] cluster: introduce simple_allocation_request code path Create a new partition allocation path. The new path doesn't explicitly instantiate a full allocation_request upfront, as the size of this request scales with the number of requested partitions. This allows to check for system limitations and reject a request earlier, without the need to create a potentially resource-intense allocation_request. (cherry picked from commit fc1047866ec6766fe8c23b59d95d8d3c52c95f48) --- .../cluster/scheduling/partition_allocator.cc | 63 +++++++++--- .../cluster/scheduling/partition_allocator.h | 21 +++- src/v/cluster/scheduling/types.cc | 10 ++ src/v/cluster/scheduling/types.h | 39 ++++++++ .../tests/partition_allocator_fixture.h | 5 + .../tests/partition_allocator_tests.cc | 99 +++++++++++++++---- 6 files changed, 201 insertions(+), 36 deletions(-) diff --git a/src/v/cluster/scheduling/partition_allocator.cc b/src/v/cluster/scheduling/partition_allocator.cc index 2c0bf9526467f..3d4945d05507a 100644 --- a/src/v/cluster/scheduling/partition_allocator.cc +++ b/src/v/cluster/scheduling/partition_allocator.cc @@ -109,7 +109,7 @@ allocation_constraints partition_allocator::default_constraints() { * with partitions that cannot be re-accommodated on smaller peers). */ std::error_code partition_allocator::check_cluster_limits( - allocation_request const& request) const { + const uint64_t new_partitions_replicas_requested) const { if (_members.local().nodes().empty()) { // Empty members table, we're probably running in a unit test return errc::success; @@ -121,16 +121,8 @@ std::error_code partition_allocator::check_cluster_limits( existent_partitions += uint64_t(i.second->allocated_partitions()); } - // Partition-replicas requested - uint64_t create_count{0}; - for (const auto& i : request.partitions) { - if (i.replication_factor > i.existing_replicas.size()) { - create_count += uint64_t( - i.replication_factor - i.existing_replicas.size()); - } - } - - uint64_t proposed_total_partitions = existent_partitions + create_count; + uint64_t proposed_total_partitions = existent_partitions + + new_partitions_replicas_requested; // Gather information about system-wide resource sizes uint32_t min_core_count = 0; @@ -183,7 +175,7 @@ std::error_code partition_allocator::check_cluster_limits( clusterlog.warn, "Refusing to create {} partitions as total partition count {} would " "exceed core limit {}", - create_count, + new_partitions_replicas_requested, proposed_total_partitions, effective_cpu_count * _partitions_per_shard()); return errc::topic_invalid_partitions_core_limit; @@ -204,7 +196,7 @@ std::error_code partition_allocator::check_cluster_limits( "Refusing to create {} new partitions as total partition count " "{} " "would exceed memory limit {}", - create_count, + new_partitions_replicas_requested, proposed_total_partitions, memory_limit); return errc::topic_invalid_partitions_memory_limit; @@ -226,7 +218,7 @@ std::error_code partition_allocator::check_cluster_limits( clusterlog.warn, "Refusing to create {} partitions as total partition " "count {} would exceed FD limit {}", - create_count, + new_partitions_replicas_requested, proposed_total_partitions, fds_limit); return errc::topic_invalid_partitions_fd_limit; @@ -241,6 +233,33 @@ std::error_code partition_allocator::check_cluster_limits( return errc::success; } +ss::future> +partition_allocator::allocate(simple_allocation_request simple_req) { + vlog( + clusterlog.trace, + "allocation request for {} partitions", + simple_req.additional_partitions); + + const uint64_t create_count + = static_cast(simple_req.additional_partitions) + * static_cast(simple_req.replication_factor); + auto cluster_errc = check_cluster_limits(create_count); + if (cluster_errc) { + co_return cluster_errc; + } + + allocation_request req( + simple_req.tp_ns, get_allocation_domain(simple_req.tp_ns)); + req.partitions.reserve(simple_req.additional_partitions); + for (auto p = 0; p < simple_req.additional_partitions; ++p) { + req.partitions.emplace_back( + model::partition_id(p), simple_req.replication_factor); + } + req.existing_replica_counts = std::move(simple_req.existing_replica_counts); + + co_return co_await do_allocate(std::move(req)); +} + ss::future> partition_allocator::allocate(allocation_request request) { vlog( @@ -248,11 +267,25 @@ partition_allocator::allocate(allocation_request request) { "allocation request for {} partitions", request.partitions.size()); - auto cluster_errc = check_cluster_limits(request); + // Partition-replicas requested + uint64_t create_count{0}; + for (const auto& i : request.partitions) { + if (i.replication_factor > i.existing_replicas.size()) { + create_count += uint64_t( + i.replication_factor - i.existing_replicas.size()); + } + } + + auto cluster_errc = check_cluster_limits(create_count); if (cluster_errc) { co_return cluster_errc; } + co_return co_await do_allocate(std::move(request)); +} + +ss::future> +partition_allocator::do_allocate(allocation_request request) { std::optional node2count; if (request.existing_replica_counts) { node2count = std::move(*request.existing_replica_counts); diff --git a/src/v/cluster/scheduling/partition_allocator.h b/src/v/cluster/scheduling/partition_allocator.h index 5f6a870d93d86..921b7e2a6b23b 100644 --- a/src/v/cluster/scheduling/partition_allocator.h +++ b/src/v/cluster/scheduling/partition_allocator.h @@ -43,6 +43,18 @@ class partition_allocator { // Replica placement APIs + /** + * Return an allocation_units object wrapping the result of the allocating + * the given allocation request, or an error if it was not possible. + * + * This overload of allocate will validate cluster limits upfront, before + * instantiating an allocation_request. This allows to validate very big + * allocation_requests, where the size of the request itself could be an + * issue in itself. E.g. a request for a billion partitions. + */ + ss::future> + allocate(simple_allocation_request); + /** * Return an allocation_units object wrapping the result of the allocating * the given allocation request, or an error if it was not possible. @@ -137,8 +149,13 @@ class partition_allocator { ss::future<> apply_snapshot(const controller_snapshot&); private: - std::error_code - check_cluster_limits(allocation_request const& request) const; + // new_partitions_replicas_requested represents the total number of + // partitions requested by a request. i.e. partitions * replicas requested. + std::error_code check_cluster_limits( + const uint64_t new_partitions_replicas_requested) const; + + ss::future> + do_allocate(allocation_request); result do_allocate_replica( allocated_partition&, diff --git a/src/v/cluster/scheduling/types.cc b/src/v/cluster/scheduling/types.cc index 55a80dcf4215a..272329c89f6d6 100644 --- a/src/v/cluster/scheduling/types.cc +++ b/src/v/cluster/scheduling/types.cc @@ -249,4 +249,14 @@ std::ostream& operator<<(std::ostream& o, const allocation_request& req) { o, "{{partion_constraints: {}, domain: {}}}", req.partitions, req.domain); return o; } +std::ostream& +operator<<(std::ostream& o, const simple_allocation_request& req) { + fmt::print( + o, + "{{topic: {}, additional_partitions: {}, replication_factor: {}}}", + req.tp_ns, + req.additional_partitions, + req.replication_factor); + return o; +} } // namespace cluster diff --git a/src/v/cluster/scheduling/types.h b/src/v/cluster/scheduling/types.h index 96a831a8963b4..1d6cb5314a6a7 100644 --- a/src/v/cluster/scheduling/types.h +++ b/src/v/cluster/scheduling/types.h @@ -15,6 +15,7 @@ #include "base/vassert.h" #include "cluster/types.h" #include "model/fundamental.h" +#include "model/metadata.h" #include #include @@ -24,6 +25,8 @@ #include #include +#include + namespace cluster { class allocation_node; class allocation_state; @@ -368,4 +371,40 @@ struct allocation_request { friend std::ostream& operator<<(std::ostream&, const allocation_request&); }; +/** + * The simple_allocation_request represents a simplified allocation_request + * that doesn't allow for partition-specific requirements. The memory required + * for a simple_allocation_request objects does not scale with the number of + * requested partitions. + */ +struct simple_allocation_request { + simple_allocation_request() = delete; + simple_allocation_request(const simple_allocation_request&) = delete; + simple_allocation_request(simple_allocation_request&&) = default; + simple_allocation_request& operator=(const simple_allocation_request&) + = delete; + simple_allocation_request& operator=(simple_allocation_request&&) = default; + ~simple_allocation_request() = default; + + simple_allocation_request( + model::topic_namespace tp_ns, + int32_t additional_partitions, + int16_t replication_factor, + std::optional existing_replac_counts = std::nullopt) + : tp_ns{std::move(tp_ns)} + , additional_partitions{additional_partitions} + , replication_factor{replication_factor} + , existing_replica_counts{std::move(existing_replac_counts)} {} + + model::topic_namespace tp_ns; + int32_t additional_partitions{0}; + int16_t replication_factor{0}; + // if present, new partitions will be allocated using topic-aware counts + // objective. + std::optional existing_replica_counts; + + friend std::ostream& + operator<<(std::ostream&, const simple_allocation_request&); +}; + } // namespace cluster diff --git a/src/v/cluster/tests/partition_allocator_fixture.h b/src/v/cluster/tests/partition_allocator_fixture.h index 1f3ff51408c45..e56d7807fefef 100644 --- a/src/v/cluster/tests/partition_allocator_fixture.h +++ b/src/v/cluster/tests/partition_allocator_fixture.h @@ -113,6 +113,11 @@ struct partition_allocator_fixture { }); } + cluster::simple_allocation_request make_simple_allocation_request( + int32_t partitions, int16_t replication_factor) { + return {tn, partitions, replication_factor}; + } + cluster::allocation_request make_allocation_request(int partitions, uint16_t replication_factor) { return make_allocation_request(tn, partitions, replication_factor); diff --git a/src/v/cluster/tests/partition_allocator_tests.cc b/src/v/cluster/tests/partition_allocator_tests.cc index d68ed2c5a6791..13992e981a04e 100644 --- a/src/v/cluster/tests/partition_allocator_tests.cc +++ b/src/v/cluster/tests/partition_allocator_tests.cc @@ -90,35 +90,75 @@ FIXTURE_TEST(allocation_over_core_capacity, partition_allocator_fixture) { const auto partition_count = partition_allocator_fixture::partitions_per_shard + 1; register_node(0, 1); - auto result - = allocator().allocate(make_allocation_request(partition_count, 1)).get(); - BOOST_REQUIRE(result.has_error()); - BOOST_REQUIRE_EQUAL( - result.assume_error(), - cluster::make_error_code( - cluster::errc::topic_invalid_partitions_core_limit)); + + { + auto result = allocator() + .allocate(make_allocation_request(partition_count, 1)) + .get(); + BOOST_REQUIRE(result.has_error()); + BOOST_REQUIRE_EQUAL( + result.assume_error(), + cluster::make_error_code( + cluster::errc::topic_invalid_partitions_core_limit)); + } + + { + auto result = allocator() + .allocate( + make_simple_allocation_request(partition_count, 1)) + .get(); + BOOST_REQUIRE(result.has_error()); + BOOST_REQUIRE_EQUAL( + result.assume_error(), + cluster::make_error_code( + cluster::errc::topic_invalid_partitions_core_limit)); + } } FIXTURE_TEST( allocation_over_memory_capacity, partition_allocator_memory_limited_fixture) { register_node(0, 1); - auto result = allocator().allocate(make_allocation_request(1, 1)).get(); - BOOST_REQUIRE(result.has_error()); - BOOST_REQUIRE_EQUAL( - result.assume_error(), - cluster::make_error_code( - cluster::errc::topic_invalid_partitions_memory_limit)); + + { + auto result = allocator().allocate(make_allocation_request(1, 1)).get(); + BOOST_REQUIRE(result.has_error()); + BOOST_REQUIRE_EQUAL( + result.assume_error(), + cluster::make_error_code( + cluster::errc::topic_invalid_partitions_memory_limit)); + } + { + auto result + = allocator().allocate(make_simple_allocation_request(1, 1)).get(); + BOOST_REQUIRE(result.has_error()); + BOOST_REQUIRE_EQUAL( + result.assume_error(), + cluster::make_error_code( + cluster::errc::topic_invalid_partitions_memory_limit)); + } } FIXTURE_TEST( allocation_over_fds_capacity, partition_allocator_fd_limited_fixture) { register_node(0, 1); - auto result = allocator().allocate(make_allocation_request(1, 1)).get(); - BOOST_REQUIRE(result.has_error()); - BOOST_REQUIRE_EQUAL( - result.assume_error(), - cluster::make_error_code( - cluster::errc::topic_invalid_partitions_fd_limit)); + + { + auto result = allocator().allocate(make_allocation_request(1, 1)).get(); + BOOST_REQUIRE(result.has_error()); + BOOST_REQUIRE_EQUAL( + result.assume_error(), + cluster::make_error_code( + cluster::errc::topic_invalid_partitions_fd_limit)); + } + { + auto result + = allocator().allocate(make_simple_allocation_request(1, 1)).get(); + BOOST_REQUIRE(result.has_error()); + BOOST_REQUIRE_EQUAL( + result.assume_error(), + cluster::make_error_code( + cluster::errc::topic_invalid_partitions_fd_limit)); + } } FIXTURE_TEST(allocation_over_capacity, partition_allocator_fixture) { @@ -342,6 +382,27 @@ FIXTURE_TEST(allocation_units_test, partition_allocator_fixture) { // we do not decrement the highest raft group BOOST_REQUIRE_EQUAL(allocator().state().last_group_id()(), 10); } +FIXTURE_TEST(allocation_units_test_raw_req, partition_allocator_fixture) { + register_node(1, 10); + register_node(2, 11); + register_node(3, 12); + // just fill up the cluster partially + + { + auto allocs = allocator() + .allocate(make_simple_allocation_request(10, 3)) + .get() + .value(); + BOOST_REQUIRE_EQUAL(allocs->get_assignments().size(), 10); + BOOST_REQUIRE_EQUAL( + allocated_nodes_count(allocs->get_assignments()), 3 * 10); + } + + BOOST_REQUIRE(all_nodes_empty()); + + // we do not decrement the highest raft group + BOOST_REQUIRE_EQUAL(allocator().state().last_group_id()(), 10); +} FIXTURE_TEST(decommission_node, partition_allocator_fixture) { register_node(0, 32); register_node(1, 64); From 7ed1d4a4fb92c88ceb5636281258b2bab3af878d Mon Sep 17 00:00:00 2001 From: Ioannis Kavvadias Date: Thu, 21 Nov 2024 13:16:38 +0000 Subject: [PATCH 3/3] cluster: split make_allocation_request code paths Where applicable, use the simple_allocation_request interface. (cherry picked from commit 16a95fb467c765f2a08f8434483f15cddce4a9ce) --- src/v/cluster/topics_frontend.cc | 77 +++++++++++++++++--------------- 1 file changed, 42 insertions(+), 35 deletions(-) diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc index ce373a88814fd..6b283a9567aa4 100644 --- a/src/v/cluster/topics_frontend.cc +++ b/src/v/cluster/topics_frontend.cc @@ -45,6 +45,7 @@ #include "random/generators.h" #include "rpc/errc.h" #include "rpc/types.h" +#include "scheduling/types.h" #include "ssx/future-util.h" #include "topic_configuration.h" @@ -64,46 +65,42 @@ namespace { -cluster::allocation_request make_allocation_request( +cluster::simple_allocation_request make_simple_allocation_request( const cluster::custom_assignable_topic_configuration& ca_cfg, bool topic_aware) { - // no custom assignments, lets allocator decide based on partition count - const auto& tp_ns = ca_cfg.cfg.tp_ns; - cluster::allocation_request req(tp_ns, cluster::get_allocation_domain(tp_ns)); - if (!ca_cfg.has_custom_assignment()) { - req.partitions.reserve(ca_cfg.cfg.partition_count); - for (auto p = 0; p < ca_cfg.cfg.partition_count; ++p) { - req.partitions.emplace_back( - model::partition_id(p), ca_cfg.cfg.replication_factor); - } - } else { - req.partitions.reserve(ca_cfg.custom_assignments.size()); - for (auto& cas : ca_cfg.custom_assignments) { - cluster::allocation_constraints constraints; - constraints.add(cluster::on_nodes(cas.replicas)); - - req.partitions.emplace_back( - cas.id, cas.replicas.size(), std::move(constraints)); - } - } + vassert( + !ca_cfg.has_custom_assignment(), + "make_custom_allocation_request should have been called, instead"); + cluster::simple_allocation_request req{ + ca_cfg.cfg.tp_ns, + ca_cfg.cfg.partition_count, + ca_cfg.cfg.replication_factor}; if (topic_aware) { req.existing_replica_counts = cluster::node2count_t{}; } return req; } -cluster::allocation_request make_allocation_request( - int16_t replication_factor, - const int32_t current_partitions_count, - std::optional existing_replica_counts, - const cluster::create_partitions_configuration& cfg) { - const auto new_partitions_cnt = cfg.new_total_partition_count - - current_partitions_count; - cluster::allocation_request req(cfg.tp_ns, cluster::get_allocation_domain(cfg.tp_ns)); - req.existing_replica_counts = std::move(existing_replica_counts); - req.partitions.reserve(new_partitions_cnt); - for (auto p = 0; p < new_partitions_cnt; ++p) { - req.partitions.emplace_back(model::partition_id(p), replication_factor); +cluster::allocation_request make_custom_allocation_request( + const cluster::custom_assignable_topic_configuration& ca_cfg, + bool topic_aware) { + vassert( + ca_cfg.has_custom_assignment(), + "make_simple_allocation_request should have been called, instead"); + // no custom assignments, lets allocator decide based on partition count + const auto& tp_ns = ca_cfg.cfg.tp_ns; + cluster::allocation_request req( + tp_ns, cluster::get_allocation_domain(tp_ns)); + req.partitions.reserve(ca_cfg.custom_assignments.size()); + for (auto& cas : ca_cfg.custom_assignments) { + cluster::allocation_constraints constraints; + constraints.add(cluster::on_nodes(cas.replicas)); + + req.partitions.emplace_back( + cas.id, cas.replicas.size(), std::move(constraints)); + } + if (topic_aware) { + req.existing_replica_counts = cluster::node2count_t{}; } return req; } @@ -628,8 +625,12 @@ ss::future topics_frontend::do_create_topic( partition_allocator::shard, [assignable_config, topic_aware = _partition_autobalancing_topic_aware()]( partition_allocator& al) { + if (assignable_config.has_custom_assignment()) { + return al.allocate( + make_custom_allocation_request(assignable_config, topic_aware)); + } return al.allocate( - make_allocation_request(assignable_config, topic_aware)); + make_simple_allocation_request(assignable_config, topic_aware)); }); if (!units) { @@ -1486,8 +1487,14 @@ ss::future topics_frontend::do_create_partition( current = tp_cfg->partition_count, existing_rc = std::move(existing_replica_counts), rf = replication_factor.value()](partition_allocator& al) mutable { - return al.allocate(make_allocation_request( - rf, current, std::move(existing_rc), p_cfg)); + const auto new_partitions_cnt = p_cfg.new_total_partition_count + - current; + const auto replication_factor = static_cast(rf); + return al.allocate(simple_allocation_request{ + p_cfg.tp_ns, + new_partitions_cnt, + replication_factor, + std::move(existing_rc)}); }); // no assignments, error