Skip to content

Commit

Permalink
tests: added test validating idemotent producers with write caching
Browse files Browse the repository at this point in the history
Signed-off-by: Michał Maślanka <michal@redpanda.com>
(cherry picked from commit 47d121a)
  • Loading branch information
mmaslankaprv authored and vbotbuildovich committed Oct 11, 2024
1 parent 390d9a3 commit b7bae63
Showing 1 changed file with 59 additions and 2 deletions.
61 changes: 59 additions & 2 deletions tests/rptest/tests/idempotency_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierProducer
from rptest.services.redpanda import SISettings
from rptest.tests.prealloc_nodes import PreallocNodesTest

from rptest.utils.node_operations import FailureInjectorBackgroundThread
from rptest.tests.redpanda_test import RedpandaTest
from rptest.clients.rpk import RpkTool

Expand Down Expand Up @@ -105,4 +105,61 @@ def test_recovery_after_snapshot_is_delivered(self):
1)
producer.stop()

assert producer.produce_status.bad_offsets == 0, "Producer bad offsets detected"
assert producer.produce_status.bad_offsets == 0, "Producer bad offsets detected"


class IdempotencyWriteCachingTest(PreallocNodesTest):
def __init__(self, test_context):
super(IdempotencyWriteCachingTest, self).__init__(
test_context=test_context,
node_prealloc_count=1,
)

@cluster(num_nodes=5)
def test_idempotent_producers_write_caching(self):

msg_size = 128
rate_limit = 10 * (1024 * 1024) if not self.debug_mode else 1024 * 1024
msg_cnt = int(15 * rate_limit / msg_size)

topic = TopicSpec(partition_count=4, replication_factor=3)

DefaultClient(self.redpanda).create_topic(topic)

rpk = RpkTool(self.redpanda)
rpk.alter_topic_config(topic.name, TopicSpec.PROPERTY_WRITE_CACHING,
"true")
# configure topic with write caching to delay state machine apply
rpk.alter_topic_config(topic.name, TopicSpec.PROPERTY_FLUSH_BYTES,
1024 * 1024)
rpk.alter_topic_config(topic.name, TopicSpec.PROPERTY_FLUSH_MS,
1024 * 1024)
fi = FailureInjectorBackgroundThread(self.redpanda,
self.logger,
max_inter_failure_time=30,
min_inter_failure_time=10,
max_suspend_duration_seconds=4)
producer = KgoVerifierProducer(self.test_context,
self.redpanda,
topic.name,
msg_size,
msg_cnt,
custom_node=self.preallocated_nodes,
rate_limit_bps=rate_limit)
try:
producer.start(clean=False)
fi.start()
producer.wait()

consumer = KgoVerifierConsumerGroupConsumer(
self.test_context,
self.redpanda,
topic.name,
msg_size,
readers=2,
max_msgs=msg_cnt,
group_name="test-group")
consumer.start()
consumer.wait()
finally:
fi.stop()

0 comments on commit b7bae63

Please sign in to comment.