rptest/datalake: test metadata interoperability with 3rd party system #24643

Open
wants to merge 7 commits into base: dev
19 changes: 19 additions & 0 deletions tests/rptest/services/spark_service.py
@@ -149,6 +149,25 @@ def escape_identifier(self, table: str) -> str:
def engine_name():
return QueryEngineType.SPARK

def run_sample_maintenance_task(self, namespace, table) -> None:
# Metadata query
# https://iceberg.apache.org/docs/1.6.1/spark-queries/#files
initial_parquet_files = self.run_query_fetch_one(
f"SELECT count(*) FROM {namespace}.{table}.files")[0]

# Want at least 2 files to be able to assert that optimization did something.
assert initial_parquet_files >= 2, f"Expecting at least 2 files, got {initial_parquet_files}"

# Spark Procedures provided by Iceberg SQL Extensions
# https://iceberg.apache.org/docs/1.6.1/spark-procedures/#rewrite_data_files
self.run_query_fetch_one(
f"CALL `redpanda-iceberg-catalog`.system.rewrite_data_files(\"{namespace}.{table}\")"
)

optimized_parquet_files = self.run_query_fetch_one(
f"SELECT count(*) FROM {namespace}.{table}.files")[0]
assert optimized_parquet_files < initial_parquet_files, f"Expecting fewer files after optimize, got {optimized_parquet_files}"
Comment on lines +152 to +169
Contributor

IMO these service classes should only hold operations, not validations. To that end, can we split this into count_parquet_files() and optimize_parquet_files()? There are many different kinds of maintenance (https://iceberg.apache.org/docs/1.5.1/maintenance/), and it'll be much easier to reuse these if we're more descriptive and have smaller, tighter primitives.
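
Roughly what I have in mind -- just a sketch, reusing run_query_fetch_one() from this PR and the queries already in run_sample_maintenance_task(); the method names are only the ones suggested above:

def count_parquet_files(self, namespace, table) -> int:
    # Iceberg metadata table listing the data files backing the table.
    # https://iceberg.apache.org/docs/1.6.1/spark-queries/#files
    return self.run_query_fetch_one(
        f"SELECT count(*) FROM {namespace}.{table}.files")[0]

def optimize_parquet_files(self, namespace, table) -> None:
    # Compact small data files via the Iceberg rewrite_data_files Spark procedure.
    # https://iceberg.apache.org/docs/1.6.1/spark-procedures/#rewrite_data_files
    self.run_query_fetch_one(
        f"CALL `redpanda-iceberg-catalog`.system.rewrite_data_files(\"{namespace}.{table}\")"
    )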

Contributor Author

@andrwng I see what you mean, but here I wanted to give more freedom to the implementation of these maintenance tasks, i.e. it might just as well be a metadata-only maintenance operation. I documented the expectations in the docstring:

def run_sample_maintenance_task(self, namespace, table) -> None:

optimize_parquet_files and count_parquet_files would work for the two engines we have now. Do you reckon all of them expose the same primitives, so that we could have a single test for all of them?

Contributor

I agree that run_sample_maintenance_task() seems a bit vague, whereas the Iceberg docs Andrew provided have a concrete list of tasks we could enumerate.

Maybe these are the primitives we should be working with and leave the assertions to the user.
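
For illustration only, a hypothetical shape for such primitives on QueryEngineBase, loosely following the task names from the Iceberg maintenance docs (none of these methods exist in this PR; the assertions would stay in the test body):

# Hypothetical, illustrative only -- not part of this PR.
def expire_snapshots(self, namespace, table) -> None:
    raise NotImplementedError

def rewrite_data_files(self, namespace, table) -> None:
    raise NotImplementedError

def remove_orphan_files(self, namespace, table) -> None:
    raise NotImplementedError

def rewrite_manifests(self, namespace, table) -> None:
    raise NotImplementedError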

Contributor Author

Maybe these are the primitives we should be working with and leave the assertions to the user.

Assertions like ...? I don't understand what the exact proposal is.

Contributor

I believe Willem's referring to assert statements in run_sample_maintenance_task() -- to my earlier point, I feel strongly that they don't belong in this Spark service library. They belong in test bodies, or other test verifiers.

here I wanted to give more freedom to the implementation of these maintenance tasks...do you reckon all of them expose the same primitives so that we could have a single test for all of them?

I can see how it's a nice property to have, that most query engines can implement some kind of maintenance, and so would be able to slot into the new test. To your point, I don't think every engine will always support the exact set of maintenance operations I posted/suggested. IMO once we come across such an engine, we should decide what to do at that time, rather than preemptively introducing a loosely defined primitive -- it's hard for me to imagine this maintenance function will be used widely by test authors.

If you insist that "arbitrary maintenance" belongs here, then please also at least split out optimize_parquet_files and count_parquet_files (or your preferred names) into the Spark and Trino services and have run_sample_maintenance_task() call them.
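
Very rough sketch of what I mean, assuming the count/optimize split from above exists on both services (names here are placeholders, nothing in this PR yet):

# In SparkService / TrinoService: keep the generic entry point thin.
def run_sample_maintenance_task(self, namespace, table) -> None:
    self.optimize_parquet_files(namespace, table)

# And in the test body (qe is the query engine under test):
# before = qe.count_parquet_files("redpanda", self.topic_name)
# qe.run_sample_maintenance_task("redpanda", self.topic_name)
# after = qe.count_parquet_files("redpanda", self.topic_name)
# assert after < before, f"Expected fewer files after optimize, got {after}"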


def make_client(self):
assert self.spark_host
return hive.connect(host=self.spark_host, port=self.spark_port)
16 changes: 16 additions & 0 deletions tests/rptest/services/trino_service.py
@@ -139,6 +139,22 @@ def make_client(self):
def escape_identifier(self, table: str) -> str:
return f'"{table}"'

def run_sample_maintenance_task(self, namespace, table) -> None:
# See Trino metadata tables documentation
# https://trino.io/docs/current/connector/iceberg.html#files-table
initial_parquet_files = self.count_table(namespace, f"{table}$files")

# Want at least 2 files to be able to assert that optimization did something.
assert initial_parquet_files >= 2, f"Expecting at least 2 files, got {initial_parquet_files}"

# Optimize the table to rewrite the data.
# https://trino.io/docs/current/connector/iceberg.html#alter-table-execute
self.run_query_fetch_one(
f"ALTER TABLE {namespace}.{table} EXECUTE optimize")

optimized_parquet_files = self.count_table(namespace, f"{table}$files")
assert optimized_parquet_files < initial_parquet_files, f"Expecting fewer files after optimize, got {optimized_parquet_files}"

@staticmethod
def dict_to_conf(d: dict[str, Optional[str | bool]]):
"""
86 changes: 86 additions & 0 deletions tests/rptest/tests/datalake/3rdparty_maintenance_test.py
@@ -0,0 +1,86 @@
# Copyright 2024 Vectorized, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
from ducktape.mark import matrix

from rptest.services.cluster import cluster
from rptest.services.redpanda import SISettings
from rptest.tests.datalake.datalake_services import DatalakeServices
from rptest.tests.datalake.datalake_verifier import DatalakeVerifier
from rptest.tests.datalake.query_engine_base import QueryEngineType
from rptest.tests.datalake.utils import supported_storage_types
from rptest.tests.redpanda_test import RedpandaTest


class Datalake3rdPartyMaintenanceTest(RedpandaTest):
def __init__(self, test_ctx, *args, **kwargs):
super().__init__(test_ctx,
num_brokers=1,
si_settings=SISettings(test_ctx),
extra_rp_conf={
"iceberg_enabled": "true",
"iceberg_catalog_commit_interval_ms": 5000
},
*args,
**kwargs)

self.test_ctx = test_ctx
self.topic_name = "test"
self.num_partitions = 10

self.produced_messages = 0

def setUp(self):
# redpanda will be started by DatalakeServices
pass

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types(),
query_engine=[QueryEngineType.SPARK, QueryEngineType.TRINO],
filesystem_catalog_mode=[True, False])
def test_e2e_basic(self, cloud_storage_type, query_engine,
filesystem_catalog_mode):
"""
This test verifies that Redpanda can continue to work with Iceberg
metadata written by third-party query engines. We use an optimize operation
with a third-party query engine to trigger a rewrite of the data files
and metadata.
"""
with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
filesystem_catalog_mode=filesystem_catalog_mode,
include_query_engines=[query_engine]) as dl:
dl.create_iceberg_enabled_topic(self.topic_name,
partitions=self.num_partitions)
# Write some data to the topic.
self._translate_sample_data(dl)

# Run maintenance to rewrite the data.
dl.query_engine(query_engine).run_sample_maintenance_task(
"redpanda", self.topic_name)

# Verify consistency post rewrite.
DatalakeVerifier.oneshot(self.redpanda, self.topic_name,
dl.query_engine(query_engine))

# Produce additional messages to the topic to make sure we correctly
# interoperate with the metadata written by the third-party query engine.
self._translate_sample_data(dl)

# Verify consistency with the additional messages.
DatalakeVerifier.oneshot(self.redpanda, self.topic_name,
dl.query_engine(query_engine))

def _translate_sample_data(self, dl):
NUM_MSG_PER_SAMPLE = 100
self.produced_messages += NUM_MSG_PER_SAMPLE

dl.produce_to_topic(self.topic_name, 1024, NUM_MSG_PER_SAMPLE)
# Wait for all messages (including the ones we just wrote) to be translated.
dl.wait_for_translation(self.topic_name,
msg_count=self.produced_messages)
9 changes: 8 additions & 1 deletion tests/rptest/tests/datalake/datalake_services.py
@@ -16,7 +16,7 @@
from rptest.services.redpanda import RedpandaService
from rptest.services.spark_service import SparkService
from rptest.services.trino_service import TrinoService
from rptest.tests.datalake.query_engine_base import QueryEngineType
from rptest.tests.datalake.query_engine_base import QueryEngineBase, QueryEngineType
from rptest.services.redpanda_connect import RedpandaConnectService
from rptest.tests.datalake.query_engine_factory import get_query_engine_by_type

@@ -85,6 +85,13 @@ def __enter__(self):
def __exit__(self, *args, **kwargs):
self.tearDown()

def query_engine(self, type: QueryEngineType) -> QueryEngineBase:
for e in self.query_engines:
assert isinstance(e, QueryEngineBase)
if e.engine_name() == type:
return e
raise Exception(f"Query engine {type} not found")

def trino(self) -> TrinoService:
trino = self.service(QueryEngineType.TRINO)
assert trino, "Missing Trino service"
9 changes: 9 additions & 0 deletions tests/rptest/tests/datalake/datalake_verifier.py
@@ -287,3 +287,12 @@ def stop(self):
self.logger.debug(f"queried offsets: {self._max_queried_offsets}")

assert self._max_queried_offsets == self._max_consumed_offsets, "Mismatch between maximum offsets in topic vs iceberg table"

@staticmethod
def oneshot(redpanda: RedpandaService,
topic: str,
query_engine: QueryEngineBase,
progress_timeout_sec=30):
verifier = DatalakeVerifier(redpanda, topic, query_engine)
verifier.start()
verifier.wait(progress_timeout_sec=progress_timeout_sec)
26 changes: 20 additions & 6 deletions tests/rptest/tests/datalake/query_engine_base.py
@@ -33,12 +33,9 @@ def run_query(self, query):
client = self.make_client()
assert client
try:
try:
cursor = client.cursor()
cursor.execute(query)
yield cursor
finally:
cursor.close()
Member

was this redundant? it seems like it was handling the cursor closing, and the outer try block was handling the client closing.

cursor = client.cursor()
cursor.execute(query)
yield cursor
finally:
client.close()

@@ -50,6 +47,10 @@ def run_query_fetch_all(self, query):
with self.run_query(query) as cursor:
return cursor.fetchall()

def run_query_fetch_one(self, query):
with self.run_query(query) as cursor:
return cursor.fetchone()

def count_table(self, namespace, table) -> int:
query = f"select count(*) from {namespace}.{self.escape_identifier(table)}"
with self.run_query(query) as cursor:
@@ -59,3 +60,16 @@ def max_translated_offset(self, namespace, table, partition) -> int:
query = f"select max(redpanda.offset) from {namespace}.{self.escape_identifier(table)} where redpanda.partition={partition}"
with self.run_query(query) as cursor:
return cursor.fetchone()[0]

def run_sample_maintenance_task(self, namespace, table) -> None:
Member

I'm not sure what _sample_ in the name refers to.

"""
Subclasses should implement this method to run a maintenance task on the
given table.

Useful for testing that Redpanda can still work with the table after a
maintenance task has been run.

The method must guarantee that, by the time it returns, a maintenance task
has actually been run on the table; otherwise the test is not valid.
"""
raise NotImplementedError