Skip to content

Commit

Permalink
rptest/datalake: test metadata interoperability with 3rd party system
Browse files Browse the repository at this point in the history
  • Loading branch information
nvartolomei committed Dec 24, 2024
1 parent 8d94a04 commit 1292bb1
Showing 1 changed file with 78 additions and 0 deletions.
78 changes: 78 additions & 0 deletions tests/rptest/tests/datalake/3rdparty_rewrite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from ducktape.mark import matrix

from rptest.services.cluster import cluster
from rptest.services.redpanda import SISettings
from rptest.tests.datalake.datalake_services import DatalakeServices
from rptest.tests.datalake.datalake_verifier import DatalakeVerifier
from rptest.tests.datalake.query_engine_base import QueryEngineType
from rptest.tests.datalake.utils import supported_storage_types
from rptest.tests.redpanda_test import RedpandaTest


class Datalake3rdPartyRewriteTest(RedpandaTest):
def __init__(self, test_ctx, *args, **kwargs):
super().__init__(test_ctx,
num_brokers=1,
si_settings=SISettings(test_ctx),
extra_rp_conf={
"iceberg_enabled": "true",
"iceberg_catalog_commit_interval_ms": 5000
},
*args,
**kwargs)

self.test_ctx = test_ctx
self.topic_name = "test"
self.num_partitions = 10

self.produced_messages = 0

def setUp(self):
# redpanda will be started by DatalakeServices
pass

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types(),
query_engine=[QueryEngineType.SPARK, QueryEngineType.TRINO],
filesystem_catalog_mode=[True, False])
def test_e2e_basic(self, cloud_storage_type, query_engine,
filesystem_catalog_mode):
"""
This test verifies that Redpanda can continue to work with Iceberg
metadata written by third-party query engines. We use an optimize operation
with a third-party query engine to trigger a rewrite of the data files
and metadata.
"""
with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
filesystem_catalog_mode=filesystem_catalog_mode,
include_query_engines=[query_engine]) as dl:
dl.create_iceberg_enabled_topic(self.topic_name,
partitions=self.num_partitions)
# Write some data to the topic.
self._translate_sample_data(dl)

# Run maintenance to rewrite the data.
dl.any_query_engine().run_sample_maintenance_task(
"redpanda", self.topic_name)

# Verify consistency post rewrite.
DatalakeVerifier.oneshot(self.redpanda, self.topic_name,
dl.any_query_engine())

# Produce additional messages to the topic to make sure we correctly
# interoperate with the metadata written by Trino.
self._translate_sample_data(dl)

# Verify consistency with the additional messages.
DatalakeVerifier.oneshot(self.redpanda, self.topic_name,
dl.any_query_engine())

def _translate_sample_data(self, dl):
NUM_MSG_PER_SAMPLE = 100
self.produced_messages += NUM_MSG_PER_SAMPLE

dl.produce_to_topic(self.topic_name, 1024, NUM_MSG_PER_SAMPLE)
# Wait for all messages (including the ones we just wrote) to be translated.
dl.wait_for_translation(self.topic_name,
msg_count=self.produced_messages)

0 comments on commit 1292bb1

Please sign in to comment.