-
Notifications
You must be signed in to change notification settings - Fork 599
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
rptest/datalake: test metadata interoperability with 3rd party system
- Loading branch information
1 parent
1929e08
commit 6e607dd
Showing
1 changed file
with
78 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
from ducktape.mark import matrix | ||
|
||
from rptest.services.cluster import cluster | ||
from rptest.services.redpanda import SISettings | ||
from rptest.tests.datalake.datalake_services import DatalakeServices | ||
from rptest.tests.datalake.datalake_verifier import DatalakeVerifier | ||
from rptest.tests.datalake.query_engine_base import QueryEngineType | ||
from rptest.tests.datalake.utils import supported_storage_types | ||
from rptest.tests.redpanda_test import RedpandaTest | ||
|
||
|
||
class Datalake3rdPartyRewriteTest(RedpandaTest): | ||
def __init__(self, test_ctx, *args, **kwargs): | ||
super().__init__(test_ctx, | ||
num_brokers=1, | ||
si_settings=SISettings(test_ctx), | ||
extra_rp_conf={ | ||
"iceberg_enabled": "true", | ||
"iceberg_catalog_commit_interval_ms": 5000 | ||
}, | ||
*args, | ||
**kwargs) | ||
|
||
self.test_ctx = test_ctx | ||
self.topic_name = "test" | ||
self.num_partitions = 10 | ||
|
||
self.produced_messages = 0 | ||
|
||
def setUp(self): | ||
# redpanda will be started by DatalakeServices | ||
pass | ||
|
||
@cluster(num_nodes=4) | ||
@matrix(cloud_storage_type=supported_storage_types(), | ||
query_engine=[QueryEngineType.SPARK, QueryEngineType.TRINO], | ||
filesystem_catalog_mode=[True, False]) | ||
def test_e2e_basic(self, cloud_storage_type, query_engine, | ||
filesystem_catalog_mode): | ||
""" | ||
This test verifies that Redpanda can continue to work with Iceberg | ||
metadata written by third-party query engines. We use an optimize operation | ||
with a third-party query engine to trigger a rewrite of the data files | ||
and metadata. | ||
""" | ||
with DatalakeServices(self.test_ctx, | ||
redpanda=self.redpanda, | ||
filesystem_catalog_mode=filesystem_catalog_mode, | ||
include_query_engines=[query_engine]) as dl: | ||
dl.create_iceberg_enabled_topic(self.topic_name, | ||
partitions=self.num_partitions) | ||
# Write some data to the topic. | ||
self._translate_sample_data(dl) | ||
|
||
# Run maintenance to rewrite the data. | ||
dl.any_query_engine().run_sample_maintenance_task( | ||
"redpanda", self.topic_name) | ||
|
||
# Verify consistency post rewrite. | ||
DatalakeVerifier.oneshot(self.redpanda, self.topic_name, | ||
dl.any_query_engine()) | ||
|
||
# Produce additional messages to the topic to make sure we correctly | ||
# interoperate with the metadata written by Trino. | ||
self._translate_sample_data(dl) | ||
|
||
# Verify consistency with the additional messages. | ||
DatalakeVerifier.oneshot(self.redpanda, self.topic_name, | ||
dl.any_query_engine()) | ||
|
||
def _translate_sample_data(self, dl): | ||
NUM_MSG_PER_SAMPLE = 100 | ||
self.produced_messages += NUM_MSG_PER_SAMPLE | ||
|
||
dl.produce_to_topic(self.topic_name, 1024, NUM_MSG_PER_SAMPLE) | ||
# Wait for all messages (including the ones we just wrote) to be translated. | ||
dl.wait_for_translation(self.topic_name, | ||
msg_count=self.produced_messages) |