Skip to content

Commit

Permalink
Merge pull request #18835 from vbotbuildovich/backport-pr-18153-v24.1…
Browse files Browse the repository at this point in the history
….x-627

[v24.1.x] Fixed possible log discrepancy when using forced reconfiguration
  • Loading branch information
mmaslankaprv authored Jun 6, 2024
2 parents 53e6eab + b987f2a commit 9b0a80c
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 42 deletions.
9 changes: 9 additions & 0 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,15 @@ std::error_code check_configuration_update(
partition->ntp());
return errc::partition_configuration_in_joint_mode;
}

if (includes_self && partition->raft()->has_configuration_override()) {
vlog(
clusterlog.trace,
"[{}] contains current node and there is configuration override "
"active",
partition->ntp());
return errc::partition_configuration_in_joint_mode;
}
/*
* if replica set is a leader it must have configuration committed i.e. it
* was successfully replicated to majority of followers.
Expand Down
22 changes: 22 additions & 0 deletions src/v/raft/configuration_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ configuration_manager::add(std::vector<offset_configuration> configurations) {
// handling backward compatibility i.e. revisionless configurations
co.cfg.maybe_set_initial_revision(_initial_revision);

reset_override(co.cfg.revision_id());
add_configuration(co.offset, std::move(co.cfg));
_highest_known_offset = std::max(_highest_known_offset, co.offset);
}
Expand Down Expand Up @@ -211,9 +212,30 @@ const group_configuration& configuration_manager::get_latest() const {
vassert(
!_configurations.empty(),
"Configuration manager should always have at least one configuration");
if (_configuration_force_override) [[unlikely]] {
return *_configuration_force_override;
}

return _configurations.rbegin()->second.cfg;
}

void configuration_manager::set_override(group_configuration cfg) {
vlog(_ctxlog.info, "Setting configuration override to {}", cfg);
_configuration_force_override = std::make_unique<group_configuration>(
std::move(cfg));
}

void configuration_manager::reset_override(
model::revision_id added_configuration_revision) {
if (
_configuration_force_override
&& _configuration_force_override->revision_id()
<= added_configuration_revision) [[unlikely]] {
vlog(_ctxlog.info, "Resetting configuration override");
_configuration_force_override.reset();
}
};

model::offset configuration_manager::get_latest_offset() const {
vassert(
!_configurations.empty(),
Expand Down
20 changes: 20 additions & 0 deletions src/v/raft/configuration_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#pragma once

#include "base/units.h"
#include "group_configuration.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "raft/consensus_utils.h"
Expand Down Expand Up @@ -167,10 +168,27 @@ class configuration_manager {

ss::future<> adjust_configuration_idx(configuration_idx);

/**
* Sets a forced override for current configuration. The override is active
* and returned as latest configuration until it is cleared by adding a new
* configuration to group configuration manage.
*
* @param cfg the configuration to override with
*/
void set_override(group_configuration);

/**
* Checks if configuration override is active
*/
bool has_configuration_override() const {
return _configuration_force_override != nullptr;
}

friend std::ostream&
operator<<(std::ostream&, const configuration_manager&);

private:
void reset_override(model::revision_id);
ss::future<> store_configurations();
ss::future<> store_highest_known_offset();
bytes configurations_map_key() const {
Expand Down Expand Up @@ -219,5 +237,7 @@ class configuration_manager {
model::revision_id _initial_revision{};
ctx_log& _ctxlog;
configuration_idx _next_index{0};
std::unique_ptr<group_configuration> _configuration_force_override
= nullptr;
};
} // namespace raft
50 changes: 13 additions & 37 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1321,36 +1321,21 @@ consensus::abort_configuration_change(model::revision_id revision) {
if (latest_cfg.revision_id() > revision) {
co_return errc::invalid_configuration_update;
}

auto new_cfg = latest_cfg;
new_cfg.abort_configuration_change(revision);

auto batches = details::serialize_configuration_as_batches(
std::move(new_cfg));
for (auto& b : batches) {
b.set_term(_term);
};
/**
* Aborting configuration change is an operation that may lead to data loss.
* It must be possible to abort configuration change even if there is no
* leader elected. We simply append new configuration to each of the
* replicas log. If new leader will be elected using new configuration it
* will eventually propagate valid configuration to all the followers.
*/
auto append_result = co_await disk_append(
model::make_memory_record_batch_reader(std::move(batches)),
update_last_quorum_index::yes);
vlog(
_ctxlog.info,
"appended reconfiguration aborting configuration at offset {}",
append_result.base_offset);
// flush log as all configuration changes must eventually be committed.
co_await flush_log();
// if current node is a leader make sure we will try to update committed
// index, it may be required for single participant raft groups
if (is_leader()) {
maybe_update_majority_replicated_index();
maybe_update_leader_commit_idx();
}
update_follower_stats(new_cfg);
_configuration_manager.set_override(std::move(new_cfg));
do_step_down("reconfiguration-aborted");

co_return errc::success;
}

Expand All @@ -1372,22 +1357,10 @@ ss::future<std::error_code> consensus::force_replace_configuration_locally(
new_cfg.set_version(group_configuration::v_6);
}
vlog(_ctxlog.info, "Force replacing configuration with: {}", new_cfg);
auto batches = details::serialize_configuration_as_batches(
std::move(new_cfg));
for (auto& b : batches) {
b.set_term(_term);
};

auto result = co_await disk_append(
model::make_memory_record_batch_reader(std::move(batches)),
update_last_quorum_index::yes);
vlog(
_ctxlog.debug,
"appended reconfiguration to force update replica "
"set at "
"offset {}",
result.base_offset);
co_await flush_log();
update_follower_stats(new_cfg);
_configuration_manager.set_override(std::move(new_cfg));
do_step_down("forced-reconfiguration");

} catch (const ss::broken_semaphore&) {
co_return errc::shutting_down;
Expand Down Expand Up @@ -2863,8 +2836,11 @@ ss::future<storage::append_result> consensus::disk_append(
auto f = ss::now();
if (!configurations.empty()) {
// we can use latest configuration to update follower stats
update_follower_stats(configurations.back().cfg);
f = _configuration_manager.add(std::move(configurations));
f = _configuration_manager.add(std::move(configurations))
.then([this] {
update_follower_stats(
_configuration_manager.get_latest());
});
}

return f.then([this, ret = ret] {
Expand Down
4 changes: 4 additions & 0 deletions src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,10 @@ class consensus {
return _replication_monitor;
}

bool has_configuration_override() const {
return _configuration_manager.has_configuration_override();
}

private:
friend replication_monitor;
friend replicate_entries_stm;
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ set(gsrcs
rp_test(
UNIT_TEST
GTEST
TIMEOUT 1000
TIMEOUT 1500
BINARY_NAME gtest_raft
SOURCES ${gsrcs}
LIBRARIES v::raft v::storage_test_utils v::model_test_utils v::features v::gtest_main
Expand Down
Loading

0 comments on commit 9b0a80c

Please sign in to comment.