diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc
index 360dc5d6652fa..b2996522d6698 100644
--- a/src/v/cluster/controller_backend.cc
+++ b/src/v/cluster/controller_backend.cc
@@ -32,6 +32,7 @@
 #include "metrics/prometheus_sanitize.h"
 #include "model/fundamental.h"
 #include "model/metadata.h"
+#include "model/namespace.h"
 #include "raft/fundamental.h"
 #include "raft/group_configuration.h"
 #include "ssx/event.h"
@@ -991,7 +992,7 @@ ss::future<result<ss::stop_iteration>> controller_backend::reconcile_ntp_step(
     vlog(
       clusterlog.debug,
-      "[{}] reconcilation step, reconciliation state: {}",
+      "[{}] reconciliation step, reconciliation state: {}",
       ntp,
       rs);
 
@@ -1159,12 +1160,14 @@ ss::future<result<ss::stop_iteration>> controller_backend::reconcile_ntp_step(
             // Configuration will be replicate to the new replica
             initial_replicas = {};
         }
-
         auto ec = co_await create_partition(
           ntp,
           group_id,
           expected_log_revision.value(),
-          std::move(initial_replicas));
+          std::move(initial_replicas),
+          force_reconfiguration{
+            replicas_view.update
+            && replicas_view.update->is_force_reconfiguration()});
         if (ec) {
             co_return ec;
         }
@@ -1359,7 +1362,8 @@ ss::future<std::error_code> controller_backend::create_partition(
   model::ntp ntp,
   raft::group_id group_id,
   model::revision_id log_revision,
-  replicas_t initial_replicas) {
+  replicas_t initial_replicas,
+  force_reconfiguration is_force_reconfigured) {
     vlog(
       clusterlog.debug,
       "[{}] creating partition, log revision: {}, initial_replicas: {}",
@@ -1404,22 +1408,37 @@ ss::future<std::error_code> controller_backend::create_partition(
     if (auto it = _xst_states.find(ntp); it != _xst_states.end()) {
         xst_state = it->second;
     }
-
+    auto ntp_config = cfg->make_ntp_config(
+      _data_directory, ntp.tp.partition, log_revision, initial_rev.value());
+    auto rtp = cfg->properties.remote_topic_properties;
+    const bool is_cloud_topic = ntp_config.is_archival_enabled()
+                                || ntp_config.is_remote_fetch_enabled();
+    const bool is_internal = ntp.ns == model::kafka_internal_namespace;
+    /**
+     * Decide whether the partition needs recovery from tiered storage.
+     * This may be the case if the partition was force reconfigured. If so,
+     * we set the remote topic properties to initialize recovery of data
+     * from tiered storage.
+     */
+    if (
+      is_force_reconfigured && is_cloud_topic && !is_internal
+      && !ntp_config.get_overrides().recovery_enabled) {
+        // topic being cloud enabled implies existence of overrides
+        ntp_config.get_overrides().recovery_enabled
+          = storage::topic_recovery_enabled::yes;
+        rtp.emplace(*initial_rev, cfg->partition_count);
+    }
     // we use offset as an rev as it is always increasing and it
     // increases while ntp is being created again
     try {
         co_await _partition_manager.local().manage(
-          cfg->make_ntp_config(
-            _data_directory,
-            ntp.tp.partition,
-            log_revision,
-            initial_rev.value()),
+          std::move(ntp_config),
           group_id,
           std::move(initial_brokers),
           raft::with_learner_recovery_throttle::yes,
           raft::keep_snapshotted_log::no,
           std::move(xst_state),
-          cfg->properties.remote_topic_properties,
+          rtp,
           read_replica_bucket,
           cfg->properties.remote_label,
           cfg->properties.remote_topic_namespace_override);
diff --git a/src/v/cluster/controller_backend.h b/src/v/cluster/controller_backend.h
index f824cb0ed1b3c..a70cffb8fc868 100644
--- a/src/v/cluster/controller_backend.h
+++ b/src/v/cluster/controller_backend.h
@@ -247,6 +247,8 @@ class controller_backend
 private:
     struct ntp_reconciliation_state;
+    using force_reconfiguration
+      = ss::bool_class<struct force_reconfiguration_tag>;
 
     // Topics
     ss::future<> bootstrap_controller_backend();
@@ -292,7 +294,8 @@ class controller_backend
       model::ntp,
       raft::group_id,
       model::revision_id log_revision,
-      replicas_t initial_replicas);
+      replicas_t initial_replicas,
+      force_reconfiguration is_force_reconfigured);
 
     ss::future<> add_to_shard_table(
       model::ntp,
diff --git a/src/v/cluster/partition_manager.cc b/src/v/cluster/partition_manager.cc
index b1ebf808791dd..98757f6551574 100644
--- a/src/v/cluster/partition_manager.cc
+++ b/src/v/cluster/partition_manager.cc
@@ -123,6 +123,18 @@ ss::future<consensus_ptr> partition_manager::manage(
   std::optional<cloud_storage_clients::bucket_name> read_replica_bucket,
   std::optional<cloud_storage::remote_label> remote_label,
   std::optional<model::topic_namespace> topic_namespace_override) {
+    vlog(
+      clusterlog.trace,
+      "Creating partition with configuration: {}, raft group_id: {}, "
+      "initial_nodes: {}, remote topic properties: {}, remote label: {}, "
+      "topic_namespace_override: {}",
+      ntp_cfg,
+      group,
+      initial_nodes,
+      rtp,
+      remote_label,
+      topic_namespace_override);
+
     auto guard = _gate.hold();
     // topic_namespace_override is used in case of a cluster migration.
     // The original ("source") topic name must be used in the tiered
diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h
index b10d136834bd8..9366e956b5fe2 100644
--- a/src/v/cluster/topic_table.h
+++ b/src/v/cluster/topic_table.h
@@ -185,6 +185,11 @@ class topic_table {
         return _policy;
     }
 
+    bool is_force_reconfiguration() const {
+        return _state == reconfiguration_state::force_cancelled
+               || _state == reconfiguration_state::force_update;
+    }
+
     friend std::ostream&
     operator<<(std::ostream&, const in_progress_update&);
 
diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py
index 6bdee1800c4f5..71c3225f9fd5a 100644
--- a/tests/rptest/services/redpanda.py
+++ b/tests/rptest/services/redpanda.py
@@ -3403,20 +3403,17 @@ def set_cluster_config_to_null(self,
                                    name: str,
                                    expect_restart: bool = False,
                                    admin_client: Optional[Admin] = None,
-                                   timeout: int = 10):
-        if admin_client is None:
-            admin_client = self._admin
-
-        patch_result = admin_client.patch_cluster_config(upsert={name: None})
-        new_version = patch_result['config_version']
-
-        self._wait_for_config_version(new_version, expect_restart, timeout)
+                                   timeout: int = 10,
+                                   tolerate_stopped_nodes=False):
+        self.set_cluster_config({name: None}, expect_restart, admin_client,
+                                timeout, tolerate_stopped_nodes)
 
     def set_cluster_config(self,
                            values: dict,
                            expect_restart: bool = False,
                            admin_client: Optional[Admin] = None,
-                           timeout: int = 10):
+                           timeout: int = 10,
+                           tolerate_stopped_nodes=False):
         """
         Update cluster configuration and wait for all nodes to report that
         they have seen the new config.
@@ -3432,23 +3429,30 @@ def set_cluster_config(self,
                                                           remove=[])
         new_version = patch_result['config_version']
 
-        self._wait_for_config_version(new_version,
-                                      expect_restart,
-                                      timeout,
-                                      admin_client=admin_client)
+        self._wait_for_config_version(
+            new_version,
+            expect_restart,
+            timeout,
+            admin_client=admin_client,
+            tolerate_stopped_nodes=tolerate_stopped_nodes)
 
     def _wait_for_config_version(self,
                                  config_version,
                                  expect_restart: bool,
                                  timeout: int,
-                                 admin_client: Optional[Admin] = None):
+                                 admin_client: Optional[Admin] = None,
+                                 tolerate_stopped_nodes=False):
         admin_client = admin_client or self._admin
+        if tolerate_stopped_nodes:
+            started_node_ids = {self.node_id(n) for n in self.started_nodes()}
 
         def is_ready():
            status = admin_client.get_cluster_config_status(
                node=self.controller())
-            ready = all(
-                [n['config_version'] >= config_version for n in status])
+            ready = all([
+                n['config_version'] >= config_version for n in status if
+                not tolerate_stopped_nodes or n['node_id'] in started_node_ids
+            ])
 
             return ready, status
 
@@ -5020,27 +5024,21 @@ def wait_for_internal_scrub(self, cloud_storage_partitions):
         if not cloud_storage_partitions:
             return None
 
-        self.set_cluster_config({
-            "cloud_storage_enable_scrubbing":
-            True,
-            "cloud_storage_partial_scrub_interval_ms":
-            100,
-            "cloud_storage_full_scrub_interval_ms":
-            1000 * 60 * 10,
-            "cloud_storage_scrubbing_interval_jitter_ms":
-            100,
-            "cloud_storage_background_jobs_quota":
-            5000,
-            "cloud_storage_housekeeping_interval_ms":
-            100,
-            # Segment merging may resolve gaps in the log, so disable it
-            "cloud_storage_enable_segment_merging":
-            False,
-            # Leadership moves may perturb the scrub, so disable it to
-            # streamline the actions below.
-            "enable_leader_balancer":
-            False
-        })
+        self.set_cluster_config(
+            {
+                "cloud_storage_enable_scrubbing": True,
+                "cloud_storage_partial_scrub_interval_ms": 100,
+                "cloud_storage_full_scrub_interval_ms": 1000 * 60 * 10,
+                "cloud_storage_scrubbing_interval_jitter_ms": 100,
+                "cloud_storage_background_jobs_quota": 5000,
+                "cloud_storage_housekeeping_interval_ms": 100,
+                # Segment merging may resolve gaps in the log, so disable it
+                "cloud_storage_enable_segment_merging": False,
+                # Leadership moves may perturb the scrub, so disable it to
+                # streamline the actions below.
+                "enable_leader_balancer": False
+            },
+            tolerate_stopped_nodes=True)
 
         unavailable = set()
         for p in cloud_storage_partitions:
diff --git a/tests/rptest/tests/partition_force_reconfiguration_test.py b/tests/rptest/tests/partition_force_reconfiguration_test.py
index b4ed26680dc18..58734242d8662 100644
--- a/tests/rptest/tests/partition_force_reconfiguration_test.py
+++ b/tests/rptest/tests/partition_force_reconfiguration_test.py
@@ -10,6 +10,7 @@
 import requests
 from rptest.services.cluster import cluster
 from rptest.clients.types import TopicSpec
+from rptest.services.kgo_verifier_services import KgoVerifierProducer
 from rptest.tests.end_to_end import EndToEndTest
 from rptest.clients.rpk import RpkTool
 from ducktape.mark import ignore, matrix
@@ -17,10 +18,11 @@
 from random import shuffle
 import time
 from rptest.tests.partition_movement import PartitionMovementMixin
-from rptest.services.admin import Replica
+from rptest.services.admin import Admin, Replica
 from rptest.clients.kcl import KCL
 from threading import Thread, Condition
-from rptest.services.redpanda import RedpandaService
+from rptest.services.redpanda import RedpandaService, SISettings
+from rptest.tests.redpanda_test import RedpandaTest
 from rptest.util import wait_until_result
 
@@ -368,25 +370,90 @@ def get_lso():
         self.start_consumer()
         self.run_validation()
 
-    @cluster(num_nodes=5)
+
+class NodeWiseRecoveryTest(RedpandaTest):
+    def __init__(self, test_context, *args, **kwargs):
+        super(NodeWiseRecoveryTest,
+              self).__init__(test_context,
+                             si_settings=SISettings(
+                                 log_segment_size=1024 * 1024,
+                                 test_context=test_context,
+                                 fast_uploads=True,
+                                 retention_local_strict=True,
+                             ),
+                             extra_rp_conf={
+                                 "partition_autobalancing_mode": "continuous",
+                                 "enable_leader_balancer": False,
+                             },
+                             num_brokers=5,
+                             *args,
+                             **kwargs)
+        self.default_timeout_sec = 60
+        self.rpk = RpkTool(self.redpanda)
+        self.admin = Admin(self.redpanda)
+
+    def _alive_nodes(self):
+        return [n.account.hostname for n in self.redpanda.started_nodes()]
+
+    def collect_topic_partition_states(self, topic):
+        states = {}
+        for p in self.rpk.describe_topic(topic):
+            states[p.id] = self.admin.get_partition_state(
+                namespace="kafka",
+                topic=topic,
+                partition=p.id,
+                node=self.redpanda.get_node_by_id(p.leader))
+        return states
+
+    def get_topic_partition_high_watermarks(self, topic):
+        return {p.id: p.high_watermark for p in self.rpk.describe_topic(topic)}
+
+    def produce_until_segments_removed(self, topic):
+        msg_size = 512
+
+        self.producer = KgoVerifierProducer(self.test_context, self.redpanda,
+                                            topic, msg_size, 10000000)
+
+        self.producer.start(clean=False)
+
+        def all_cloud_offsets_advanced():
+            states = self.collect_topic_partition_states(topic)
+
+            return all(r['next_cloud_offset'] >= 1000 for s in states.values()
+                       for r in s['replicas'])
+
+        wait_until(
+            all_cloud_offsets_advanced,
+            timeout_sec=self.default_timeout_sec,
+            backoff_sec=1,
+            err_msg="Error waiting for retention to prefix truncate partitions"
+        )
+
+        self.producer.stop()
+        self.producer.clean()
+        self.producer.free()
+
+    @cluster(num_nodes=6)
     @matrix(dead_node_count=[1, 2])
     def test_node_wise_recovery(self, dead_node_count):
-        self.start_redpanda(num_nodes=5,
-                            extra_rp_conf={
-                                "partition_autobalancing_mode": "continuous",
-                                "enable_leader_balancer": False,
-                            })
+
         num_topics = 20
         # Create a mix of rf=1 and 3 topics.
         topics = []
         for i in range(0, num_topics):
             rf = 3 if i % 2 == 0 else 1
             parts = random.randint(1, 3)
+            with_ts = random.choice([True, False])
             spec = TopicSpec(name=f"topic-{i}",
                              replication_factor=rf,
-                             partition_count=parts)
+                             partition_count=parts,
+                             redpanda_remote_read=with_ts,
+                             redpanda_remote_write=with_ts)
             topics.append(spec)
             self.client().create_topic(spec)
+            self.client().alter_topic_config(
+                spec.name, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES,
+                2 * 1024 * 1024)
 
         admin = self.redpanda._admin
 
@@ -395,9 +462,17 @@ def test_node_wise_recovery(self, dead_node_count):
         to_kill_node_ids = [
            int(self.redpanda.node_id(n)) for n in to_kill_nodes
         ]
+        for t in topics:
+            self.produce_until_segments_removed(t.name)
+        self.redpanda.wait_for_manifest_uploads()
 
         partitions_lost_majority = admin.get_majority_lost_partitions_from_nodes(
             dead_brokers=to_kill_node_ids)
+        # collect topic partition high watermarks before recovery
+        initial_topic_hws = {
+            t.name: self.get_topic_partition_high_watermarks(t.name)
+            for t in topics
+        }
 
         self.logger.debug(f"Stopping nodes: {to_kill_node_ids}")
         self.redpanda.for_nodes(to_kill_nodes, self.redpanda.stop_node)
@@ -408,7 +483,7 @@ def controller_available():
                 controller) not in to_kill_node_ids
 
         wait_until(controller_available,
-                   timeout_sec=self.WAIT_TIMEOUT_S,
+                   timeout_sec=self.default_timeout_sec,
                    backoff_sec=3,
                    err_msg="Controller not available")
 
@@ -440,7 +515,7 @@ def no_majority_lost_partitions():
            try:
                transfers.pause()
                wait_until(controller_available,
-                           timeout_sec=self.WAIT_TIMEOUT_S,
+                           timeout_sec=self.default_timeout_sec,
                            backoff_sec=3,
                            err_msg="Controller not available")
                lost_majority = admin.get_majority_lost_partitions_from_nodes(
                    dead_brokers=to_kill_node_ids)
@@ -458,7 +533,7 @@ def no_majority_lost_partitions():
 
         # Wait until there are no partition assignments with majority loss due to dead nodes.
         wait_until(no_majority_lost_partitions,
-                   timeout_sec=self.WAIT_TIMEOUT_S,
+                   timeout_sec=self.default_timeout_sec,
                    backoff_sec=3,
                    err_msg="Node wise recovery failed")
 
@@ -474,13 +549,13 @@ def no_pending_force_reconfigurations():
            try:
                transfers.pause()
                wait_until(controller_available,
-                           timeout_sec=self.WAIT_TIMEOUT_S,
+                           timeout_sec=self.default_timeout_sec,
                            backoff_sec=3,
                            err_msg="Controller not available")
                # Wait for balancer tick to run so the data is populated.
                wait_until(lambda: get_partition_balancer_status(
                    lambda s: s["status"] != "starting"),
-                           timeout_sec=self.WAIT_TIMEOUT_S,
+                           timeout_sec=self.default_timeout_sec,
                            backoff_sec=3,
                            err_msg="Balancer tick did not run in time")
                return get_partition_balancer_status(lambda s: s[
@@ -492,7 +567,7 @@ def no_pending_force_reconfigurations():
            transfers.resume()
 
         wait_until(no_pending_force_reconfigurations,
-                   timeout_sec=self.WAIT_TIMEOUT_S,
+                   timeout_sec=self.default_timeout_sec,
                    backoff_sec=3,
                    err_msg="reported force recovery count is non zero")
 
@@ -502,6 +577,19 @@ def no_pending_force_reconfigurations():
             self.redpanda._admin.await_stable_leader(
                 topic=topic.name,
                 partition=part,
-                timeout_s=self.WAIT_TIMEOUT_S,
+                timeout_s=self.default_timeout_sec,
                 backoff_s=2,
                 hosts=self._alive_nodes())
+        topic_hws_after_recovery = {
+            t.name: self.get_topic_partition_high_watermarks(t.name)
+            for t in topics
+        }
+
+        for t in topics:
+            for partition_id, initial_hw in initial_topic_hws[t.name].items():
+                final_hw = topic_hws_after_recovery[t.name][partition_id]
+                self.logger.info(
+                    f"partition {t}/{partition_id} replicas initial high watermark: {initial_hw} final high watermark: {final_hw}"
+                )
+                if t.redpanda_remote_write or t.replication_factor == 3:
+                    assert 0.8 * initial_hw <= final_hw <= initial_hw
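
The controller_backend change above boils down to one decision at partition-creation time: when the partition is recreated as part of a force reconfiguration, and the topic is cloud-enabled (archival or remote fetch) and not internal, enable the topic-recovery override and seed remote topic properties so the new replica rebuilds its data from tiered storage. The following Python sketch restates that predicate outside the codebase; the dataclass fields and helper name are illustrative stand-ins mirroring the diff, not the actual redpanda API.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class NtpConfig:
    # Illustrative stand-in for the storage::ntp_config fields used in the diff.
    archival_enabled: bool
    remote_fetch_enabled: bool
    recovery_enabled: bool
    is_internal: bool


def maybe_enable_cloud_recovery(cfg: NtpConfig, is_force_reconfigured: bool,
                                initial_rev: int,
                                partition_count: int) -> Optional[Tuple[int, int]]:
    """Return (initial revision, partition count) to use as remote topic
    properties when the partition should be recovered from tiered storage,
    otherwise None."""
    is_cloud_topic = cfg.archival_enabled or cfg.remote_fetch_enabled
    if (is_force_reconfigured and is_cloud_topic and not cfg.is_internal
            and not cfg.recovery_enabled):
        # Mirrors setting storage::topic_recovery_enabled::yes in the diff.
        cfg.recovery_enabled = True
        return (initial_rev, partition_count)
    return None
```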
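On the test-infrastructure side, the new tolerate_stopped_nodes flag only changes which nodes must acknowledge the new cluster-config version: when it is set, nodes that are not currently started are excluded from the readiness check. A minimal, self-contained sketch of that filtering, assuming a status list of dicts shaped like the admin API response used in the diff:

```python
def config_converged(status, config_version, started_node_ids=None):
    """True when every considered node reports at least the target version.

    status: list of {'node_id': ..., 'config_version': ...} dicts.
    started_node_ids: when given, nodes outside this set are ignored,
    matching the tolerate_stopped_nodes=True behaviour in the diff.
    """
    return all(n['config_version'] >= config_version
               for n in status
               if started_node_ids is None or n['node_id'] in started_node_ids)


# Node 3 is stopped and lags behind; convergence is still reported when it
# is tolerated, and not reported otherwise.
status = [{'node_id': 1, 'config_version': 7},
          {'node_id': 2, 'config_version': 7},
          {'node_id': 3, 'config_version': 5}]
assert config_converged(status, 7, started_node_ids={1, 2})
assert not config_converged(status, 7)
```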
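produce_until_segments_removed() leans on the 2 MiB local retention target and 1 MiB segments configured for each topic: sustained produce traffic forces local prefix truncation while the data remains readable from tiered storage, which is what makes the later recovery meaningful. A hedged sketch of that polling pattern, where collect_states and the 1000-offset threshold are placeholders for the helpers used in the test:

```python
import time


def wait_until_offloaded(collect_states, threshold=1000, timeout_sec=60,
                         backoff_sec=1):
    """Poll partition state until every replica of every partition reports
    that at least `threshold` offsets were uploaded to cloud storage.

    collect_states() is expected to return {partition_id: state}, where each
    state has a 'replicas' list whose entries carry 'next_cloud_offset',
    like the admin partition-state output used by the test.
    """
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        states = collect_states()
        if states and all(r['next_cloud_offset'] >= threshold
                          for s in states.values() for r in s['replicas']):
            return
        time.sleep(backoff_sec)
    raise TimeoutError("cloud offsets did not advance past the threshold")
```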
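Finally, the test compares per-partition high watermarks before and after node-wise recovery. Exact equality is not required: a partition whose replicas were lost can only be rebuilt from data that had already been replicated or uploaded to tiered storage, so the diff accepts a final high watermark between 80% and 100% of the initial one for topics with remote write or rf=3. A small sketch of that check:

```python
def check_recovered_watermarks(initial_hws, final_hws, tolerance=0.8):
    """initial_hws/final_hws map partition_id -> high watermark."""
    for partition_id, initial_hw in initial_hws.items():
        final_hw = final_hws[partition_id]
        # Recovered data may trail the pre-failure watermark but never exceed it.
        assert tolerance * initial_hw <= final_hw <= initial_hw, (
            f"partition {partition_id}: {final_hw} not within "
            f"[{tolerance * initial_hw}, {initial_hw}]")
```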