Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.2.x] [CORE-7750] Creating topic with huge number of partitions leads to segfault #24232

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 48 additions & 15 deletions src/v/cluster/scheduling/partition_allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ allocation_constraints partition_allocator::default_constraints() {
* with partitions that cannot be re-accommodated on smaller peers).
*/
std::error_code partition_allocator::check_cluster_limits(
allocation_request const& request) const {
const uint64_t new_partitions_replicas_requested) const {
if (_members.local().nodes().empty()) {
// Empty members table, we're probably running in a unit test
return errc::success;
Expand All @@ -121,16 +121,8 @@ std::error_code partition_allocator::check_cluster_limits(
existent_partitions += uint64_t(i.second->allocated_partitions());
}

// Partition-replicas requested
uint64_t create_count{0};
for (const auto& i : request.partitions) {
if (i.replication_factor > i.existing_replicas.size()) {
create_count += uint64_t(
i.replication_factor - i.existing_replicas.size());
}
}

uint64_t proposed_total_partitions = existent_partitions + create_count;
uint64_t proposed_total_partitions = existent_partitions
+ new_partitions_replicas_requested;

// Gather information about system-wide resource sizes
uint32_t min_core_count = 0;
Expand Down Expand Up @@ -183,7 +175,7 @@ std::error_code partition_allocator::check_cluster_limits(
clusterlog.warn,
"Refusing to create {} partitions as total partition count {} would "
"exceed core limit {}",
create_count,
new_partitions_replicas_requested,
proposed_total_partitions,
effective_cpu_count * _partitions_per_shard());
return errc::topic_invalid_partitions_core_limit;
Expand All @@ -204,7 +196,7 @@ std::error_code partition_allocator::check_cluster_limits(
"Refusing to create {} new partitions as total partition count "
"{} "
"would exceed memory limit {}",
create_count,
new_partitions_replicas_requested,
proposed_total_partitions,
memory_limit);
return errc::topic_invalid_partitions_memory_limit;
Expand All @@ -226,7 +218,7 @@ std::error_code partition_allocator::check_cluster_limits(
clusterlog.warn,
"Refusing to create {} partitions as total partition "
"count {} would exceed FD limit {}",
create_count,
new_partitions_replicas_requested,
proposed_total_partitions,
fds_limit);
return errc::topic_invalid_partitions_fd_limit;
Expand All @@ -241,18 +233,59 @@ std::error_code partition_allocator::check_cluster_limits(
return errc::success;
}

ss::future<result<allocation_units::pointer>>
partition_allocator::allocate(simple_allocation_request simple_req) {
    vlog(
      clusterlog.trace,
      "allocation request for {} partitions",
      simple_req.additional_partitions);

    // Validate cluster-wide limits before materializing the per-partition
    // request: the simple request is fixed-size, so an absurdly large
    // partition count is rejected without allocating memory for it.
    const auto requested_replicas
      = static_cast<uint64_t>(simple_req.additional_partitions)
        * static_cast<uint64_t>(simple_req.replication_factor);
    if (auto ec = check_cluster_limits(requested_replicas); ec) {
        co_return ec;
    }

    // Limits passed; expand into a full allocation_request for do_allocate.
    allocation_request full_req(
      simple_req.tp_ns, get_allocation_domain(simple_req.tp_ns));
    full_req.partitions.reserve(simple_req.additional_partitions);
    for (int32_t id = 0; id < simple_req.additional_partitions; ++id) {
        full_req.partitions.emplace_back(
          model::partition_id(id), simple_req.replication_factor);
    }
    full_req.existing_replica_counts = std::move(
      simple_req.existing_replica_counts);

    co_return co_await do_allocate(std::move(full_req));
}

ss::future<result<allocation_units::pointer>>
partition_allocator::allocate(allocation_request request) {
    vlog(
      clusterlog.trace,
      "allocation request for {} partitions",
      request.partitions.size());

    // Count only partition replicas that still have to be created; replicas
    // that already exist do not count against the cluster limits.
    uint64_t new_replicas{0};
    for (const auto& constraints : request.partitions) {
        const auto existing = constraints.existing_replicas.size();
        if (constraints.replication_factor > existing) {
            new_replicas += uint64_t(
              constraints.replication_factor - existing);
        }
    }

    if (auto ec = check_cluster_limits(new_replicas); ec) {
        co_return ec;
    }

    co_return co_await do_allocate(std::move(request));
}

ss::future<result<allocation_units::pointer>>
partition_allocator::do_allocate(allocation_request request) {
std::optional<node2count_t> node2count;
if (request.existing_replica_counts) {
node2count = std::move(*request.existing_replica_counts);
Expand Down
21 changes: 19 additions & 2 deletions src/v/cluster/scheduling/partition_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ class partition_allocator {

// Replica placement APIs

/**
* Return an allocation_units object wrapping the result of allocating the
* given allocation request, or an error if it was not possible.
*
* This overload of allocate validates cluster limits upfront, before
* instantiating an allocation_request. This makes it possible to validate
* very large requests — e.g. one for a billion partitions — without first
* materializing a request whose size would itself be a problem.
*/
ss::future<result<allocation_units::pointer>>
allocate(simple_allocation_request);

/**
* Return an allocation_units object wrapping the result of the allocating
* the given allocation request, or an error if it was not possible.
Expand Down Expand Up @@ -137,8 +149,13 @@ class partition_allocator {
ss::future<> apply_snapshot(const controller_snapshot&);

private:
std::error_code
check_cluster_limits(allocation_request const& request) const;
// new_partitions_replicas_requested is the total number of new partition
// replicas a request asks for, i.e. requested partitions * replication factor.
std::error_code check_cluster_limits(
const uint64_t new_partitions_replicas_requested) const;

ss::future<result<allocation_units::pointer>>
do_allocate(allocation_request);

result<reallocation_step> do_allocate_replica(
allocated_partition&,
Expand Down
10 changes: 10 additions & 0 deletions src/v/cluster/scheduling/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,14 @@ std::ostream& operator<<(std::ostream& o, const allocation_request& req) {
o, "{{partion_constraints: {}, domain: {}}}", req.partitions, req.domain);
return o;
}
std::ostream&
operator<<(std::ostream& o, const simple_allocation_request& req) {
    // Log-friendly rendering of a simple (fixed-size) allocation request.
    // Doubled braces in the format string emit literal '{' / '}'.
    fmt::print(
      o,
      "{{topic: {}, additional_partitions: {}, replication_factor: {}}}",
      req.tp_ns,
      req.additional_partitions,
      req.replication_factor);
    return o;
}
} // namespace cluster
39 changes: 39 additions & 0 deletions src/v/cluster/scheduling/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "base/vassert.h"
#include "cluster/types.h"
#include "model/fundamental.h"
#include "model/metadata.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/sharded.hh>
Expand All @@ -24,6 +25,8 @@
#include <absl/container/flat_hash_map.h>
#include <absl/container/node_hash_set.h>

#include <optional>

namespace cluster {
class allocation_node;
class allocation_state;
Expand Down Expand Up @@ -368,4 +371,40 @@ struct allocation_request {
friend std::ostream& operator<<(std::ostream&, const allocation_request&);
};

/**
 * The simple_allocation_request represents a simplified allocation_request
 * that doesn't allow for partition-specific requirements. The memory required
 * for a simple_allocation_request object does not scale with the number of
 * requested partitions, so it is safe to construct for arbitrarily large
 * partition counts.
 */
struct simple_allocation_request {
    simple_allocation_request() = delete;
    simple_allocation_request(const simple_allocation_request&) = delete;
    simple_allocation_request(simple_allocation_request&&) = default;
    simple_allocation_request& operator=(const simple_allocation_request&)
      = delete;
    simple_allocation_request& operator=(simple_allocation_request&&) = default;
    ~simple_allocation_request() = default;

    simple_allocation_request(
      model::topic_namespace tp_ns,
      int32_t additional_partitions,
      int16_t replication_factor,
      std::optional<node2count_t> existing_replica_counts = std::nullopt)
      : tp_ns{std::move(tp_ns)}
      , additional_partitions{additional_partitions}
      , replication_factor{replication_factor}
      , existing_replica_counts{std::move(existing_replica_counts)} {}

    model::topic_namespace tp_ns;
    // number of new partitions to allocate for the topic
    int32_t additional_partitions{0};
    // desired replication factor for each new partition
    int16_t replication_factor{0};
    // if present, new partitions will be allocated using topic-aware counts
    // objective.
    std::optional<node2count_t> existing_replica_counts;

    friend std::ostream&
    operator<<(std::ostream&, const simple_allocation_request&);
};

} // namespace cluster
5 changes: 5 additions & 0 deletions src/v/cluster/tests/partition_allocator_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ struct partition_allocator_fixture {
});
}

cluster::simple_allocation_request make_simple_allocation_request(
int32_t partitions, int16_t replication_factor) {
return {tn, partitions, replication_factor};
}

cluster::allocation_request
make_allocation_request(int partitions, uint16_t replication_factor) {
return make_allocation_request(tn, partitions, replication_factor);
Expand Down
99 changes: 80 additions & 19 deletions src/v/cluster/tests/partition_allocator_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,35 +90,75 @@ FIXTURE_TEST(allocation_over_core_capacity, partition_allocator_fixture) {
const auto partition_count
= partition_allocator_fixture::partitions_per_shard + 1;
register_node(0, 1);
auto result
= allocator().allocate(make_allocation_request(partition_count, 1)).get();
BOOST_REQUIRE(result.has_error());
BOOST_REQUIRE_EQUAL(
result.assume_error(),
cluster::make_error_code(
cluster::errc::topic_invalid_partitions_core_limit));

{
auto result = allocator()
.allocate(make_allocation_request(partition_count, 1))
.get();
BOOST_REQUIRE(result.has_error());
BOOST_REQUIRE_EQUAL(
result.assume_error(),
cluster::make_error_code(
cluster::errc::topic_invalid_partitions_core_limit));
}

{
auto result = allocator()
.allocate(
make_simple_allocation_request(partition_count, 1))
.get();
BOOST_REQUIRE(result.has_error());
BOOST_REQUIRE_EQUAL(
result.assume_error(),
cluster::make_error_code(
cluster::errc::topic_invalid_partitions_core_limit));
}
}

FIXTURE_TEST(
  allocation_over_memory_capacity, partition_allocator_memory_limited_fixture) {
    register_node(0, 1);

    // Both allocation paths — the full allocation_request and the
    // simple_allocation_request — must be rejected by the memory limit check.
    const auto expected = cluster::make_error_code(
      cluster::errc::topic_invalid_partitions_memory_limit);

    {
        auto res = allocator().allocate(make_allocation_request(1, 1)).get();
        BOOST_REQUIRE(res.has_error());
        BOOST_REQUIRE_EQUAL(res.assume_error(), expected);
    }
    {
        auto res
          = allocator().allocate(make_simple_allocation_request(1, 1)).get();
        BOOST_REQUIRE(res.has_error());
        BOOST_REQUIRE_EQUAL(res.assume_error(), expected);
    }
}

FIXTURE_TEST(
  allocation_over_fds_capacity, partition_allocator_fd_limited_fixture) {
    register_node(0, 1);

    // Both allocation paths — the full allocation_request and the
    // simple_allocation_request — must be rejected by the FD limit check.
    const auto expected = cluster::make_error_code(
      cluster::errc::topic_invalid_partitions_fd_limit);

    {
        auto res = allocator().allocate(make_allocation_request(1, 1)).get();
        BOOST_REQUIRE(res.has_error());
        BOOST_REQUIRE_EQUAL(res.assume_error(), expected);
    }
    {
        auto res
          = allocator().allocate(make_simple_allocation_request(1, 1)).get();
        BOOST_REQUIRE(res.has_error());
        BOOST_REQUIRE_EQUAL(res.assume_error(), expected);
    }
}

FIXTURE_TEST(allocation_over_capacity, partition_allocator_fixture) {
Expand Down Expand Up @@ -342,6 +382,27 @@ FIXTURE_TEST(allocation_units_test, partition_allocator_fixture) {
// we do not decrement the highest raft group
BOOST_REQUIRE_EQUAL(allocator().state().last_group_id()(), 10);
}
FIXTURE_TEST(allocation_units_test_raw_req, partition_allocator_fixture) {
    register_node(1, 10);
    register_node(2, 11);
    register_node(3, 12);

    // Allocate via the simple-request path in a nested scope so the
    // allocation_units are released before we inspect allocator state.
    {
        auto units = allocator()
                       .allocate(make_simple_allocation_request(10, 3))
                       .get()
                       .value();
        BOOST_REQUIRE_EQUAL(units->get_assignments().size(), 10);
        BOOST_REQUIRE_EQUAL(
          allocated_nodes_count(units->get_assignments()), 3 * 10);
    }

    BOOST_REQUIRE(all_nodes_empty());

    // we do not decrement the highest raft group
    BOOST_REQUIRE_EQUAL(allocator().state().last_group_id()(), 10);
}
FIXTURE_TEST(decommission_node, partition_allocator_fixture) {
register_node(0, 32);
register_node(1, 64);
Expand Down
Loading