Use remote recovery when topic is configured to use remote data #22908

Merged
merged 5 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
41 changes: 30 additions & 11 deletions src/v/cluster/controller_backend.cc
@@ -32,6 +32,7 @@
#include "metrics/prometheus_sanitize.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/namespace.h"
#include "raft/fundamental.h"
#include "raft/group_configuration.h"
#include "ssx/event.h"
@@ -991,7 +992,7 @@ ss::future<result<ss::stop_iteration>> controller_backend::reconcile_ntp_step(

vlog(
clusterlog.debug,
"[{}] reconcilation step, reconciliation state: {}",
"[{}] reconciliation step, reconciliation state: {}",
ntp,
rs);

@@ -1159,12 +1160,14 @@ ss::future<result<ss::stop_iteration>> controller_backend::reconcile_ntp_step(
// Configuration will be replicate to the new replica
initial_replicas = {};
}

auto ec = co_await create_partition(
ntp,
group_id,
expected_log_revision.value(),
std::move(initial_replicas));
std::move(initial_replicas),
force_reconfiguration{
replicas_view.update
&& replicas_view.update->is_force_reconfiguration()});
if (ec) {
co_return ec;
}
@@ -1359,7 +1362,8 @@ ss::future<std::error_code> controller_backend::create_partition(
model::ntp ntp,
raft::group_id group_id,
model::revision_id log_revision,
replicas_t initial_replicas) {
replicas_t initial_replicas,
force_reconfiguration is_force_reconfigured) {
vlog(
clusterlog.debug,
"[{}] creating partition, log revision: {}, initial_replicas: {}",
@@ -1404,22 +1408,37 @@
if (auto it = _xst_states.find(ntp); it != _xst_states.end()) {
xst_state = it->second;
}

auto ntp_config = cfg->make_ntp_config(
_data_directory, ntp.tp.partition, log_revision, initial_rev.value());
auto rtp = cfg->properties.remote_topic_properties;
const bool is_cloud_topic = ntp_config.is_archival_enabled()
|| ntp_config.is_remote_fetch_enabled();
const bool is_internal = ntp.ns == model::kafka_internal_namespace;
/**
* Here we decide if a partition needs recovery from tiered storage, it
* may be the case if partition was force reconfigured. In this case we
* simply set the remote topic properties to initialize recovery of data
* from the tiered storage.
*/
if (
is_force_reconfigured && is_cloud_topic && !is_internal
&& !ntp_config.get_overrides().recovery_enabled) {
// topic being cloud enabled implies existence of overrides
ntp_config.get_overrides().recovery_enabled
= storage::topic_recovery_enabled::yes;
rtp.emplace(*initial_rev, cfg->partition_count);
Review comment (Contributor): I'm not too familiar with initial_version, perhaps @ztlpn can take another look.

}
// we use offset as an rev as it is always increasing and it
// increases while ntp is being created again
try {
co_await _partition_manager.local().manage(
cfg->make_ntp_config(
_data_directory,
ntp.tp.partition,
log_revision,
initial_rev.value()),
std::move(ntp_config),
group_id,
std::move(initial_brokers),
raft::with_learner_recovery_throttle::yes,
raft::keep_snapshotted_log::no,
std::move(xst_state),
cfg->properties.remote_topic_properties,
rtp,
read_replica_bucket,
cfg->properties.remote_label,
cfg->properties.remote_topic_namespace_override);
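As a rough standalone sketch of the new branch above, the decision can be restated with invented stand-in types (partition_traits and should_enable_remote_recovery exist only for this illustration; the real code works on ntp_config, its overrides, and remote_topic_properties): a partition is switched to tiered-storage recovery only when it was force reconfigured, its topic is cloud-enabled, it is not an internal topic, and recovery is not already requested.

// Illustrative sketch only: plain stand-in types instead of the real
// cluster/storage headers, compiled as ordinary C++20.
#include <iostream>

struct partition_traits {
    bool force_reconfigured;   // partition went through force reconfiguration
    bool archival_enabled;     // topic uploads segments to tiered storage
    bool remote_fetch_enabled; // topic can read data from tiered storage
    bool is_internal;          // topic lives in the kafka_internal namespace
    bool recovery_already_set; // overrides already request recovery
};

// Mirrors the condition added to controller_backend::create_partition.
bool should_enable_remote_recovery(const partition_traits& p) {
    const bool is_cloud_topic = p.archival_enabled || p.remote_fetch_enabled;
    return p.force_reconfigured && is_cloud_topic && !p.is_internal
           && !p.recovery_already_set;
}

int main() {
    partition_traits p{
      .force_reconfigured = true,
      .archival_enabled = true,
      .remote_fetch_enabled = false,
      .is_internal = false,
      .recovery_already_set = false};
    // Prints "true": such a partition would get recovery_enabled and remote
    // topic properties set before being handed to the partition manager.
    std::cout << std::boolalpha << should_enable_remote_recovery(p) << '\n';
}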
5 changes: 4 additions & 1 deletion src/v/cluster/controller_backend.h
@@ -247,6 +247,8 @@ class controller_backend

private:
struct ntp_reconciliation_state;
using force_reconfiguration
= ss::bool_class<struct force_reconfiguration_tag>;

// Topics
ss::future<> bootstrap_controller_backend();
@@ -292,7 +294,8 @@ class controller_backend
model::ntp,
raft::group_id,
model::revision_id log_revision,
replicas_t initial_replicas);
replicas_t initial_replicas,
force_reconfiguration is_force_reconfigured);

ss::future<> add_to_shard_table(
model::ntp,
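The force_reconfiguration alias above uses Seastar's ss::bool_class, which wraps a bool in its own named type (with yes/no constants, as in raft::with_learner_recovery_throttle::yes elsewhere in the diff) so the new create_partition parameter cannot be confused with any other boolean. A minimal self-contained approximation of that idiom, not Seastar's actual implementation:

#include <iostream>

// Named-boolean wrapper in the spirit of ss::bool_class: one distinct type
// per tag, with explicit construction and explicit conversion back to bool.
template <typename Tag>
class bool_class {
public:
    constexpr explicit bool_class(bool v) noexcept : _v(v) {}
    constexpr explicit operator bool() const noexcept { return _v; }
private:
    bool _v;
};

using force_reconfiguration = bool_class<struct force_reconfiguration_tag>;

void create_partition_stub(force_reconfiguration is_force_reconfigured) {
    std::cout << std::boolalpha << bool(is_force_reconfigured) << '\n';
}

int main() {
    // Callers must name the flag, as in the controller_backend change:
    // force_reconfiguration{update && update->is_force_reconfiguration()}.
    create_partition_stub(force_reconfiguration{true});
    // create_partition_stub(true); // would not compile: constructor is explicit
}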
12 changes: 12 additions & 0 deletions src/v/cluster/partition_manager.cc
@@ -123,6 +123,18 @@ ss::future<consensus_ptr> partition_manager::manage(
std::optional<cloud_storage_clients::bucket_name> read_replica_bucket,
std::optional<cloud_storage::remote_label> remote_label,
std::optional<model::topic_namespace> topic_namespace_override) {
vlog(
clusterlog.trace,
"Creating partition with configuration: {}, raft group_id: {}, "
"initial_nodes: {}, remote topic properties: {}, remote label: {}, "
"topic_namespace_override: {}",
ntp_cfg,
group,
initial_nodes,
rtp,
remote_label,
topic_namespace_override);

auto guard = _gate.hold();
// topic_namespace_override is used in case of a cluster migration.
// The original ("source") topic name must be used in the tiered
5 changes: 5 additions & 0 deletions src/v/cluster/topic_table.h
@@ -185,6 +185,11 @@ class topic_table {
return _policy;
}

bool is_force_reconfiguration() const {
return _state == reconfiguration_state::force_cancelled
|| _state == reconfiguration_state::force_update;
}

friend std::ostream&
operator<<(std::ostream&, const in_progress_update&);

72 changes: 35 additions & 37 deletions tests/rptest/services/redpanda.py
@@ -3403,20 +3403,17 @@ def set_cluster_config_to_null(self,
name: str,
expect_restart: bool = False,
admin_client: Optional[Admin] = None,
timeout: int = 10):
if admin_client is None:
admin_client = self._admin

patch_result = admin_client.patch_cluster_config(upsert={name: None})
new_version = patch_result['config_version']

self._wait_for_config_version(new_version, expect_restart, timeout)
timeout: int = 10,
tolerate_stopped_nodes=False):
def set_cluster_config_to_null(self, *args, **kwargs):
self.set_cluster_config(*args, values={name: None}, **kwargs)

def set_cluster_config(self,
values: dict,
expect_restart: bool = False,
admin_client: Optional[Admin] = None,
timeout: int = 10):
timeout: int = 10,
tolerate_stopped_nodes=False):
"""
Update cluster configuration and wait for all nodes to report that they
have seen the new config.
@@ -3432,23 +3429,30 @@ def set_cluster_config(self,
remove=[])
new_version = patch_result['config_version']

self._wait_for_config_version(new_version,
expect_restart,
timeout,
admin_client=admin_client)
self._wait_for_config_version(
new_version,
expect_restart,
timeout,
admin_client=admin_client,
tolerate_stopped_nodes=tolerate_stopped_nodes)

def _wait_for_config_version(self,
config_version,
expect_restart: bool,
timeout: int,
admin_client: Optional[Admin] = None):
admin_client: Optional[Admin] = None,
tolerate_stopped_nodes=False):
admin_client = admin_client or self._admin
if tolerate_stopped_nodes:
started_node_ids = {self.node_id(n) for n in self.started_nodes()}
Review comment (Contributor):
TBH I find having a variable declared/undeclared depending on the condition somewhat error-prone and harder to read too. What do you think of something like the following?

if tolerate_stopped_nodes:
    started_node_ids = {self.node_id(n) for n in self.started_nodes()}
    node_check_predicate = lambda n: n in started_node_ids
else:
    node_check_predicate = lambda n: True
...
ready = all([n['config_version'] >= config_version for n in status if node_check_predicate(n)])


def is_ready():
status = admin_client.get_cluster_config_status(
node=self.controller())
ready = all(
[n['config_version'] >= config_version for n in status])
ready = all([
n['config_version'] >= config_version for n in status if
not tolerate_stopped_nodes or n['node_id'] in started_node_ids
])

return ready, status

@@ -5020,27 +5024,21 @@ def wait_for_internal_scrub(self, cloud_storage_partitions):
if not cloud_storage_partitions:
return None

self.set_cluster_config({
"cloud_storage_enable_scrubbing":
True,
"cloud_storage_partial_scrub_interval_ms":
100,
"cloud_storage_full_scrub_interval_ms":
1000 * 60 * 10,
"cloud_storage_scrubbing_interval_jitter_ms":
100,
"cloud_storage_background_jobs_quota":
5000,
"cloud_storage_housekeeping_interval_ms":
100,
# Segment merging may resolve gaps in the log, so disable it
"cloud_storage_enable_segment_merging":
False,
# Leadership moves may perturb the scrub, so disable it to
# streamline the actions below.
"enable_leader_balancer":
False
})
self.set_cluster_config(
{
"cloud_storage_enable_scrubbing": True,
"cloud_storage_partial_scrub_interval_ms": 100,
"cloud_storage_full_scrub_interval_ms": 1000 * 60 * 10,
"cloud_storage_scrubbing_interval_jitter_ms": 100,
"cloud_storage_background_jobs_quota": 5000,
"cloud_storage_housekeeping_interval_ms": 100,
# Segment merging may resolve gaps in the log, so disable it
"cloud_storage_enable_segment_merging": False,
# Leadership moves may perturb the scrub, so disable it to
# streamline the actions below.
"enable_leader_balancer": False
},
tolerate_stopped_nodes=True)

unavailable = set()
for p in cloud_storage_partitions: