Skip to content

Commit

Permalink
Merge pull request #18581 from ztlpn/flex-assignment-local
Browse files Browse the repository at this point in the history
Node-local shard assignments for created/moved partitions
  • Loading branch information
ztlpn authored Jun 10, 2024
2 parents 1ae89de + b305bfe commit dcbba8c
Show file tree
Hide file tree
Showing 44 changed files with 1,533 additions and 444 deletions.
2 changes: 2 additions & 0 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ ss::future<> controller::wire_up() {
.then([this] {
return _partition_allocator.start_single(
std::ref(_members_table),
std::ref(_feature_table),
config::shard_local_cfg().topic_memory_per_partition.bind(),
config::shard_local_cfg().topic_fds_per_partition.bind(),
config::shard_local_cfg().topic_partitions_per_shard.bind(),
Expand Down Expand Up @@ -378,6 +379,7 @@ ss::future<> controller::start(
std::ref(_members_table),
std::ref(_partition_manager),
std::ref(_shard_table),
std::ref(_shard_balancer),
ss::sharded_parameter(
[this] { return std::ref(_plugin_table.local()); }),
ss::sharded_parameter(
Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ class controller {
return _partition_balancer;
}

/// Returns a reference to the controller's `_shard_balancer` sharded service.
ss::sharded<shard_balancer>& get_shard_balancer() { return _shard_balancer; }

ss::sharded<ss::abort_source>& get_abort_source() { return _as; }

ss::sharded<storage::api>& get_storage() { return _storage; }
Expand Down
5 changes: 5 additions & 0 deletions src/v/cluster/controller.json
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@
"name": "delete_topics",
"input_type": "delete_topics_request",
"output_type": "delete_topics_reply"
},
{
"name": "set_partition_shard",
"input_type": "set_partition_shard_request",
"output_type": "set_partition_shard_reply"
}
]
}
3 changes: 3 additions & 0 deletions src/v/cluster/errc.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ enum class errc : int16_t {
topic_invalid_partitions_decreased,
producer_ids_vcluster_limit_exceeded,
validation_of_recovery_topic_failed,
replica_does_not_exist,
};
struct errc_category final : public std::error_category {
const char* name() const noexcept final { return "cluster::errc"; }
Expand Down Expand Up @@ -259,6 +260,8 @@ struct errc_category final : public std::error_category {
return "To many vclusters registered in producer state cache";
case errc::validation_of_recovery_topic_failed:
return "Validation of recovery topic failed";
case errc::replica_does_not_exist:
return "Partition replica does not exist";
}
return "cluster::errc::unknown";
}
Expand Down
64 changes: 29 additions & 35 deletions src/v/cluster/scheduling/allocation_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,9 @@ bool allocation_node::is_full(
return !is_internal_topic && count > _max_capacity;
}

ss::shard_id
allocation_node::allocate(const partition_allocation_domain domain) {
ss::shard_id allocation_node::allocate_shard() {
auto it = std::min_element(_weights.begin(), _weights.end());
(*it)++; // increment the weights
_allocated_partitions++;
++_allocated_domain_partitions[domain];
_final_partitions++;
++_final_domain_partitions[domain];
const ss::shard_id core = std::distance(_weights.begin(), it);
vlog(
clusterlog.trace,
Expand All @@ -94,57 +89,56 @@ allocation_node::allocate(const partition_allocation_domain domain) {
return core;
}

void allocation_node::deallocate_on(
ss::shard_id core, const partition_allocation_domain domain) {
/// Node-level bookkeeping for one newly allocated replica: bumps the total
/// allocated counter and the counter of the given allocation domain.
/// Per-core weights are not touched by this overload.
void allocation_node::add_allocation(partition_allocation_domain domain) {
    auto& allocated_in_domain = _allocated_domain_partitions[domain];
    ++_allocated_partitions;
    ++allocated_in_domain;
}

void allocation_node::add_allocation(ss::shard_id core) {
vassert(
core < _weights.size(),
"Tried to deallocate a non-existing core:{} - {}",
"Tried to allocate a non-existing core:{} - {}",
core,
*this);
_weights[core]++;
}

void allocation_node::remove_allocation(partition_allocation_domain domain) {
vassert(
_allocated_partitions > allocation_capacity{0} && _weights[core] > 0,
"unable to deallocate partition from core {} at node {}",
core,
_allocated_partitions > allocation_capacity{0},
"unable to deallocate partition at node {}",
*this);

allocation_capacity& domain_partitions
= _allocated_domain_partitions[domain];
vassert(
domain_partitions > allocation_capacity{0}
&& domain_partitions <= _allocated_partitions,
"Unable to deallocate partition from core {} in domain {} at node {}",
core,
"Unable to deallocate partition in domain {} at node {}",
domain,
*this);
--domain_partitions;

_allocated_partitions--;
_weights[core]--;
vlog(
clusterlog.trace,
"deallocation [node: {}, core: {}], total allocated: {}",
_id,
core,
_allocated_partitions);
--domain_partitions;
}

void allocation_node::allocate_on(
ss::shard_id core, const partition_allocation_domain domain) {
void allocation_node::remove_allocation(ss::shard_id core) {
vassert(
core < _weights.size(),
"Tried to allocate a non-existing core:{} - {}",
core < _weights.size() && _weights[core] > 0,
"unable to deallocate partition from core {} at node {}",
core,
*this);
_weights[core]--;
}

_weights[core]++;
_allocated_partitions++;
++_allocated_domain_partitions[domain];
vlog(
clusterlog.trace,
"allocation [node: {}, core: {}], total allocated: {}",
_id,
core,
_allocated_partitions);
/// Increments the "final" partition counters of this node: the node-wide
/// total and the counter kept for the given allocation domain.
void allocation_node::add_final_count(partition_allocation_domain domain) {
    auto& final_in_domain = _final_domain_partitions[domain];
    ++_final_partitions;
    ++final_in_domain;
}

/// Decrements the "final" partition counters of this node: the node-wide
/// total and the counter kept for the given allocation domain.
/// Note: no underflow check is performed here; callers are expected to pair
/// each call with an earlier add_final_count for the same domain.
void allocation_node::remove_final_count(partition_allocation_domain domain) {
    auto& final_in_domain = _final_domain_partitions[domain];
    --_final_partitions;
    --final_in_domain;
}

void allocation_node::update_core_count(uint32_t core_count) {
Expand Down
11 changes: 8 additions & 3 deletions src/v/cluster/scheduling/allocation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,18 @@ class allocation_node {
return _allocated_partitions == allocation_capacity{0};
}
bool is_full(const model::ntp&, bool will_add_allocation) const;
ss::shard_id allocate(partition_allocation_domain);

private:
friend allocation_state;

void deallocate_on(ss::shard_id core, partition_allocation_domain);
void allocate_on(ss::shard_id core, partition_allocation_domain);
/// Picks the core with the smallest weight, increments that weight and
/// returns the index of the chosen core.
ss::shard_id allocate_shard();

/// Node-level accounting: increments the total allocated partition count
/// and the count for the given domain.
void add_allocation(partition_allocation_domain);
/// Core-level accounting: increments the weight of `core`. Asserts that
/// `core` is a valid index into the per-core weights.
void add_allocation(ss::shard_id core);
/// Node-level accounting: decrements the total and per-domain allocated
/// counts. Asserts that both counts are non-zero and that the per-domain
/// count does not exceed the total.
void remove_allocation(partition_allocation_domain);
/// Core-level accounting: decrements the weight of `core`. Asserts that
/// `core` is a valid index and that its weight is greater than zero.
void remove_allocation(ss::shard_id core);
/// Increments the total and per-domain "final" partition counts.
void add_final_count(partition_allocation_domain);
/// Decrements the total and per-domain "final" partition counts
/// (no underflow assertion is made).
void remove_final_count(partition_allocation_domain);

model::node_id _id;
/// each index is a CPU. A weight is roughly the number of assignments
Expand Down
49 changes: 42 additions & 7 deletions src/v/cluster/scheduling/allocation_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "base/oncore.h"
#include "cluster/logger.h"
#include "features/feature_table.h"
#include "ssx/sformat.h"

namespace cluster {
Expand Down Expand Up @@ -153,12 +154,26 @@ bool allocation_state::is_empty(model::node_id id) const {
return it->second->empty();
}

/// Reports whether the `node_local_core_assignment` feature is currently
/// active according to the feature table this state was constructed with.
bool allocation_state::node_local_core_assignment_enabled() const {
    const auto wanted = features::feature::node_local_core_assignment;
    return _feature_table.is_active(wanted);
}

void allocation_state::add_allocation(
const model::broker_shard& replica,
const partition_allocation_domain domain) {
verify_shard();
if (auto it = _nodes.find(replica.node_id); it != _nodes.end()) {
it->second->allocate_on(replica.shard, domain);
it->second->add_allocation(domain);
if (!node_local_core_assignment_enabled()) {
it->second->add_allocation(replica.shard);
}
vlog(
clusterlog.trace,
"add allocation [node: {}, core: {}], total allocated: {}",
replica.node_id,
replica.shard,
it->second->_allocated_partitions);
}
}

Expand All @@ -167,7 +182,16 @@ void allocation_state::remove_allocation(
const partition_allocation_domain domain) {
verify_shard();
if (auto it = _nodes.find(replica.node_id); it != _nodes.end()) {
it->second->deallocate_on(replica.shard, domain);
it->second->remove_allocation(domain);
if (!node_local_core_assignment_enabled()) {
it->second->remove_allocation(replica.shard);
}
vlog(
clusterlog.trace,
"remove allocation [node: {}, core: {}], total allocated: {}",
replica.node_id,
replica.shard,
it->second->_allocated_partitions);
}
}

Expand All @@ -177,16 +201,28 @@ uint32_t allocation_state::allocate(
auto it = _nodes.find(id);
vassert(
it != _nodes.end(), "allocated node with id {} have to be present", id);
return it->second->allocate(domain);

it->second->add_allocation(domain);
it->second->add_final_count(domain);
if (node_local_core_assignment_enabled()) {
#ifndef NDEBUG
// return invalid shard in debug mode so that we can spot places where
// we are still using it.
return 12312312;
#else
return 0;
#endif
} else {
return it->second->allocate_shard();
}
}

void allocation_state::add_final_count(
const model::broker_shard& replica,
const partition_allocation_domain domain) {
verify_shard();
if (auto it = _nodes.find(replica.node_id); it != _nodes.end()) {
++it->second->_final_partitions;
++it->second->_final_domain_partitions[domain];
it->second->add_final_count(domain);
}
}

Expand All @@ -195,8 +231,7 @@ void allocation_state::remove_final_count(
const partition_allocation_domain domain) {
verify_shard();
if (auto it = _nodes.find(replica.node_id); it != _nodes.end()) {
--it->second->_final_partitions;
--it->second->_final_domain_partitions[domain];
it->second->remove_final_count(domain);
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/v/cluster/scheduling/allocation_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "base/oncore.h"
#include "cluster/scheduling/allocation_node.h"
#include "features/fwd.h"
#include "model/metadata.h"

#include <seastar/core/weak_ptr.hh>
Expand All @@ -29,10 +30,12 @@ class allocation_state : public ss::weakly_referencable<allocation_state> {
using underlying_t = absl::btree_map<model::node_id, node_ptr>;

allocation_state(
features::feature_table& feature_table,
config::binding<uint32_t> partitions_per_shard,
config::binding<uint32_t> partitions_reserve_shard0,
config::binding<std::vector<ss::sstring>> internal_kafka_topics)
: _partitions_per_shard(std::move(partitions_per_shard))
: _feature_table(feature_table)
, _partitions_per_shard(std::move(partitions_per_shard))
, _partitions_reserve_shard0(std::move(partitions_reserve_shard0))
, _internal_kafka_topics(std::move(internal_kafka_topics)) {}

Expand All @@ -50,6 +53,8 @@ class allocation_state : public ss::weakly_referencable<allocation_state> {
const underlying_t& allocation_nodes() const { return _nodes; }
int16_t available_nodes() const;

bool node_local_core_assignment_enabled() const;

// Choose a shard for a replica and add the corresponding allocation.
// node_id is required to belong to an existing node.
uint32_t allocate(model::node_id id, partition_allocation_domain);
Expand Down Expand Up @@ -80,6 +85,7 @@ class allocation_state : public ss::weakly_referencable<allocation_state> {
*/
void verify_shard() const;

features::feature_table& _feature_table;
config::binding<uint32_t> _partitions_per_shard;
config::binding<uint32_t> _partitions_reserve_shard0;
config::binding<std::vector<ss::sstring>> _internal_kafka_topics;
Expand Down
9 changes: 8 additions & 1 deletion src/v/cluster/scheduling/partition_allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "cluster/scheduling/types.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "features/feature_table.h"
#include "model/metadata.h"
#include "random/generators.h"
#include "ssx/async_algorithm.h"
Expand All @@ -44,15 +45,20 @@ namespace cluster {

partition_allocator::partition_allocator(
ss::sharded<members_table>& members,
ss::sharded<features::feature_table>& feature_table,
config::binding<std::optional<size_t>> memory_per_partition,
config::binding<std::optional<int32_t>> fds_per_partition,
config::binding<uint32_t> partitions_per_shard,
config::binding<uint32_t> partitions_reserve_shard0,
config::binding<std::vector<ss::sstring>> internal_kafka_topics,
config::binding<bool> enable_rack_awareness)
: _state(std::make_unique<allocation_state>(
partitions_per_shard, partitions_reserve_shard0, internal_kafka_topics))
feature_table.local(),
partitions_per_shard,
partitions_reserve_shard0,
internal_kafka_topics))
, _members(members)
, _feature_table(feature_table.local())
, _memory_per_partition(std::move(memory_per_partition))
, _fds_per_partition(std::move(fds_per_partition))
, _partitions_per_shard(std::move(partitions_per_shard))
Expand Down Expand Up @@ -504,6 +510,7 @@ void partition_allocator::remove_final_counts(
ss::future<>
partition_allocator::apply_snapshot(const controller_snapshot& snap) {
auto new_state = std::make_unique<allocation_state>(
_feature_table,
_partitions_per_shard,
_partitions_reserve_shard0,
_internal_kafka_topics);
Expand Down
3 changes: 3 additions & 0 deletions src/v/cluster/scheduling/partition_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "cluster/scheduling/allocation_strategy.h"
#include "cluster/scheduling/types.h"
#include "config/property.h"
#include "features/fwd.h"

namespace cluster {

Expand All @@ -31,6 +32,7 @@ class partition_allocator {
static constexpr ss::shard_id shard = 0;
partition_allocator(
ss::sharded<members_table>&,
ss::sharded<features::feature_table>&,
config::binding<std::optional<size_t>> memory_per_partition,
config::binding<std::optional<int32_t>> fds_per_partition,
config::binding<uint32_t> partitions_per_shard,
Expand Down Expand Up @@ -148,6 +150,7 @@ class partition_allocator {
std::unique_ptr<allocation_state> _state;
allocation_strategy _allocation_strategy;
ss::sharded<members_table>& _members;
features::feature_table& _feature_table;

config::binding<std::optional<size_t>> _memory_per_partition;
config::binding<std::optional<int32_t>> _fds_per_partition;
Expand Down
8 changes: 8 additions & 0 deletions src/v/cluster/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -816,4 +816,12 @@ service::delete_topics(delete_topics_request req, rpc::streaming_context&) {
co_return delete_topics_reply{.results = std::move(result)};
}

/// RPC handler for `set_partition_shard`.
///
/// Switches to the service's scheduling group, forwards the requested
/// (ntp, shard) pair to the shard-local topics frontend via
/// `set_local_partition_shard`, and returns the resulting error code in the
/// reply's `ec` field.
ss::future<set_partition_shard_reply> service::set_partition_shard(
  set_partition_shard_request req, rpc::streaming_context&) {
    co_await ss::coroutine::switch_to(get_scheduling_group());

    auto& frontend = _topics_frontend.local();
    auto result = co_await frontend.set_local_partition_shard(
      req.ntp, req.shard);

    co_return set_partition_shard_reply{.ec = result};
}

} // namespace cluster
3 changes: 3 additions & 0 deletions src/v/cluster/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ class service : public controller_service {
ss::future<delete_topics_reply>
delete_topics(delete_topics_request, rpc::streaming_context&) final;

ss::future<set_partition_shard_reply> set_partition_shard(
set_partition_shard_request, rpc::streaming_context&) final;

private:
static constexpr auto default_move_interruption_timeout = 10s;
std::pair<std::vector<model::topic_metadata>, topic_configuration_vector>
Expand Down
Loading

0 comments on commit dcbba8c

Please sign in to comment.