diff --git a/tests/rptest/tests/partition_force_reconfiguration_test.py b/tests/rptest/tests/partition_force_reconfiguration_test.py index b4ed26680dc18..5ec2cc37b3f7c 100644 --- a/tests/rptest/tests/partition_force_reconfiguration_test.py +++ b/tests/rptest/tests/partition_force_reconfiguration_test.py @@ -10,6 +10,7 @@ import requests from rptest.services.cluster import cluster from rptest.clients.types import TopicSpec +from rptest.services.kgo_verifier_services import KgoVerifierProducer from rptest.tests.end_to_end import EndToEndTest from rptest.clients.rpk import RpkTool from ducktape.mark import ignore, matrix @@ -17,10 +18,11 @@ from random import shuffle import time from rptest.tests.partition_movement import PartitionMovementMixin -from rptest.services.admin import Replica +from rptest.services.admin import Admin, Replica from rptest.clients.kcl import KCL from threading import Thread, Condition -from rptest.services.redpanda import RedpandaService +from rptest.services.redpanda import RedpandaService, SISettings +from rptest.tests.redpanda_test import RedpandaTest from rptest.util import wait_until_result @@ -368,25 +370,90 @@ def get_lso(): self.start_consumer() self.run_validation() - @cluster(num_nodes=5) + +class NodeWiseRecoveryTest(RedpandaTest): + def __init__(self, test_context, *args, **kwargs): + super(NodeWiseRecoveryTest, + self).__init__(test_context, + si_settings=SISettings( + log_segment_size=1024 * 1024, + test_context=test_context, + fast_uploads=True, + retention_local_strict=True, + ), + extra_rp_conf={ + "partition_autobalancing_mode": "continuous", + "enable_leader_balancer": False, + }, + num_brokers=5, + *args, + **kwargs) + self.default_timeout_sec = 60 + self.rpk = RpkTool(self.redpanda) + self.admin = Admin(self.redpanda) + + def _alive_nodes(self): + return [n.account.hostname for n in self.redpanda.started_nodes()] + + def collect_topic_partition_states(self, topic): + states = {} + for p in self.rpk.describe_topic(topic): + states[p.id] = self.admin.get_partition_state( + namespace="kafka", + topic=topic, + partition=p.id, + node=self.redpanda.get_node_by_id(p.leader)) + return states + + def get_topic_partition_high_watermarks(self, topic): + return {p.id: p.high_watermark for p in self.rpk.describe_topic(topic)} + + def produce_until_segments_removed(self, topic): + msg_size = 512 + + self.producer = KgoVerifierProducer(self.test_context, self.redpanda, + topic, msg_size, 10000000) + + self.producer.start(clean=False) + + def all_cloud_offsets_advanced(): + states = self.collect_topic_partition_states(topic) + + return all(r['next_cloud_offset'] >= 1000 for s in states.values() + for r in s['replicas']) + + wait_until( + all_cloud_offsets_advanced, + timeout_sec=self.default_timeout_sec, + backoff_sec=1, + err_msg="Error waiting for retention to prefix truncate partitions" + ) + + self.producer.stop() + self.producer.clean() + self.producer.free() + + @cluster(num_nodes=6) @matrix(dead_node_count=[1, 2]) def test_node_wise_recovery(self, dead_node_count): - self.start_redpanda(num_nodes=5, - extra_rp_conf={ - "partition_autobalancing_mode": "continuous", - "enable_leader_balancer": False, - }) + num_topics = 20 # Create a mix of rf=1 and 3 topics. topics = [] for i in range(0, num_topics): rf = 3 if i % 2 == 0 else 1 parts = random.randint(1, 3) + with_ts = random.choice([True, False]) spec = TopicSpec(name=f"topic-{i}", replication_factor=rf, - partition_count=parts) + partition_count=parts, + redpanda_remote_read=with_ts, + redpanda_remote_write=with_ts) topics.append(spec) self.client().create_topic(spec) + self.client().alter_topic_config( + spec.name, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES, + 2 * 1024 * 1024) admin = self.redpanda._admin @@ -395,9 +462,17 @@ def test_node_wise_recovery(self, dead_node_count): to_kill_node_ids = [ int(self.redpanda.node_id(n)) for n in to_kill_nodes ] + for t in topics: + self.produce_until_segments_removed(t.name) + self.redpanda.wait_for_manifest_uploads() partitions_lost_majority = admin.get_majority_lost_partitions_from_nodes( dead_brokers=to_kill_node_ids) + # collect topic partition high watermarks before recovery + initial_topic_hws = { + t.name: self.get_topic_partition_high_watermarks(t.name) + for t in topics + } self.logger.debug(f"Stopping nodes: {to_kill_node_ids}") self.redpanda.for_nodes(to_kill_nodes, self.redpanda.stop_node) @@ -408,7 +483,7 @@ def controller_available(): controller) not in to_kill_node_ids wait_until(controller_available, - timeout_sec=self.WAIT_TIMEOUT_S, + timeout_sec=self.default_timeout_sec, backoff_sec=3, err_msg="Controller not available") @@ -440,7 +515,7 @@ def no_majority_lost_partitions(): try: transfers.pause() wait_until(controller_available, - timeout_sec=self.WAIT_TIMEOUT_S, + timeout_sec=self.default_timeout_sec, backoff_sec=3, err_msg="Controller not available") lost_majority = admin.get_majority_lost_partitions_from_nodes( @@ -458,7 +533,7 @@ def no_majority_lost_partitions(): # Wait until there are no partition assignments with majority loss due to dead nodes. wait_until(no_majority_lost_partitions, - timeout_sec=self.WAIT_TIMEOUT_S, + timeout_sec=self.default_timeout_sec, backoff_sec=3, err_msg="Node wise recovery failed") @@ -474,13 +549,13 @@ def no_pending_force_reconfigurations(): try: transfers.pause() wait_until(controller_available, - timeout_sec=self.WAIT_TIMEOUT_S, + timeout_sec=self.default_timeout_sec, backoff_sec=3, err_msg="Controller not available") # Wait for balancer tick to run so the data is populated. wait_until(lambda: get_partition_balancer_status( lambda s: s["status"] != "starting"), - timeout_sec=self.WAIT_TIMEOUT_S, + timeout_sec=self.default_timeout_sec, backoff_sec=3, err_msg="Balancer tick did not run in time") return get_partition_balancer_status(lambda s: s[ @@ -492,7 +567,7 @@ def no_pending_force_reconfigurations(): transfers.resume() wait_until(no_pending_force_reconfigurations, - timeout_sec=self.WAIT_TIMEOUT_S, + timeout_sec=self.default_timeout_sec, backoff_sec=3, err_msg="reported force recovery count is non zero") @@ -502,6 +577,19 @@ def no_pending_force_reconfigurations(): self.redpanda._admin.await_stable_leader( topic=topic.name, partition=part, - timeout_s=self.WAIT_TIMEOUT_S, + timeout_s=self.default_timeout_sec, backoff_s=2, hosts=self._alive_nodes()) + topic_hws_after_recovery = { + t.name: self.get_topic_partition_high_watermarks(t.name) + for t in topics + } + + for t in topics: + for partition_id, initial_hw in initial_topic_hws[t.name].items(): + final_hw = topic_hws_after_recovery[t.name][partition_id] + self.logger.info( + f"partition {t}/{partition_id} replicas initial high watermark: {initial_hw} final high watermark: {final_hw}" + ) + if t.redpanda_remote_write or t.replication_factor == 3: + assert final_hw >= 0.8 * initial_hw and final_hw <= initial_hw