Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: implement native_iceberg_drop #292

Merged
merged 28 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5fd1d8c
Bootstrap an IAM role based on restrictive permissions to be used for…
brabster May 9, 2023
adf862d
Merge branch 'main' into fix-285-native-iceberg-drop
mattiamatrix May 14, 2023
871cc90
Merge branch 'main' into fix-285-native-iceberg-drop
nicor88 May 15, 2023
5c42d2a
Merge branch 'main' into fix-285-native-iceberg-drop
mattiamatrix May 15, 2023
b54a5a2
Merge branch 'main' into fix-285-native-iceberg-drop
nicor88 May 17, 2023
2c75f4e
Merge branch 'main' into fix-285-native-iceberg-drop
nicor88 May 17, 2023
a272c07
Behaviour confirmed via logs, seeds using S3 means can't confirm for …
brabster May 17, 2023
0cc5655
Merge branch 'dbt-athena:main' into fix-285-native-iceberg-drop
brabster May 18, 2023
01e7e02
Merge branch 'main' into fix-285-native-iceberg-drop
brabster May 22, 2023
8d58d48
Take in account @nicor88 comment on excluding hive tables from native…
brabster May 22, 2023
d5d8821
Move restrictive IAM role to infra repo
brabster May 22, 2023
4a0e8dd
Merge branch 'dbt-athena:main' into fix-285-native-iceberg-drop
brabster May 23, 2023
0910b34
Apply feedback about relation type lookup
brabster May 23, 2023
b7ec820
Merge remote-tracking branch 'origin/main' into fix-285-native-iceber…
brabster May 30, 2023
de0c5b4
Update dbt/include/athena/macros/materializations/models/table/create…
brabster May 30, 2023
2cd6ecc
refactor adapter methods to reduce duplication and adhere to current …
brabster May 30, 2023
f571647
Merge branch 'dbt-athena:main' into fix-285-native-iceberg-drop
brabster May 30, 2023
673dac9
Merge remote-tracking branch 'origin/fix-285-native-iceberg-drop' int…
brabster May 30, 2023
d25a73f
Update dbt/adapters/athena/impl.py
brabster May 31, 2023
d0c64ce
Update dbt/adapters/athena/impl.py
brabster May 31, 2023
d3dafdb
fix up imprt
brabster May 31, 2023
5bcc3a0
add docs
brabster May 31, 2023
4b41762
Merge branch 'main' into fix-285-native-iceberg-drop
Jrmyy Jun 7, 2023
de00c89
fix config parameter parsing error, remove unnecessary logging
brabster Jun 7, 2023
766cf09
use another adapter as reference for bool config param
brabster Jun 7, 2023
369e664
Merge branch 'main' into fix-285-native-iceberg-drop
Jrmyy Jun 9, 2023
d4f873d
Merge branch 'main' into fix-285-native-iceberg-drop
svdimchenko Jun 12, 2023
2307073
Fixed isort
svdimchenko Jun 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ repos:
# Identify invalid files
- id: check-ast
- id: check-yaml
args: ['--unsafe']
- id: check-json
- id: check-toml

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ _Additional information_
* `field_delimiter` (`default=none`)
* Custom field delimiter, for when format is set to `TEXTFILE`
* `table_properties`: table properties to add to the table, valid for Iceberg only
+ `native_drop`: Relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be made to manage data in S3. Data in S3 will only be cleared up for Iceberg tables [see AWS docs](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-managing-tables.html). Note that Iceberg DROP TABLE operations may timeout if they take longer than 60 seconds.
Jrmyy marked this conversation as resolved.
Show resolved Hide resolved
* `lf_tags_config` (`default=none`)
* [AWS lakeformation](#aws-lakeformation-integration) tags to associate with the table and columns
* format for model config:
Expand Down
6 changes: 1 addition & 5 deletions dbt/adapters/athena/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,7 @@ def open(cls, connection: Connection) -> Connection:
session=get_boto3_session(connection),
retry_config=RetryConfig(
attempt=creds.num_retries,
exceptions=(
"ThrottlingException",
"TooManyRequestsException",
"InternalServerException",
),
exceptions=("ThrottlingException", "TooManyRequestsException", "InternalServerException"),
),
config=get_boto3_config(),
)
Expand Down
31 changes: 28 additions & 3 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import agate
from botocore.exceptions import ClientError
from mypy_boto3_athena.type_defs import DataCatalogTypeDef
from mypy_boto3_glue.type_defs import GetTableResponseTypeDef

