diff --git a/tests/rptest/tests/write_caching_test.py b/tests/rptest/tests/write_caching_test.py
index fe881806a8bf..9c67bbda5537 100644
--- a/tests/rptest/tests/write_caching_test.py
+++ b/tests/rptest/tests/write_caching_test.py
@@ -17,10 +17,12 @@
 import random
 from enum import Enum
 
+from ducktape.mark import ignore
 from ducktape.utils.util import wait_until
 
 from rptest.clients.rpk import RpkException, RpkTool
 from rptest.clients.types import TopicSpec
+from rptest.services.admin import Admin
 from rptest.services.cluster import cluster
 from rptest.services.failure_injector import FailureSpec, make_failure_injector
 from rptest.services.kgo_verifier_services import (
@@ -28,7 +30,6 @@
     KgoVerifierSeqConsumer,
 )
 from rptest.tests.redpanda_test import RedpandaTest
-from rptest.services.admin import Admin
 
 
 # no StrEnum support in test python version
@@ -193,6 +194,103 @@ def __init__(self, test_context):
         self.topic_name = "test"
         self.topics = [TopicSpec(name=self.topic_name)]
 
+    # This test shouldn't observe data loss because we are replicating to
+    # majority of replicas and the leader returns after the election.
+    @cluster(num_nodes=5)
+    def test_crash_leader(self):
+        consumer = KgoVerifierSeqConsumer(
+            self.test_context,
+            self.redpanda,
+            self.topic_name,
+            loop=False,
+            continuous=True,
+            tolerate_data_loss=False,
+        )
+        consumer.start()
+
+        num_restarts = 5
+        num_acked_per_iter = 1000
+        num_consumed_per_iter = 1000
+
+        msg_size = 512
+        producer = KgoVerifierProducer(
+            self.test_context,
+            self.redpanda,
+            self.topic_name,
+            msg_size,
+            1_000_000_000,
+            batch_max_bytes=2 * msg_size,
+            max_buffered_records=1,
+            tolerate_data_loss=False,
+        )
+        producer.start()
+
+        acks_count = 0
+        consume_count = 0
+
+        # Introduce crashes and verify that the topic is still functioning
+        # correctly (no data loss is expected here):
+        # a) data is still being produced,
+        # b) data is still being consumed.
+        for i in range(0, num_restarts):
+            acks_count = producer._status.acked
+
+            # Validate producer health.
+            self.logger.debug(
+                f"Waiting producer acked to reach {acks_count + num_acked_per_iter} "
+                f"messages in iteration {i}/{num_restarts}")
+            producer.wait_for_acks(acks_count + num_acked_per_iter,
+                                   timeout_sec=60,
+                                   backoff_sec=1)
+            acks_count = producer._status.acked
+
+            # Validate consumer health.
+            self.logger.debug(
+                f"Waiting for consumer to reach {consume_count + num_consumed_per_iter} messages"
+            )
+            wait_until(lambda: consumer._status.validator.total_reads >=
+                       consume_count + num_consumed_per_iter,
+                       timeout_sec=60,
+                       backoff_sec=1)
+            consume_count = consumer._status.validator.total_reads
+
+            # Introduce failure.
+            admin = Admin(self.redpanda)
+            leader_id = admin.get_partition_leader(namespace="kafka",
+                                                   topic=self.topic_name,
+                                                   partition=0)
+            node = self.redpanda.get_node(leader_id)
+
+            self.logger.debug(f"Restarting leader node: {node.name}")
+            self.redpanda.signal_redpanda(node)
+
+            def wait_new_leader():
+                new_leader_id = admin.get_partition_leader(
+                    namespace="kafka", topic=self.topic_name, partition=0)
+                return new_leader_id != -1 and new_leader_id != leader_id
+
+            self.logger.debug("Waiting for new leader")
+            wait_until(wait_new_leader,
+                       timeout_sec=30,
+                       backoff_sec=1,
+                       err_msg="New leader not elected")
+
+            self.logger.debug(f"Starting back old leader node: {node.name}")
+            self.redpanda.start_node(node)
+
+        producer.stop()
+
+        # Stop consumer.
+        consumer.wait()
+        consumer.stop()
+
+        self.logger.info(
+            f"Lost offsets: {consumer._status.validator.lost_offsets}")
+
+        # This test doesn't tolerate data loss.
+        assert consumer._status.validator.lost_offsets is None
+
+    @ignore
     @cluster(num_nodes=5)
     def test_crash_all_or_leader(self):
         consumer = KgoVerifierSeqConsumer(
@@ -356,7 +454,7 @@ def test_unavoidable_data_loss(self):
                 f"Waiting producer acked to reach {acks_count + num_acked_per_iter} "
                 f"messages in iteration {i}/{num_restarts}")
             producer.wait_for_acks(acks_count + num_acked_per_iter,
-                                   timeout_sec=60,
+                                   timeout_sec=120,
                                    backoff_sec=1)
             acks_count = producer._status.acked
 
@@ -366,7 +464,7 @@
             )
             wait_until(lambda: consumer._status.validator.total_reads >=
                        consume_count + num_consumed_per_iter,
-                       timeout_sec=60,
+                       timeout_sec=120,
                        backoff_sec=1)
             consume_count = consumer._status.validator.total_reads
 
@@ -411,3 +509,6 @@ def wait_new_leader():
         # redpanda to.
         assert consumer._status.validator.lost_offsets['0'] > 0, \
             "No lost messages observed. The test is not valid."
+
+    def teardown(self):
+        make_failure_injector(self.redpanda)._heal_all()