From 481b47b62e2b83ef2a24e44291e84be57ce0311c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?=
Date: Thu, 2 Jan 2025 14:36:25 +0100
Subject: [PATCH 1/2] c/backend: do not download the partition manifest when
 creating learner
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When a learner replica is created, its state should not be seeded by
the recovery machinery; instead, it must be recovered through Raft.
Using remote recovery when creating learner replicas may lead to
archival_stm issues, as some commands are applied more than once.

Signed-off-by: Michał Maślanka
---
 src/v/cluster/controller_backend.cc | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc
index 1a6a252e9d7f..431d6aaceb69 100644
--- a/src/v/cluster/controller_backend.cc
+++ b/src/v/cluster/controller_backend.cc
@@ -1364,6 +1364,20 @@ ss::future controller_backend::create_partition(
           = storage::topic_recovery_enabled::yes;
         rtp.emplace(remote_rev, cfg.partition_count);
     }
+    /**
+     * Reset remote topic properties if a topic is recovered from tiered
+     * storage and the current node is joining the replica set. A node is
+     * joining the replica set if its initial node set is empty.
+     */
+    if (initial_nodes.empty() && rtp.has_value()) {
+        // reset remote topic properties
+        vlog(
+          clusterlog.info,
+          "[{}] Disabling remote recovery while creating partition "
+          "replica. Current node is added to the replica set as a learner.",
+          ntp);
+        rtp.reset();
+    }
     // we use offset as a rev as it is always increasing and it
     // increases while the ntp is being created again
     try {
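A note on the change above: the rule it encodes can be modeled compactly
outside of the C++ code. The following is a minimal, hypothetical Python
sketch (RemoteTopicProperties and decide_remote_recovery are illustrative
stand-ins for the rtp the patch manipulates, not Redpanda APIs): remote
topic properties, and with them the manifest download, are kept only when
the replica belongs to the partition's initial node set; an empty initial
node set marks a node joining as a learner, whose state Raft must
replicate.

from dataclasses import dataclass
from typing import Optional

@dataclass
class RemoteTopicProperties:
    # hypothetical stand-in for the properties set via
    # rtp.emplace(remote_rev, cfg.partition_count) in the patch
    remote_revision: int
    remote_partition_count: int

def decide_remote_recovery(
        initial_nodes: list[int],
        rtp: Optional[RemoteTopicProperties],
) -> Optional[RemoteTopicProperties]:
    # An empty initial node set means the current node is joining an
    # existing replica set as a learner: its state must be replicated by
    # Raft, so remote recovery is disabled by dropping the properties.
    if not initial_nodes and rtp is not None:
        return None
    # Otherwise the replica is part of the topic's original assignment
    # and may seed its state from the manifest in tiered storage.
    return rtp

# Replica created as part of topic recovery: properties are kept.
assert decide_remote_recovery([1, 2, 3], RemoteTopicProperties(42, 10))
# Learner created by a later reconfiguration: properties are dropped.
assert decide_remote_recovery([], RemoteTopicProperties(42, 10)) is None

The guard sits at the top of create_partition, before the recovery
configuration is handed to the partition, so a learner never enters the
remote recovery path at all.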
From fff2b724a1b10d8fe88a2b4a2319ec73043e7780 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?=
Date: Thu, 2 Jan 2025 14:37:26 +0100
Subject: [PATCH 2/2] tests: added scaling up test with recovered topic
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Added a test triggering partition replica set reconfiguration with
topics that were recovered from tiered storage.

Signed-off-by: Michał Maślanka
---
 tests/rptest/tests/scaling_up_test.py | 63 ++++++++++++++++++++++++++-
 1 file changed, 62 insertions(+), 1 deletion(-)

diff --git a/tests/rptest/tests/scaling_up_test.py b/tests/rptest/tests/scaling_up_test.py
index 09cff632201c..e9a985d24e6b 100644
--- a/tests/rptest/tests/scaling_up_test.py
+++ b/tests/rptest/tests/scaling_up_test.py
@@ -9,6 +9,7 @@
 
 from collections import defaultdict
 import random, math, time
+from rptest.clients.rpk import RpkTool
 from rptest.services.admin import Admin
 from rptest.services.cluster import cluster
 from ducktape.utils.util import wait_until
@@ -18,9 +19,10 @@
 from rptest.clients.default import DefaultClient
 from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierSeqConsumer, KgoVerifierProducer
 from rptest.services.redpanda import SISettings
-import concurrent
+from rptest.utils.node_operations import verify_offset_translator_state_consistent
 
 from rptest.tests.prealloc_nodes import PreallocNodesTest
+from rptest.util import KafkaCliTools
 from rptest.utils.mode_checks import skip_debug_mode
@@ -627,3 +629,62 @@ def disk_usage_correct(nodes, node_id):
         assert self.consumer.consumer_status.validator.invalid_reads == 0, \
             f"Invalid reads in topic: {topic.name}, invalid reads count: " \
             "{self.consumer.consumer_status.validator.invalid_reads}"
+
+    @skip_debug_mode
+    @cluster(num_nodes=6)
+    def test_scaling_up_with_recovered_topic(self):
+        log_segment_size = 2 * 1024 * 1024
+        segments_per_partition = 40
+        msg_size = 256 * 1024
+        partition_count = 10
+        total_records = int(
+            (segments_per_partition * partition_count * log_segment_size) /
+            msg_size)
+
+        si_settings = SISettings(test_context=self.test_context,
+                                 log_segment_size=log_segment_size,
+                                 retention_local_strict=True,
+                                 fast_uploads=True)
+
+        self.redpanda.set_si_settings(si_settings)
+        # start a 3 node cluster
+        self.redpanda.start(nodes=self.redpanda.nodes[0:3])
+        # create the test topic
+        cli = KafkaCliTools(self.redpanda)
+        topic = TopicSpec(name="recovery-topic",
+                          replication_factor=3,
+                          partition_count=partition_count)
+        rpk = RpkTool(self.redpanda)
+        rpk.create_topic(topic.name,
+                         partition_count,
+                         3,
+                         config={
+                             'redpanda.remote.write': 'true',
+                             'redpanda.remote.read': 'true',
+                             'redpanda.remote.delete': 'false'
+                         })
+        # produce some data
+        cli.produce(topic.name, total_records, msg_size)
+        self.redpanda.wait_for_manifest_uploads()
+
+        total_replicas = 3 * partition_count
+        rpk.delete_topic(topic.name)
+
+        rpk.create_topic(topic.name,
+                         partition_count,
+                         3,
+                         config={
+                             'redpanda.remote.recovery': 'true',
+                             'redpanda.remote.write': 'true',
+                             'redpanda.remote.read': 'true',
+                             'redpanda.remote.delete': 'true'
+                         })
+
+        cli.produce(topic.name, total_records, msg_size)
+
+        self.redpanda.start_node(self.redpanda.nodes[3])
+        self.redpanda.start_node(self.redpanda.nodes[4])
+        self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
+                                            timeout_sec=self.rebalance_timeout)
+        self.redpanda.wait_for_manifest_uploads()
+        verify_offset_translator_state_consistent(self.redpanda)
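For readers sizing similar tests: the constants in
test_scaling_up_with_recovered_topic are picked so that each partition
rolls a few dozen 2 MiB segments into tiered storage before the topic is
deleted and recovered. The snippet below only reproduces that arithmetic;
it is not part of the patch.

# Workload sizing used by test_scaling_up_with_recovered_topic.
log_segment_size = 2 * 1024 * 1024        # 2 MiB per log segment
segments_per_partition = 40
msg_size = 256 * 1024                     # 256 KiB per record
partition_count = 10

total_bytes = segments_per_partition * partition_count * log_segment_size
total_records = total_bytes // msg_size

# 8 records fill one segment; 320 records fill one partition's 40 segments.
assert total_records == 3200
print(f"{total_records} records x {msg_size} B = {total_bytes // 2**20} MiB")

With retention_local_strict=True and fast_uploads=True in SISettings, the
intent appears to be that most of those ~800 MiB live only in the object
store, so both the topic recovery and the post-scale-up rebalance have to
exercise tiered storage rather than purely local data.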