from dbt.adapters.athena import AthenaConnectionManager
from dbt.adapters.athena.column import AthenaColumn
Expand All @@ -31,6 +32,7 @@
RELATION_TYPE_MAP,
AthenaRelation,
AthenaSchemaSearchMap,
TableType,
get_table_type,
)
from dbt.adapters.athena.s3 import S3DataNaming
Expand Down Expand Up @@ -189,10 +191,9 @@ def generate_s3_location(
return mapping[s3_data_naming]

@available
def get_glue_table_location(self, relation: AthenaRelation) -> Optional[str]:
def get_glue_table(self, relation: AthenaRelation) -> Optional[GetTableResponseTypeDef]:
"""
Helper function to get location of a relation in S3.
Will return None if the table does not exist or does not have a location (views)
Helper function to get a relation via Glue
"""
conn = self.connections.get_thread_connection()
client = conn.handle
Expand All @@ -206,6 +207,30 @@ def get_glue_table_location(self, relation: AthenaRelation) -> Optional[str]:
LOGGER.debug(f"Table {relation.render()} does not exists - Ignoring")
return None
raise e
return table

@available
def get_glue_table_type(self, relation: AthenaRelation) -> Optional[TableType]:
"""
Get the table type of the relation from Glue
"""
table = self.get_glue_table(relation)
if not table:
LOGGER.debug(f"Table {relation.render()} does not exist - Ignoring")
return None

return get_table_type(table["Table"])

@available
def get_glue_table_location(self, relation: AthenaRelation) -> Optional[str]:
"""
Helper function to get location of a relation in S3.
Will return None if the table does not exist or does not have a location (views)
"""
table = self.get_glue_table(relation)
if not table:
LOGGER.debug(f"Table {relation.render()} does not exist - Ignoring")
return None

table_type = get_table_type(table["Table"])
table_location = table["Table"].get("StorageDescriptor", {}).get("Location")
Expand Down
26 changes: 26 additions & 0 deletions dbt/include/athena/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,34 @@
{% macro athena__drop_relation(relation) -%}
{%- set native_drop = config.get('native_drop', default=false) -%}
svdimchenko marked this conversation as resolved.
Show resolved Hide resolved
{%- set rel_type_object = adapter.get_glue_table_type(relation) -%}
{%- set rel_type = none if rel_type_object == none else rel_type_object.value -%}
{%- set natively_droppable = rel_type == 'iceberg_table' or relation.type == 'view' -%}

{%- if native_drop and natively_droppable -%}
{%- do drop_relation_sql(relation) -%}
brabster marked this conversation as resolved.
Show resolved Hide resolved
{%- else -%}
{%- do drop_relation_glue(relation) -%}
{%- endif -%}
{% endmacro %}

{% macro drop_relation_glue(relation) -%}
{%- do log('Dropping relation via Glue and S3 APIs') -%}
{%- do adapter.clean_up_table(relation) -%}
{%- do adapter.delete_from_glue_catalog(relation) -%}
{% endmacro %}

{% macro drop_relation_sql(relation) -%}

{%- do log('Dropping relation via SQL only') -%}
{% call statement('drop_relation', auto_begin=False) -%}
{%- if relation.type == 'view' -%}
drop {{ relation.type }} if exists {{ relation.render() }}
{%- else -%}
drop {{ relation.type }} if exists {{ relation.render_hive() }}
brabster marked this conversation as resolved.
Show resolved Hide resolved
{% endif %}
{%- endcall %}
{% endmacro %}

{% macro set_table_classification(relation) -%}
{%- set format = config.get('format', default='parquet') -%}
{% call statement('set_table_classification', auto_begin=False) -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
s3_data_naming,
external_location,
temporary) -%}
{%- set native_drop = config.get('native_drop', default=false) -%}

{%- set contract_config = config.get('contract') -%}
{%- if contract_config.enforced -%}
Expand Down Expand Up @@ -47,7 +48,11 @@
{%- endif -%}
{%- endif %}

{% do adapter.delete_from_s3(location) %}
{%- if native_drop and table_type == 'iceberg' -%}
{% do log('Config native_drop enabled, skipping direct S3 delete') %}
{%- else -%}
{% do adapter.delete_from_s3(location) %}
{%- endif -%}

