diff --git a/tests/rptest/tests/partition_force_reconfiguration_test.py b/tests/rptest/tests/partition_force_reconfiguration_test.py index b4ed26680dc18..651b59e5b5a22 100644 --- a/tests/rptest/tests/partition_force_reconfiguration_test.py +++ b/tests/rptest/tests/partition_force_reconfiguration_test.py @@ -10,6 +10,7 @@ import requests from rptest.services.cluster import cluster from rptest.clients.types import TopicSpec +from rptest.services.kgo_verifier_services import KgoVerifierProducer from rptest.tests.end_to_end import EndToEndTest from rptest.clients.rpk import RpkTool from ducktape.mark import ignore, matrix @@ -17,10 +18,11 @@ from random import shuffle import time from rptest.tests.partition_movement import PartitionMovementMixin -from rptest.services.admin import Replica +from rptest.services.admin import Admin, Replica from rptest.clients.kcl import KCL from threading import Thread, Condition -from rptest.services.redpanda import RedpandaService +from rptest.services.redpanda import RedpandaService, SISettings +from rptest.tests.redpanda_test import RedpandaTest from rptest.util import wait_until_result @@ -368,25 +370,95 @@ def get_lso(): self.start_consumer() self.run_validation() - @cluster(num_nodes=5) + +class NodeWiseRecoveryTest(RedpandaTest): + def __init__(self, test_context, *args, **kwargs): + super(NodeWiseRecoveryTest, + self).__init__(test_context, + si_settings=SISettings( + log_segment_size=1024 * 1024, + test_context=test_context, + fast_uploads=True, + retention_local_strict=True, + ), + extra_rp_conf={ + "partition_autobalancing_mode": "continuous", + "enable_leader_balancer": False, + }, + num_brokers=5, + *args, + **kwargs) + self.default_timeout_sec = 60 + self.rpk = RpkTool(self.redpanda) + self.admin = Admin(self.redpanda) + + def _alive_nodes(self): + return [n.account.hostname for n in self.redpanda.started_nodes()] + + def collect_topic_partition_states(self, topic): + states = [] + for p in self.rpk.describe_topic(topic): + states.append( + self.admin.get_partition_state( + namespace="kafka", + topic=topic, + partition=p.id, + node=self.redpanda.get_node_by_id(p.leader))) + return states + + def get_topic_partition_high_watermarks(self, topic): + statuses = self.collect_topic_partition_states(topic) + return [[r['high_watermark'] for r in s['replicas']] for s in statuses] + + def producer_until_segments_removed(self, topic): + msg_size = 512 + + self.producer = KgoVerifierProducer(self.test_context, self.redpanda, + topic, msg_size, 10000000) + + self.producer.start(clean=False) + + def all_cloud_offsets_advanced(): + states = self.collect_topic_partition_states(topic) + for s in states: + for r in s['replicas']: + if r['next_cloud_offset'] < 1000: + return False + + return True + + wait_until( + all_cloud_offsets_advanced, + timeout_sec=self.default_timeout_sec, + backoff_sec=1, + err_msg="Error waiting for retention to prefix truncate partitions" + ) + + self.producer.stop() + self.producer.clean() + self.producer.free() + + @cluster(num_nodes=6) @matrix(dead_node_count=[1, 2]) def test_node_wise_recovery(self, dead_node_count): - self.start_redpanda(num_nodes=5, - extra_rp_conf={ - "partition_autobalancing_mode": "continuous", - "enable_leader_balancer": False, - }) + num_topics = 20 # Create a mix of rf=1 and 3 topics. topics = [] for i in range(0, num_topics): rf = 3 if i % 2 == 0 else 1 parts = random.randint(1, 3) + with_ts = random.randint(0, 100) > 50 spec = TopicSpec(name=f"topic-{i}", replication_factor=rf, - partition_count=parts) + partition_count=parts, + redpanda_remote_read=with_ts, + redpanda_remote_write=with_ts) topics.append(spec) self.client().create_topic(spec) + self.client().alter_topic_config( + spec.name, TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_BYTES, + 2 * 1024 * 1024) admin = self.redpanda._admin @@ -395,9 +467,17 @@ def test_node_wise_recovery(self, dead_node_count): to_kill_node_ids = [ int(self.redpanda.node_id(n)) for n in to_kill_nodes ] + for t in topics: + self.producer_until_segments_removed(t.name) + self.redpanda.wait_for_manifest_uploads() partitions_lost_majority = admin.get_majority_lost_partitions_from_nodes( dead_brokers=to_kill_node_ids) + # collect topic partition high watermarks before recovery + initial_topic_hws = {} + for t in topics: + initial_topic_hws[ + t.name] = self.get_topic_partition_high_watermarks(t.name) self.logger.debug(f"Stopping nodes: {to_kill_node_ids}") self.redpanda.for_nodes(to_kill_nodes, self.redpanda.stop_node) @@ -408,7 +488,7 @@ def controller_available(): controller) not in to_kill_node_ids wait_until(controller_available, - timeout_sec=self.WAIT_TIMEOUT_S, + timeout_sec=self.default_timeout_sec, backoff_sec=3, err_msg="Controller not available") @@ -440,7 +520,7 @@ def no_majority_lost_partitions(): try: transfers.pause() wait_until(controller_available, - timeout_sec=self.WAIT_TIMEOUT_S, + timeout_sec=self.default_timeout_sec, backoff_sec=3, err_msg="Controller not available") lost_majority = admin.get_majority_lost_partitions_from_nodes( @@ -458,7 +538,7 @@ def no_majority_lost_partitions(): # Wait until there are no partition assignments with majority loss due to dead nodes. wait_until(no_majority_lost_partitions, - timeout_sec=self.WAIT_TIMEOUT_S, + timeout_sec=self.default_timeout_sec, backoff_sec=3, err_msg="Node wise recovery failed") @@ -474,13 +554,13 @@ def no_pending_force_reconfigurations(): try: transfers.pause() wait_until(controller_available, - timeout_sec=self.WAIT_TIMEOUT_S, + timeout_sec=self.default_timeout_sec, backoff_sec=3, err_msg="Controller not available") # Wait for balancer tick to run so the data is populated. wait_until(lambda: get_partition_balancer_status( lambda s: s["status"] != "starting"), - timeout_sec=self.WAIT_TIMEOUT_S, + timeout_sec=self.default_timeout_sec, backoff_sec=3, err_msg="Balancer tick did not run in time") return get_partition_balancer_status(lambda s: s[ @@ -492,7 +572,7 @@ def no_pending_force_reconfigurations(): transfers.resume() wait_until(no_pending_force_reconfigurations, - timeout_sec=self.WAIT_TIMEOUT_S, + timeout_sec=self.default_timeout_sec, backoff_sec=3, err_msg="reported force recovery count is non zero") @@ -502,6 +582,21 @@ def no_pending_force_reconfigurations(): self.redpanda._admin.await_stable_leader( topic=topic.name, partition=part, - timeout_s=self.WAIT_TIMEOUT_S, + timeout_s=self.default_timeout_sec, backoff_s=2, hosts=self._alive_nodes()) + topic_hws_after_recovery = {} + for t in topics: + topic_hws_after_recovery[ + t.name] = self.get_topic_partition_high_watermarks(t.name) + + for t in topics: + for id, (initial_hw, final_hw) in enumerate( + zip(initial_topic_hws[t.name], + topic_hws_after_recovery[t.name])): + self.logger.info( + f"partition {t}/{id} replicas initial high watermark: {initial_hw} final high watermark: {final_hw}" + ) + if t.redpanda_remote_write or t.replication_factor == 3: + assert all( + [f >= 0.8 * i for i, f in zip(initial_hw, final_hw)])