diff --git a/tests/rptest/tests/idempotency_test.py b/tests/rptest/tests/idempotency_test.py index ae6e7f58fd918..3250730acc2f7 100644 --- a/tests/rptest/tests/idempotency_test.py +++ b/tests/rptest/tests/idempotency_test.py @@ -16,7 +16,7 @@ from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer from rptest.services.redpanda import SISettings from rptest.tests.prealloc_nodes import PreallocNodesTest - +from rptest.utils.node_operations import FailureInjectorBackgroundThread from rptest.tests.redpanda_test import RedpandaTest from rptest.clients.rpk import RpkTool @@ -105,4 +105,61 @@ def test_recovery_after_snapshot_is_delivered(self): 1) producer.stop() - assert producer.produce_status.bad_offsets == 0, "Producer bad offsets detected" \ No newline at end of file + assert producer.produce_status.bad_offsets == 0, "Producer bad offsets detected" + + +class IdempotencyWriteCachingTest(PreallocNodesTest): + def __init__(self, test_context): + super(IdempotencyWriteCachingTest, self).__init__( + test_context=test_context, + node_prealloc_count=1, + ) + + @cluster(num_nodes=5) + def test_idempotent_producers_write_caching(self): + + msg_size = 128 + rate_limit = 10 * (1024 * 1024) if not self.debug_mode else 1024 * 1024 + msg_cnt = int(15 * rate_limit / msg_size) + + topic = TopicSpec(partition_count=4, replication_factor=3) + + DefaultClient(self.redpanda).create_topic(topic) + + rpk = RpkTool(self.redpanda) + rpk.alter_topic_config(topic.name, TopicSpec.PROPERTY_WRITE_CACHING, + "true") + # configure topic with write caching to delay state machine apply + rpk.alter_topic_config(topic.name, TopicSpec.PROPERTY_FLUSH_BYTES, + 1024 * 1024) + rpk.alter_topic_config(topic.name, TopicSpec.PROPERTY_FLUSH_MS, + 1024 * 1024) + fi = FailureInjectorBackgroundThread(self.redpanda, + self.logger, + max_inter_failure_time=30, + min_inter_failure_time=10, + max_suspend_duration_seconds=4) + producer = KgoVerifierProducer(self.test_context, + self.redpanda, + topic.name, + msg_size, + msg_cnt, + custom_node=self.preallocated_nodes, + rate_limit_bps=rate_limit) + try: + producer.start(clean=False) + fi.start() + producer.wait() + + consumer = KgoVerifierConsumerGroupConsumer( + self.test_context, + self.redpanda, + topic.name, + msg_size, + readers=2, + max_msgs=msg_cnt, + group_name="test-group") + consumer.start() + consumer.wait() + finally: + fi.stop()