create table {{ relation }}
with (
Expand Down
151 changes: 151 additions & 0 deletions tests/functional/adapter/test_basic_hive_native_drop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""
Run the basic dbt test suite on hive tables when applicable.

Some test classes are not included here, because they don't contain table models.
Those are run in the hive test suite.
"""
import pytest

from dbt.tests.adapter.basic.files import (
base_ephemeral_sql,
base_materialized_var_sql,
base_table_sql,
base_view_sql,
config_materialized_incremental,
config_materialized_table,
ephemeral_table_sql,
ephemeral_view_sql,
generic_test_table_yml,
generic_test_view_yml,
incremental_sql,
model_base,
model_ephemeral,
schema_base_yml,
)
from dbt.tests.adapter.basic.test_adapter_methods import BaseAdapterMethod
from dbt.tests.adapter.basic.test_base import BaseSimpleMaterializations
from dbt.tests.adapter.basic.test_ephemeral import BaseEphemeral
from dbt.tests.adapter.basic.test_generic_tests import BaseGenericTests
from dbt.tests.adapter.basic.test_incremental import BaseIncremental
from dbt.tests.adapter.basic.test_snapshot_check_cols import BaseSnapshotCheckCols
from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp

modified_config_materialized_table = """
{{ config(materialized="table", table_type="hive", native_drop="True") }}
"""

modified_config_materialized_incremental = """
{{ config(materialized="incremental",
table_type="hive",
incremental_strategy="append",
unique_key="id",
native_drop="True") }}
"""

modified_model_base = """
select
id,
name,
{{ cast_timestamp('some_date') }} as some_date
from {{ source('raw', 'seed') }}
"""

modified_model_ephemeral = """
select
id,
name,
{{ cast_timestamp('some_date') }} as some_date
from {{ ref('ephemeral') }}
"""


def configure_single_model_to_use_iceberg(model):
"""Adjust a given model configuration to use iceberg instead of hive."""
replacements = [
(config_materialized_table, modified_config_materialized_table),
(config_materialized_incremental, modified_config_materialized_incremental),
(model_base, modified_model_base),
(model_ephemeral, modified_model_ephemeral),
]
for original, new in replacements:
model = model.replace(original.strip(), new.strip())

return model


def configure_models_to_use_iceberg(models):
"""Loop over all the dbt models and set the table configuration to iceberg."""
return {key: configure_single_model_to_use_iceberg(val) for key, val in models.items()}


@pytest.mark.skip(
reason="The materialized var doesn't work well, because we only want to change tables, not views. "
"It's hard to come up with an elegant fix."
)
class TestSimpleMaterializationsIceberg(BaseSimpleMaterializations):
@pytest.fixture(scope="class")
def models(self):
return configure_models_to_use_iceberg(
{
"view_model.sql": base_view_sql,
"table_model.sql": base_table_sql,
"swappable.sql": base_materialized_var_sql,
"schema.yml": schema_base_yml,
}
)


class TestEphemeralIceberg(BaseEphemeral):
@pytest.fixture(scope="class")
def models(self):
return configure_models_to_use_iceberg(
{
"ephemeral.sql": base_ephemeral_sql,
"view_model.sql": ephemeral_view_sql,
"table_model.sql": ephemeral_table_sql,
"schema.yml": schema_base_yml,
}
)


class TestIncrementalIceberg(BaseIncremental):
@pytest.fixture(scope="class")
def models(self):
return configure_models_to_use_iceberg(
{
"incremental.sql": incremental_sql,
"schema.yml": schema_base_yml,
}
)


class TestGenericTestsIceberg(BaseGenericTests):
@pytest.fixture(scope="class")
def models(self):
return configure_models_to_use_iceberg(
{
"view_model.sql": base_view_sql,
"table_model.sql": base_table_sql,
"schema.yml": schema_base_yml,
"schema_view.yml": generic_test_view_yml,
"schema_table.yml": generic_test_table_yml,
}
)


@pytest.mark.skip(reason="The in-place update is not supported for seeds. We need our own implementation instead.")
class TestSnapshotCheckColsIceberg(BaseSnapshotCheckCols):
pass


@pytest.mark.skip(reason="The in-place update is not supported for seeds. We need our own implementation instead.")
class TestSnapshotTimestampIceberg(BaseSnapshotTimestamp):
pass


@pytest.mark.skip(
reason="Fails because the test tries to fetch the table metadata during the compile step, "
"before the models are actually run. Not sure how this test is intended to work."
)
class TestBaseAdapterMethodIceberg(BaseAdapterMethod):
pass
Loading