partition_balancer: propagate an abort source into planner
.. to make the planner tick abortable.
bharathv committed Jun 12, 2023
1 parent 34daab3 commit ba57559
Showing 7 changed files with 67 additions and 36 deletions.
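
For context, here is a minimal, self-contained sketch of the pattern this commit introduces: a per-tick abort source is created when the tick starts, checked at yield points, and aborted with a typed exception on shutdown so an in-flight tick unwinds promptly and is logged as an expected abort rather than an error. The names (ticker, tick_aborted) are hypothetical, the real backend uses Redpanda's ssx::spawn_with_gate_then rather than plain seastar::with_gate, and only the Seastar APIs shown (ss::abort_source, request_abort_ex, check(), gate, coroutine::maybe_yield) are assumed to exist.

// sketch.cc -- hypothetical example, not part of this commit.
#include <seastar/core/abort_source.hh>
#include <seastar/core/app-template.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/maybe_yield.hh>

#include <chrono>
#include <iostream>
#include <optional>
#include <stdexcept>

namespace ss = seastar;

// Typed abort exception: lets shutdown-triggered aborts be told apart from
// real failures (cf. balancer_tick_aborted_exception in this commit).
struct tick_aborted final : std::runtime_error {
    explicit tick_aborted(const std::string& msg)
      : std::runtime_error(msg) {}
};

struct ticker {
    ss::future<> tick() {
        _tick_in_progress = ss::abort_source{};
        // Stand-in for a long planning loop: yield to the reactor regularly
        // and rethrow the abort exception as soon as one is requested.
        for (;;) {
            co_await ss::coroutine::maybe_yield();
            _tick_in_progress->check(); // throws tick_aborted once aborted
        }
    }

    ss::future<> stop() {
        if (_tick_in_progress) {
            // Wake up the in-flight tick at its next check()/suspension point.
            _tick_in_progress->request_abort_ex(tick_aborted{"shutting down"});
        }
        co_await _gate.close();
    }

    ss::gate _gate;
    std::optional<ss::abort_source> _tick_in_progress;
};

int main(int argc, char** argv) {
    ss::app_template app;
    return app.run(argc, argv, []() -> ss::future<> {
        ticker t;
        // Run the tick as a background fiber tracked by the gate; treat a
        // shutdown-triggered abort as expected rather than as an error.
        auto bg = ss::with_gate(t._gate, [&t] { return t.tick(); })
                    .handle_exception_type([](const tick_aborted& e) {
                        std::cout << "tick aborted: " << e.what() << "\n";
                    });
        co_await ss::sleep(std::chrono::milliseconds(5));
        co_await t.stop(); // aborts the tick, then waits for the gate to drain
        co_await std::move(bg);
    });
}

In the actual backend below, the optional abort source is additionally cleared in a finally() continuation once the tick ends, before the timer is rearmed.
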
37 changes: 30 additions & 7 deletions src/v/cluster/partition_balancer_backend.cc
@@ -34,6 +34,12 @@ namespace cluster {
static constexpr std::chrono::seconds controller_stm_sync_timeout = 10s;
static constexpr std::chrono::seconds add_move_cmd_timeout = 10s;

class balancer_tick_aborted_exception final : public std::runtime_error {
public:
explicit balancer_tick_aborted_exception(const std::string& msg)
: std::runtime_error(msg) {}
};

partition_balancer_backend::partition_balancer_backend(
consensus_ptr raft0,
ss::sharded<controller_stm>& controller_stm,
@@ -138,12 +144,21 @@ void partition_balancer_backend::on_topic_table_update() {
}

void partition_balancer_backend::tick() {
ssx::background = ssx::spawn_with_gate_then(_gate, [this] {
return do_tick().finally(
[this] { maybe_rearm_timer(); });
}).handle_exception([](const std::exception_ptr& e) {
vlog(clusterlog.warn, "tick error: {}", e);
});
ssx::background
= ssx::spawn_with_gate_then(
_gate,
[this] {
return do_tick().finally([this] {
_tick_in_progress = {};
maybe_rearm_timer();
});
})
.handle_exception_type([](balancer_tick_aborted_exception& e) {
vlog(clusterlog.info, "tick aborted, reason: {}", e.what());
})
.handle_exception([](const std::exception_ptr& e) {
vlog(clusterlog.warn, "tick error: {}", e);
});
}

ss::future<> partition_balancer_backend::stop() {
@@ -152,6 +167,10 @@ ss::future<> partition_balancer_backend::stop() {
_state.members().unregister_members_updated_notification(_member_updates);
_timer.cancel();
_lock.broken();
if (_tick_in_progress) {
_tick_in_progress->request_abort_ex(
balancer_tick_aborted_exception{"shutting down"});
}
return _gate.close();
}

@@ -176,6 +195,8 @@ ss::future<> partition_balancer_backend::do_tick() {
co_return;
}

_tick_in_progress = ss::abort_source{};

auto health_report = co_await _health_monitor.get_cluster_health(
cluster_report_filter{},
force_refresh::no,
@@ -213,7 +234,7 @@ ss::future<> partition_balancer_backend::do_tick() {
.node_responsiveness_timeout = node_responsiveness_timeout},
_state,
_partition_allocator)
.plan_actions(health_report.value());
.plan_actions(health_report.value(), _tick_in_progress.value());

_last_leader_term = _raft0->term();
_last_tick_time = clock_t::now();
@@ -252,6 +273,7 @@ ss::future<> partition_balancer_backend::do_tick() {

co_await ss::max_concurrent_for_each(
plan_data.cancellations, 32, [this, current_term](model::ntp& ntp) {
_tick_in_progress->check();
auto f = _topics_frontend.cancel_moving_partition_replicas(
ntp,
model::timeout_clock::now() + add_move_cmd_timeout,
@@ -272,6 +294,7 @@ ss::future<> partition_balancer_backend::do_tick() {
plan_data.reassignments,
32,
[this, current_term](ntp_reassignment& reassignment) {
_tick_in_progress->check();
auto f = _topics_frontend.move_partition_replicas(
reassignment.ntp,
reassignment.allocated.replicas(),
1 change: 1 addition & 0 deletions src/v/cluster/partition_balancer_backend.h
@@ -110,6 +110,7 @@ class partition_balancer_backend {
ss::timer<clock_t> _timer;
notification_id_type _topic_table_updates;
notification_id_type _member_updates;
std::optional<ss::abort_source> _tick_in_progress;
};

} // namespace cluster
16 changes: 10 additions & 6 deletions src/v/cluster/partition_balancer_planner.cc
@@ -107,13 +107,17 @@ class partition_balancer_planner::request_context {
}
}

ss::future<> maybe_yield() { co_await ss::coroutine::maybe_yield(); }
ss::future<> maybe_yield() {
co_await ss::coroutine::maybe_yield();
_as.check();
}

private:
friend class partition_balancer_planner;

request_context(partition_balancer_planner& parent)
: _parent(parent) {}
request_context(partition_balancer_planner& parent, ss::abort_source& as)
: _parent(parent)
, _as(as) {}

bool all_reports_received() const;

@@ -128,7 +132,6 @@ class partition_balancer_planner::request_context {
// returns true if the failure can be logged
bool increment_failure_count();

private:
partition_balancer_planner& _parent;
absl::node_hash_map<model::ntp, size_t> _ntp2size;
absl::node_hash_map<model::ntp, absl::flat_hash_map<model::node_id, size_t>>
@@ -137,6 +140,7 @@ class partition_balancer_planner::request_context {
uint64_t _planned_moves_size_bytes = 0;
size_t _failed_actions_count = 0;
absl::node_hash_set<model::ntp> _cancellations;
ss::abort_source& _as;
};

void partition_balancer_planner::init_per_node_state(
@@ -1198,8 +1202,8 @@ void partition_balancer_planner::request_context::collect_actions(

ss::future<partition_balancer_planner::plan_data>
partition_balancer_planner::plan_actions(
const cluster_health_report& health_report) {
request_context ctx(*this);
const cluster_health_report& health_report, ss::abort_source& as) {
request_context ctx(*this, as);
plan_data result;

init_per_node_state(health_report, ctx, result);
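A compact sketch of the request_context change above, with hypothetical names (planning_context, plan, partition) and only the Seastar APIs shown assumed: the planner's context borrows the caller's abort_source and folds the abort check into its maybe_yield() helper, so every long loop over partitions becomes an abort point without touching the loop bodies.

// sketch: an abortable planning context (hypothetical, not part of the commit).
#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>

#include <cstddef>
#include <vector>

namespace ss = seastar;

struct planning_context {
    explicit planning_context(ss::abort_source& as)
      : _as(as) {}

    // Yield if the reactor wants to preempt this task, then bail out with the
    // abort exception if one was requested while we were suspended.
    ss::future<> maybe_yield() {
        co_await ss::coroutine::maybe_yield();
        _as.check();
    }

    ss::abort_source& _as;
};

struct partition { int id = 0; }; // stand-in for model::ntp plus sizes

// A long pass over many partitions; each iteration is now an abort point.
ss::future<size_t> plan(planning_context& ctx, const std::vector<partition>& ps) {
    size_t planned = 0;
    for (const auto& p : ps) {
        co_await ctx.maybe_yield();
        planned += static_cast<size_t>(p.id % 2 == 0); // fake planning work
    }
    co_return planned;
}

The backend side of the commit applies the same idea before dispatching work: _tick_in_progress->check() runs ahead of each cancellation and each move inside max_concurrent_for_each, as shown in the backend hunks above.
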
3 changes: 2 additions & 1 deletion src/v/cluster/partition_balancer_planner.h
@@ -74,7 +74,8 @@ class partition_balancer_planner {
status status = status::empty;
};

ss::future<plan_data> plan_actions(const cluster_health_report&);
ss::future<plan_data>
plan_actions(const cluster_health_report&, ss::abort_source&);

private:
class request_context;
3 changes: 2 additions & 1 deletion src/v/cluster/tests/partition_balancer_bench.cc
@@ -32,8 +32,9 @@ PERF_TEST_C(partition_balancer_planner_fixture, unavailable_nodes) {

auto planner = make_planner();

abort_source as;
perf_tests::start_measuring_time();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();
perf_tests::stop_measuring_time();

const auto& reassignments = plan_data.reassignments;
1 change: 1 addition & 0 deletions src/v/cluster/tests/partition_balancer_planner_fixture.h
@@ -356,4 +356,5 @@ struct partition_balancer_planner_fixture {

controller_workers workers;
int last_node_idx{};
ss::abort_source as;
};
42 changes: 21 additions & 21 deletions src/v/cluster/tests/partition_balancer_planner_test.cc
@@ -72,7 +72,7 @@ FIXTURE_TEST(test_stable, partition_balancer_planner_fixture) {
populate_node_status_table().get();

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();
check_violations(plan_data, {}, {});
BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 0);
}
@@ -102,7 +102,7 @@ FIXTURE_TEST(test_node_down, partition_balancer_planner_fixture) {
populate_node_status_table(unavailable_nodes).get();

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, unavailable_nodes, {});

@@ -140,7 +140,7 @@ FIXTURE_TEST(test_no_quorum_for_partition, partition_balancer_planner_fixture) {
populate_node_status_table(unavailable_nodes).get();

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();
BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 0);
}

@@ -174,7 +174,7 @@ FIXTURE_TEST(
populate_node_status_table(unavailable_nodes).get();

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, unavailable_nodes, {});

@@ -213,7 +213,7 @@ FIXTURE_TEST(
populate_node_status_table(unavailable_nodes).get();

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, unavailable_nodes, {});

@@ -252,7 +252,7 @@ FIXTURE_TEST(
populate_node_status_table(unavailable_nodes).get();

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, unavailable_nodes, full_nodes);

@@ -284,7 +284,7 @@ FIXTURE_TEST(test_move_from_full_node, partition_balancer_planner_fixture) {
populate_node_status_table().get();

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, {}, full_nodes);

@@ -325,7 +325,7 @@ FIXTURE_TEST(
populate_node_status_table(unavailable_nodes).get();

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, unavailable_nodes, {});

@@ -373,7 +373,7 @@ FIXTURE_TEST(
populate_node_status_table().get();

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();
check_violations(plan_data, {}, full_nodes);

const auto& reassignments = plan_data.reassignments;
@@ -420,7 +420,7 @@ FIXTURE_TEST(
populate_node_status_table(unavailable_nodes).get();

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();
check_violations(plan_data, unavailable_nodes, full_nodes);

const auto& reassignments = plan_data.reassignments;
@@ -471,7 +471,7 @@ FIXTURE_TEST(test_move_part_of_replicas, partition_balancer_planner_fixture) {
hr.node_reports[2].local_state.data_disk.free -= 2_MiB;

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, {}, full_nodes);

@@ -529,7 +529,7 @@ FIXTURE_TEST(
}

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, {}, full_nodes);

@@ -579,7 +579,7 @@ FIXTURE_TEST(test_lot_of_partitions, partition_balancer_planner_fixture) {
populate_node_status_table(unavailable_nodes).get();

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();
check_violations(plan_data, unavailable_nodes, {});

const auto& reassignments = plan_data.reassignments;
@@ -638,7 +638,7 @@ FIXTURE_TEST(test_node_cancelation, partition_balancer_planner_fixture) {
populate_node_status_table(unavailable_nodes).get();

auto planner = make_planner();
auto planner_result = planner.plan_actions(hr).get();
auto planner_result = planner.plan_actions(hr, as).get();

BOOST_REQUIRE_EQUAL(planner_result.reassignments.size(), 1);

@@ -660,7 +660,7 @@ FIXTURE_TEST(test_node_cancelation, partition_balancer_planner_fixture) {
unavailable_nodes = {0, 3};
populate_node_status_table(unavailable_nodes).get();

planner_result = planner.plan_actions(hr).get();
planner_result = planner.plan_actions(hr, as).get();
BOOST_REQUIRE(planner_result.reassignments.size() == 0);
BOOST_REQUIRE(planner_result.cancellations.size() == 1);
BOOST_REQUIRE(planner_result.cancellations.front() == ntp);
@@ -699,7 +699,7 @@ FIXTURE_TEST(test_rack_awareness, partition_balancer_planner_fixture) {
populate_node_status_table(unavailable_nodes).get();

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, unavailable_nodes, {});

@@ -734,7 +734,7 @@ FIXTURE_TEST(
set_maintenance_mode(model::node_id{3});

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, unavailable_nodes, {});
BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 0);
@@ -764,7 +764,7 @@ FIXTURE_TEST(
set_decommissioning(model::node_id{3});

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, unavailable_nodes, {});
BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 0);
@@ -794,7 +794,7 @@ FIXTURE_TEST(
set_decommissioning(model::node_id{0});

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, unavailable_nodes, {});
BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 1);
@@ -866,7 +866,7 @@ FIXTURE_TEST(test_rack_awareness_repair, partition_balancer_planner_fixture) {
populate_node_status_table().get();

auto planner = make_planner();
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, {}, {});
BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 2);
@@ -897,7 +897,7 @@ FIXTURE_TEST(balancing_modes, partition_balancer_planner_fixture) {
populate_node_status_table(unavailable_nodes).get();

auto planner = make_planner(model::partition_autobalancing_mode::node_add);
auto plan_data = planner.plan_actions(hr).get();
auto plan_data = planner.plan_actions(hr, as).get();

check_violations(plan_data, {}, {});
BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 0);
