Skip to content

Commit

Permalink
dt/xform: Test that transforms consume from end of topic
Browse files Browse the repository at this point in the history
Specifically in situations where no records are produced between
deploy time and start time.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
(cherry picked from commit 0f49b21)
  • Loading branch information
oleiman committed Jul 3, 2024
1 parent 5f01cf4 commit bda09b8
Showing 1 changed file with 19 additions and 0 deletions.
19 changes: 19 additions & 0 deletions tests/rptest/tests/data_transforms_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from rptest.services.cluster import cluster
from rptest.services.redpanda import MetricSamples, MetricsEndpoint
from ducktape.utils.util import wait_until
from ducktape.errors import TimeoutError
from rptest.services.transform_verifier_service import TransformVerifierProduceConfig, TransformVerifierProduceStatus, TransformVerifierService, TransformVerifierConsumeConfig, TransformVerifierConsumeStatus
from rptest.services.admin import Admin, CommittedWasmOffset

Expand Down Expand Up @@ -317,6 +318,24 @@ def all_offsets_removed():
retry_on_exc=True,
)

@cluster(num_nodes=4)
def test_consume_from_end(self):
"""
Test that by default transforms read from the end of the topic if no records
are produced between deploy time and transform start time.
"""
input_topic = self.topics[0]
output_topic = self.topics[1]
producer_status = self._produce_input_topic(topic=self.topics[0])
self._deploy_wasm(name="identity-xform",
input_topic=input_topic,
output_topic=output_topic,
wait_running=True)

with expect_exception(TimeoutError, lambda _: True):
consumer_status = self._consume_output_topic(
topic=self.topics[1], status=producer_status)


class DataTransformsChainingTest(BaseDataTransformsTest):
"""
Expand Down

0 comments on commit bda09b8

Please sign in to comment.