Skip to content

Commit

Permalink
c/backend: use recovery when topic is configured to use remote data
Browse files Browse the repository at this point in the history
When a topic is configured to use remote data, that data can be used when
force recovering partitions that lost majority. In this case, instead of
creating an empty partition replica instance, we customize the arguments
passed into the `partition_manager::manage` method to enable remote
recovery of replica data.

Signed-off-by: Michał Maślanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Aug 27, 2024
1 parent 313358a commit eb4c6ce
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 11 deletions.
39 changes: 29 additions & 10 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "metrics/prometheus_sanitize.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/namespace.h"
#include "raft/fundamental.h"
#include "raft/group_configuration.h"
#include "ssx/event.h"
Expand Down Expand Up @@ -1159,12 +1160,14 @@ ss::future<result<ss::stop_iteration>> controller_backend::reconcile_ntp_step(
// Configuration will be replicated to the new replica
initial_replicas = {};
}

auto ec = co_await create_partition(
ntp,
group_id,
expected_log_revision.value(),
std::move(initial_replicas));
std::move(initial_replicas),
force_reconfiguration{
replicas_view.update
&& replicas_view.update->is_force_reconfiguration()});
if (ec) {
co_return ec;
}
Expand Down Expand Up @@ -1359,7 +1362,8 @@ ss::future<std::error_code> controller_backend::create_partition(
model::ntp ntp,
raft::group_id group_id,
model::revision_id log_revision,
replicas_t initial_replicas) {
replicas_t initial_replicas,
force_reconfiguration is_force_reconfigured) {
vlog(
clusterlog.debug,
"[{}] creating partition, log revision: {}, initial_replicas: {}",
Expand Down Expand Up @@ -1404,22 +1408,37 @@ ss::future<std::error_code> controller_backend::create_partition(
if (auto it = _xst_states.find(ntp); it != _xst_states.end()) {
xst_state = it->second;
}

auto ntp_config = cfg->make_ntp_config(
_data_directory, ntp.tp.partition, log_revision, initial_rev.value());
auto rtp = cfg->properties.remote_topic_properties;
const bool is_cloud_topic = ntp_config.is_archival_enabled()
|| ntp_config.is_remote_fetch_enabled();
const bool is_internal = ntp.ns == model::kafka_internal_namespace;
/**
* Here we decide whether a partition needs recovery from tiered storage.
* This may be the case if the partition was force reconfigured. In this
* case we simply set the remote topic properties to initialize recovery
* of data from the tiered storage.
*/
if (
is_force_reconfigured && is_cloud_topic && !is_internal
&& !ntp_config.get_overrides().recovery_enabled) {
// topic being cloud enabled implies existence of overrides
ntp_config.get_overrides().recovery_enabled
= storage::topic_recovery_enabled::yes;
rtp.emplace(*initial_rev, cfg->partition_count);
}
// we use the offset as a revision, as it is always increasing and it
// increases while the ntp is being created again
try {
co_await _partition_manager.local().manage(
cfg->make_ntp_config(
_data_directory,
ntp.tp.partition,
log_revision,
initial_rev.value()),
std::move(ntp_config),
group_id,
std::move(initial_brokers),
raft::with_learner_recovery_throttle::yes,
raft::keep_snapshotted_log::no,
std::move(xst_state),
cfg->properties.remote_topic_properties,
rtp,
read_replica_bucket,
cfg->properties.remote_label,
cfg->properties.remote_topic_namespace_override);
Expand Down
5 changes: 4 additions & 1 deletion src/v/cluster/controller_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ class controller_backend

private:
struct ntp_reconciliation_state;
// Strongly typed boolean flag handed to create_partition() to indicate
// whether the partition is being created as part of a forced
// reconfiguration. A dedicated tag type keeps it from being confused with
// other boolean arguments at call sites.
using force_reconfiguration
= ss::bool_class<struct force_reconfiguration_tag>;

// Topics
ss::future<> bootstrap_controller_backend();
Expand Down Expand Up @@ -292,7 +294,8 @@ class controller_backend
model::ntp,
raft::group_id,
model::revision_id log_revision,
replicas_t initial_replicas);
replicas_t initial_replicas,
force_reconfiguration is_force_reconfigured);

ss::future<> add_to_shard_table(
model::ntp,
Expand Down
5 changes: 5 additions & 0 deletions src/v/cluster/topic_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ class topic_table {
return _policy;
}

/**
* Reports whether this in-progress update is in one of the forced
* reconfiguration states, i.e. `force_update` or `force_cancelled`.
*/
bool is_force_reconfiguration() const {
const bool forced_cancel = _state
== reconfiguration_state::force_cancelled;
const bool forced_update = _state == reconfiguration_state::force_update;
return forced_cancel || forced_update;
}

friend std::ostream&
operator<<(std::ostream&, const in_progress_update&);

Expand Down

0 comments on commit eb4c6ce

Please sign in to comment.