diff --git a/src/v/cluster/controller_backend.cc b/src/v/cluster/controller_backend.cc
index 1a6a252e9d7f..431d6aaceb69 100644
--- a/src/v/cluster/controller_backend.cc
+++ b/src/v/cluster/controller_backend.cc
@@ -1364,6 +1364,20 @@ ss::future<std::error_code> controller_backend::create_partition(
           = storage::topic_recovery_enabled::yes;
         rtp.emplace(remote_rev, cfg.partition_count);
     }
+    /**
+     * Reset remote topic properties if a topic is recovered from tiered
+     * storage and the current node is joining the replica set. A node is
+     * joining the replica set if its initial node set is empty.
+     */
+    if (initial_nodes.empty() && rtp.has_value()) {
+        // reset remote topic properties
+        vlog(
+          clusterlog.info,
+          "[{}] Disabling remote recovery while creating partition "
+          "replica. The current node is added to the replica set as a learner.",
+          ntp);
+        rtp.reset();
+    }
     // we use offset as an rev as it is always increasing and it
     // increases while ntp is being created again
     try {
diff --git a/tests/rptest/tests/scaling_up_test.py b/tests/rptest/tests/scaling_up_test.py
index 09cff632201c..e9a985d24e6b 100644
--- a/tests/rptest/tests/scaling_up_test.py
+++ b/tests/rptest/tests/scaling_up_test.py
@@ -9,6 +9,7 @@
 from collections import defaultdict
 import random, math, time
 
+from rptest.clients.rpk import RpkTool
 from rptest.services.admin import Admin
 from rptest.services.cluster import cluster
 from ducktape.utils.util import wait_until
@@ -18,9 +19,10 @@
 from rptest.clients.default import DefaultClient
 from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierSeqConsumer, KgoVerifierProducer
 from rptest.services.redpanda import SISettings
-import concurrent
+from rptest.utils.node_operations import verify_offset_translator_state_consistent
 
 from rptest.tests.prealloc_nodes import PreallocNodesTest
+from rptest.clients.kafka_cli_tools import KafkaCliTools
 
 from rptest.utils.mode_checks import skip_debug_mode
 
@@ -627,3 +629,62 @@ def disk_usage_correct(nodes, node_id):
 
         assert self.consumer.consumer_status.validator.invalid_reads == 0, \
             f"Invalid reads in topic: {topic.name}, invalid reads count: " "{self.consumer.consumer_status.validator.invalid_reads}"
+
+    @skip_debug_mode
+    @cluster(num_nodes=6)
+    def test_scaling_up_with_recovered_topic(self):
+        log_segment_size = 2 * 1024 * 1024
+        segments_per_partition = 40
+        msg_size = 256 * 1024
+        partition_count = 10
+        total_records = int(
+            (segments_per_partition * partition_count * log_segment_size) /
+            msg_size)
+
+        si_settings = SISettings(test_context=self.test_context,
+                                 log_segment_size=log_segment_size,
+                                 retention_local_strict=True,
+                                 fast_uploads=True)
+
+        self.redpanda.set_si_settings(si_settings)
+        # start a 3-node cluster
+        self.redpanda.start(nodes=self.redpanda.nodes[0:3])
+        # create the test topic
+        cli = KafkaCliTools(self.redpanda)
+        topic = TopicSpec(name="recovery-topic",
+                          replication_factor=3,
+                          partition_count=partition_count)
+        rpk = RpkTool(self.redpanda)
+        rpk.create_topic(topic.name,
+                         partition_count,
+                         3,
+                         config={
+                             'redpanda.remote.write': 'true',
+                             'redpanda.remote.read': 'true',
+                             'redpanda.remote.delete': 'false'
+                         })
+        # produce some data
+        cli.produce(topic.name, total_records, msg_size)
+        self.redpanda.wait_for_manifest_uploads()
+
+        total_replicas = 3 * partition_count
+        rpk.delete_topic(topic.name)
+
+        rpk.create_topic(topic.name,
+                         partition_count,
+                         3,
+                         config={
+                             'redpanda.remote.recovery': 'true',
+                             'redpanda.remote.write': 'true',
+                             'redpanda.remote.read': 'true',
+                             'redpanda.remote.delete': 'true'
+                         })
+
+        cli.produce(topic.name, total_records, msg_size)
+
+        self.redpanda.start_node(self.redpanda.nodes[3])
+        self.redpanda.start_node(self.redpanda.nodes[4])
+        self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
+                                            timeout_sec=self.rebalance_timeout)
+        self.redpanda.wait_for_manifest_uploads()
+        verify_offset_translator_state_consistent(self.redpanda)
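
For readers skimming the C++ hunk: the guard added to controller_backend::create_partition reduces to the predicate below. This is an illustrative Python restatement of the patch's logic, not code from the tree; the function name is invented for clarity.

    # Hypothetical restatement of the new guard in create_partition(): a
    # replica created with an empty initial-node set is joining an existing
    # replica set as a learner, so a pending tiered-storage recovery request
    # must be dropped for that replica instead of being re-run.
    def should_reset_remote_topic_properties(initial_nodes,
                                             remote_topic_properties):
        return len(initial_nodes) == 0 and remote_topic_properties is not None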
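A note on the sizing in test_scaling_up_with_recovered_topic: the constants are chosen so that each partition rolls roughly forty 2 MiB segments before the topic is deleted, guaranteeing there is real data in tiered storage for the recovery step to pull back. A quick recomputation, mirroring the test's own arithmetic:

    log_segment_size = 2 * 1024 * 1024         # 2 MiB per segment
    segments_per_partition = 40
    msg_size = 256 * 1024                      # 256 KiB per record
    partition_count = 10

    total_records = (segments_per_partition * partition_count
                     * log_segment_size) // msg_size
    assert total_records == 3200               # 320 records (~80 MiB) per partition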