Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

runs staging tests on athena #1764

Merged
merged 3 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,10 +586,21 @@ class SupportsStagingDestination(ABC):
def should_load_data_to_staging_dataset_on_staging_destination(
    self, table: TTableSchema
) -> bool:
    """Decide whether `table` is loaded into the staging dataset on the staging destination.

    Returning True routes the data for `table` to the staging dataset on the
    staging destination rather than to its regular dataset. Athena Iceberg
    relies on this: it uses the staging dataset on the staging destination to
    copy data into Iceberg tables stored in the regular dataset there. The
    default (False) loads into the regular dataset on the staging destination,
    from which warehouses with their own storage (e.g. Snowflake) copy data.
    """
    return False

@abstractmethod
def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
    """Report whether `table` must be truncated on the staging destination before loading.

    True — the usual behavior, which a config flag can change — truncates the
    table held in the regular dataset of the staging destination. Athena with
    Iceberg always answers False: Athena keeps Iceberg tables in the regular
    dataset and that data must not be touched. Plain Athena truncates such
    tables only for the "replace" write disposition.
    """
    ...


Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/impl/athena/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable
if table["write_disposition"] == "replace" and not self._is_iceberg_table(
self.prepare_load_table(table["name"])
):
return self.config.truncate_tables_on_staging_destination_before_load
return True
return False

def should_load_data_to_staging_dataset_on_staging_destination(
Expand Down
23 changes: 21 additions & 2 deletions tests/load/pipeline/test_stage_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,18 @@ def test_truncate_staging_dataset(destination_config: DestinationTestConfigurati
# check there are two staging files
_, staging_client = pipeline._get_destination_clients(pipeline.default_schema)
with staging_client:
assert len(staging_client.list_table_files(table_name)) == 2 # type: ignore[attr-defined]
# except Athena + Iceberg which does not store tables in staging dataset
if (
destination_config.destination == "athena"
and destination_config.table_format == "iceberg"
):
table_count = 0
# but keeps them in staging dataset on staging destination - but only the last one
with staging_client.with_staging_dataset(): # type: ignore[attr-defined]
assert len(staging_client.list_table_files(table_name)) == 1 # type: ignore[attr-defined]
else:
table_count = 2
assert len(staging_client.list_table_files(table_name)) == table_count # type: ignore[attr-defined]

# load the data with truncating, so only new file is on the staging
pipeline.destination.config_params["truncate_tables_on_staging_destination_before_load"] = True
Expand All @@ -231,7 +242,15 @@ def test_truncate_staging_dataset(destination_config: DestinationTestConfigurati
# check there is only one staging file
_, staging_client = pipeline._get_destination_clients(pipeline.default_schema)
with staging_client:
assert len(staging_client.list_table_files(table_name)) == 1 # type: ignore[attr-defined]
# except for Athena which does not delete staging destination tables
if destination_config.destination == "athena":
if destination_config.table_format == "iceberg":
table_count = 0
else:
table_count = 3
else:
table_count = 1
assert len(staging_client.list_table_files(table_name)) == table_count # type: ignore[attr-defined]


@pytest.mark.parametrize(
Expand Down
49 changes: 29 additions & 20 deletions tests/load/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,27 @@ def destinations_configs(
# build destination configs
destination_configs: List[DestinationTestConfiguration] = []

# default sql configs that are also default staging configs
default_sql_configs_with_staging = [
# Athena needs filesystem staging, which will be automatically set; we have to supply a bucket url though.
DestinationTestConfiguration(
destination="athena",
file_format="parquet",
supports_merge=False,
bucket_url=AWS_BUCKET,
),
DestinationTestConfiguration(
destination="athena",
file_format="parquet",
bucket_url=AWS_BUCKET,
force_iceberg=True,
supports_merge=True,
supports_dbt=False,
table_format="iceberg",
extra_info="iceberg",
),
]

# default non staging sql based configs, one per destination
if default_sql_configs:
destination_configs += [
Expand All @@ -268,26 +289,10 @@ def destinations_configs(
DestinationTestConfiguration(destination="duckdb", file_format="parquet"),
DestinationTestConfiguration(destination="motherduck", file_format="insert_values"),
]
# Athena needs filesystem staging, which will be automatically set; we have to supply a bucket url though.
destination_configs += [
DestinationTestConfiguration(
destination="athena",
file_format="parquet",
supports_merge=False,
bucket_url=AWS_BUCKET,
)
]
destination_configs += [
DestinationTestConfiguration(
destination="athena",
file_format="parquet",
bucket_url=AWS_BUCKET,
force_iceberg=True,
supports_merge=True,
supports_dbt=False,
extra_info="iceberg",
)
]

# add Athena staging configs
destination_configs += default_sql_configs_with_staging

destination_configs += [
DestinationTestConfiguration(
destination="clickhouse", file_format="jsonl", supports_dbt=False
Expand Down Expand Up @@ -332,6 +337,10 @@ def destinations_configs(
DestinationTestConfiguration(destination="qdrant", extra_info="server"),
]

if (default_sql_configs or all_staging_configs) and not default_sql_configs:
# athena default configs not added yet
destination_configs += default_sql_configs_with_staging

if default_staging_configs or all_staging_configs:
destination_configs += [
DestinationTestConfiguration(
Expand Down
Loading