Skip to content

Commit

Permalink
Merge pull request #22768 from mmaslankaprv/backport-21517-v24.1.x-88
Browse files Browse the repository at this point in the history
[v24.1.x] CORE-5686 Decouple different raft group shutdown sequences
  • Loading branch information
mmaslankaprv authored Aug 8, 2024
2 parents 9144399 + adf2c8c commit 976f936
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 35 deletions.
1 change: 1 addition & 0 deletions src/v/cluster/partition_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ partition_manager::remove(const model::ntp& ntp, partition_removal_mode mode) {
"manager",
ntp));
}
vlog(clusterlog.debug, "removing partition {}", ntp);
partition_shutdown_state shutdown_state(partition);
_partitions_shutting_down.push_back(shutdown_state);
auto group_id = partition->group();
Expand Down
65 changes: 31 additions & 34 deletions src/v/raft/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,18 @@ ss::future<ss::lw_shared_ptr<raft::consensus>> group_manager::create_group(
_feature_table,
_is_ready ? std::nullopt : std::make_optional(min_voter_priority),
keep_snapshotted_log);
return _groups_mutex.with([this, raft = std::move(raft)] {
return ss::with_gate(_gate, [this, raft] {
return _heartbeats.register_group(raft).then([this, raft] {
if (_is_ready) {
// Check _is_ready flag again to guard against the case when
// set_ready() was called after we created this consensus
// instance but before we insert it into the _groups
// collection.
raft->reset_node_priority();
}
_groups.push_back(raft);
return raft;
});

return ss::with_gate(_gate, [this, raft] {
return _heartbeats.register_group(raft).then([this, raft] {
if (_is_ready) {
// Check _is_ready flag again to guard against the case when
// set_ready() was called after we created this consensus
// instance but before we insert it into the _groups
// collection.
raft->reset_node_priority();
}
_groups.push_back(raft);
return raft;
});
});
}
Expand Down Expand Up @@ -189,30 +188,28 @@ raft::group_configuration group_manager::create_initial_configuration(
}

ss::future<> group_manager::remove(ss::lw_shared_ptr<raft::consensus> c) {
return _groups_mutex.with([this, c = std::move(c)] {
return c->stop()
.then([c] { return c->remove_persistent_state(); })
.then([this, id = c->group()] {
return _heartbeats.deregister_group(id);
})
.finally([this, c] {
_groups.erase(
std::remove(_groups.begin(), _groups.end(), c), _groups.end());
});
});
return do_shutdown(std::move(c), true).discard_result();
}

ss::future<> group_manager::shutdown(ss::lw_shared_ptr<raft::consensus> c) {
return _groups_mutex.with([this, c = std::move(c)] {
return c->stop()
.then([this, id = c->group()] {
return _heartbeats.deregister_group(id);
})
.finally([this, c] {
_groups.erase(
std::remove(_groups.begin(), _groups.end(), c), _groups.end());
});
});
return do_shutdown(std::move(c), false);
}

ss::future<> group_manager::do_shutdown(
ss::lw_shared_ptr<raft::consensus> c, bool remove_persistent_state) {
const auto group_id = c->group();
co_await c->stop();
if (remove_persistent_state) {
co_await c->remove_persistent_state();
}
co_await _heartbeats.deregister_group(group_id);
auto it = std::find(_groups.begin(), _groups.end(), c);
vassert(
it != _groups.end(),
"A consensus instance with group id: {} that is requested to be removed "
"must be managed by the manager",
group_id);
_groups.erase(it);
}

void group_manager::trigger_leadership_notification(
Expand Down
4 changes: 3 additions & 1 deletion src/v/raft/group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,11 @@ class group_manager {
void trigger_config_update_notification();
void collect_learner_metrics();

ss::future<>
do_shutdown(ss::lw_shared_ptr<consensus>, bool remove_persistent_state);

raft::group_configuration create_initial_configuration(
std::vector<model::broker>, model::revision_id) const;
mutex _groups_mutex{"group_manager"};
model::node_id _self;
ss::scheduling_group _raft_sg;
raft::consensus_client_protocol _client;
Expand Down

0 comments on commit 976f936

Please sign in to comment.