From eb4c6ce84c0e972ee4deb50ea95f80c3b4fd6943 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 16 Aug 2024 12:33:52 +0000 Subject: [PATCH] c/backend: use recovery when topic is configured to use remote data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a topic is configured to use remote data, the data can be used when force recovering partitions that lost majority. In this case, instead of creating an empty partition replica instance we customize arguments passed into the `partition_manager::manage` method to enable remote recovery of replica data. Signed-off-by: Michał Maślanka --- src/v/cluster/controller_backend.cc | 39 +++++++++++++++++++++-------- src/v/cluster/controller_backend.h | 5 +++- src/v/cluster/topic_table.h | 5 ++++ 3 files changed, 38 insertions(+), 11 deletions(-) diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc index 5044f136fe9a..b2996522d669 100644 --- a/src/v/cluster/controller_backend.cc +++ b/src/v/cluster/controller_backend.cc @@ -32,6 +32,7 @@ #include "metrics/prometheus_sanitize.h" #include "model/fundamental.h" #include "model/metadata.h" +#include "model/namespace.h" #include "raft/fundamental.h" #include "raft/group_configuration.h" #include "ssx/event.h" @@ -1159,12 +1160,14 @@ ss::future> controller_backend::reconcile_ntp_step( // Configuration will be replicate to the new replica initial_replicas = {}; } - auto ec = co_await create_partition( ntp, group_id, expected_log_revision.value(), - std::move(initial_replicas)); + std::move(initial_replicas), + force_reconfiguration{ + replicas_view.update + && replicas_view.update->is_force_reconfiguration()}); if (ec) { co_return ec; } @@ -1359,7 +1362,8 @@ ss::future controller_backend::create_partition( model::ntp ntp, raft::group_id group_id, model::revision_id log_revision, - replicas_t initial_replicas) { + replicas_t initial_replicas, + force_reconfiguration 
is_force_reconfigured) { vlog( clusterlog.debug, "[{}] creating partition, log revision: {}, initial_replicas: {}", @@ -1404,22 +1408,37 @@ ss::future controller_backend::create_partition( if (auto it = _xst_states.find(ntp); it != _xst_states.end()) { xst_state = it->second; } - + auto ntp_config = cfg->make_ntp_config( + _data_directory, ntp.tp.partition, log_revision, initial_rev.value()); + auto rtp = cfg->properties.remote_topic_properties; + const bool is_cloud_topic = ntp_config.is_archival_enabled() + || ntp_config.is_remote_fetch_enabled(); + const bool is_internal = ntp.ns == model::kafka_internal_namespace; + /** + * Here we decide if a partition needs recovery from tiered storage, it + * may be the case if partition was force reconfigured. In this case we + * simply set the remote topic properties to initialize recovery of data + * from the tiered storage. + */ + if ( + is_force_reconfigured && is_cloud_topic && !is_internal + && !ntp_config.get_overrides().recovery_enabled) { + // topic being cloud enabled implies existence of overrides + ntp_config.get_overrides().recovery_enabled + = storage::topic_recovery_enabled::yes; + rtp.emplace(*initial_rev, cfg->partition_count); + } // we use offset as an rev as it is always increasing and it // increases while ntp is being created again try { co_await _partition_manager.local().manage( - cfg->make_ntp_config( - _data_directory, - ntp.tp.partition, - log_revision, - initial_rev.value()), + std::move(ntp_config), group_id, std::move(initial_brokers), raft::with_learner_recovery_throttle::yes, raft::keep_snapshotted_log::no, std::move(xst_state), - cfg->properties.remote_topic_properties, + rtp, read_replica_bucket, cfg->properties.remote_label, cfg->properties.remote_topic_namespace_override); diff --git a/src/v/cluster/controller_backend.h b/src/v/cluster/controller_backend.h index f824cb0ed1b3..a70cffb8fc86 100644 --- a/src/v/cluster/controller_backend.h +++ b/src/v/cluster/controller_backend.h @@ -247,6 
+247,8 @@ class controller_backend private: struct ntp_reconciliation_state; + using force_reconfiguration + = ss::bool_class; // Topics ss::future<> bootstrap_controller_backend(); @@ -292,7 +294,8 @@ class controller_backend model::ntp, raft::group_id, model::revision_id log_revision, - replicas_t initial_replicas); + replicas_t initial_replicas, + force_reconfiguration is_force_reconfigured); ss::future<> add_to_shard_table( model::ntp, diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index b10d136834bd..9366e956b5fe 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -185,6 +185,11 @@ class topic_table { return _policy; } + bool is_force_reconfiguration() const { + return _state == reconfiguration_state::force_cancelled + || _state == reconfiguration_state::force_update; + } + friend std::ostream& operator<<(std::ostream&, const in_progress_update&);