Skip to content

Commit

Permalink
r/consensus: handle forced configuration changes without appends
Browse files Browse the repository at this point in the history
Changed the way Redpanda raft handles forced reconfigurations.
Instead of appending the configuration to the log, we keep a volatile
in-memory override for the configuration that was forced. The override is
dropped whenever a node receives a different configuration. This approach
allows us to avoid discrepancies, as no data is appended to the log
without an active leader.

Signed-off-by: Michał Maślanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed May 27, 2024
1 parent 636a19e commit 01bef11
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 37 deletions.
22 changes: 22 additions & 0 deletions src/v/raft/configuration_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ configuration_manager::add(std::vector<offset_configuration> configurations) {
// handling backward compatibility i.e. revisionless configurations
co.cfg.maybe_set_initial_revision(_initial_revision);

reset_override(co.cfg.revision_id());
add_configuration(co.offset, std::move(co.cfg));
_highest_known_offset = std::max(_highest_known_offset, co.offset);
}
Expand Down Expand Up @@ -211,9 +212,30 @@ const group_configuration& configuration_manager::get_latest() const {
// Invariant: the manager must hold at least one configuration.
vassert(
!_configurations.empty(),
"Configuration manager should always have at least one configuration");
// An active forced override takes precedence over every configuration
// tracked in _configurations.
if (_configuration_force_override) [[unlikely]] {
return *_configuration_force_override;
}

// No override is set: return the configuration held by the last entry of
// _configurations.
return _configurations.rbegin()->second.cfg;
}

/**
 * Installs `cfg` as the forced configuration override, replacing any
 * override that was set before. The configuration is logged at info level
 * before it is stored.
 */
void configuration_manager::set_override(group_configuration cfg) {
    vlog(_ctxlog.info, "Setting configuration override to {}", cfg);
    auto forced = std::make_unique<group_configuration>(std::move(cfg));
    _configuration_force_override = std::move(forced);
}

/**
 * Drops the forced configuration override when the revision of a newly
 * added configuration is equal to or greater than the override's revision.
 * Does nothing when no override is set or when the override carries a
 * higher revision than the added configuration.
 *
 * @param added_configuration_revision revision of the configuration that is
 *        being added to the manager
 */
void configuration_manager::reset_override(
  model::revision_id added_configuration_revision) {
    // Short-circuit keeps the dereference safe when no override is set.
    const bool superseded
      = _configuration_force_override
        && _configuration_force_override->revision_id()
             <= added_configuration_revision;
    if (superseded) [[unlikely]] {
        vlog(_ctxlog.info, "Resetting configuration override");
        _configuration_force_override.reset();
    }
}

model::offset configuration_manager::get_latest_offset() const {
vassert(
!_configurations.empty(),
Expand Down
20 changes: 20 additions & 0 deletions src/v/raft/configuration_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#pragma once

#include "base/units.h"
#include "group_configuration.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "raft/consensus_utils.h"
Expand Down Expand Up @@ -167,10 +168,27 @@ class configuration_manager {

ss::future<> adjust_configuration_idx(configuration_idx);

/**
* Sets a forced override for the current configuration. While the override
* is active it is returned as the latest configuration. It is cleared when
* a configuration with an equal or higher revision is added to the
* configuration manager.
*
* @param cfg the configuration to override with
*/
void set_override(group_configuration);

/**
 * Reports whether a forced configuration override is currently set.
 */
bool has_configuration_override() const {
    return static_cast<bool>(_configuration_force_override);
}

friend std::ostream&
operator<<(std::ostream&, const configuration_manager&);

private:
// Clears the forced configuration override when its revision is lower than
// or equal to the given revision of a newly added configuration.
void reset_override(model::revision_id);
ss::future<> store_configurations();
ss::future<> store_highest_known_offset();
bytes configurations_map_key() const {
Expand Down Expand Up @@ -219,5 +237,7 @@ class configuration_manager {
model::revision_id _initial_revision{};
ctx_log& _ctxlog;
configuration_idx _next_index{0};
// Forced configuration override; null when no override is active. While set,
// get_latest() returns it instead of the stored configurations.
std::unique_ptr<group_configuration> _configuration_force_override
= nullptr;
};
} // namespace raft
50 changes: 13 additions & 37 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1321,36 +1321,21 @@ consensus::abort_configuration_change(model::revision_id revision) {
if (latest_cfg.revision_id() > revision) {
co_return errc::invalid_configuration_update;
}

auto new_cfg = latest_cfg;
new_cfg.abort_configuration_change(revision);

auto batches = details::serialize_configuration_as_batches(
std::move(new_cfg));
for (auto& b : batches) {
b.set_term(_term);
};
/**
* Aborting configuration change is an operation that may lead to data loss.
* It must be possible to abort configuration change even if there is no
* leader elected. We simply append new configuration to each of the
* replicas log. If new leader will be elected using new configuration it
* will eventually propagate valid configuration to all the followers.
*/
auto append_result = co_await disk_append(
model::make_memory_record_batch_reader(std::move(batches)),
update_last_quorum_index::yes);
vlog(
_ctxlog.info,
"appended reconfiguration aborting configuration at offset {}",
append_result.base_offset);
// flush log as all configuration changes must eventually be committed.
co_await flush_log();
// if current node is a leader make sure we will try to update committed
// index, it may be required for single participant raft groups
if (is_leader()) {
maybe_update_majority_replicated_index();
maybe_update_leader_commit_idx();
}
update_follower_stats(new_cfg);
_configuration_manager.set_override(std::move(new_cfg));
do_step_down("reconfiguration-aborted");

co_return errc::success;
}

Expand All @@ -1372,22 +1357,10 @@ ss::future<std::error_code> consensus::force_replace_configuration_locally(
new_cfg.set_version(group_configuration::v_6);
}
vlog(_ctxlog.info, "Force replacing configuration with: {}", new_cfg);
auto batches = details::serialize_configuration_as_batches(
std::move(new_cfg));
for (auto& b : batches) {
b.set_term(_term);
};

auto result = co_await disk_append(
model::make_memory_record_batch_reader(std::move(batches)),
update_last_quorum_index::yes);
vlog(
_ctxlog.debug,
"appended reconfiguration to force update replica "
"set at "
"offset {}",
result.base_offset);
co_await flush_log();
update_follower_stats(new_cfg);
_configuration_manager.set_override(std::move(new_cfg));
do_step_down("forced-reconfiguration");

} catch (const ss::broken_semaphore&) {
co_return errc::shutting_down;
Expand Down Expand Up @@ -2863,8 +2836,11 @@ ss::future<storage::append_result> consensus::disk_append(
auto f = ss::now();
if (!configurations.empty()) {
// we can use latest configuration to update follower stats
update_follower_stats(configurations.back().cfg);
f = _configuration_manager.add(std::move(configurations));
f = _configuration_manager.add(std::move(configurations))
.then([this] {
update_follower_stats(
_configuration_manager.get_latest());
});
}

return f.then([this, ret = ret] {
Expand Down

0 comments on commit 01bef11

Please sign in to comment.