Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fix athena partitions limit #360

Merged
merged 9 commits into from
Aug 15, 2023
8 changes: 7 additions & 1 deletion dbt/adapters/athena/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def execute( # type: ignore
endpoint_url: Optional[str] = None,
cache_size: int = 0,
cache_expiration_time: int = 0,
catch_partitions_limit: bool = False,
**kwargs,
):
def inner() -> AthenaCursor:
Expand All @@ -158,7 +159,12 @@ def inner() -> AthenaCursor:
return self

retry = tenacity.Retrying(
retry=retry_if_exception(lambda _: True),
# No need to retry if TOO_MANY_OPEN_PARTITIONS occurs.
# Otherwise, Athena throws ICEBERG_FILESYSTEM_ERROR after retry,
# because not all files are removed immediately after first try to create table
retry=retry_if_exception(
lambda e: False if catch_partitions_limit and "TOO_MANY_OPEN_PARTITIONS" in str(e) else True
),
stop=stop_after_attempt(self._retry_config.attempt),
wait=wait_exponential(
multiplier=self._retry_config.attempt,
Expand Down
25 changes: 25 additions & 0 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import csv
import os
import posixpath as path
import re
import tempfile
from itertools import chain
from textwrap import dedent
Expand All @@ -19,6 +20,7 @@
TableTypeDef,
TableVersionTypeDef,
)
from pyathena.error import OperationalError

from dbt.adapters.athena import AthenaConnectionManager
from dbt.adapters.athena.column import AthenaColumn
Expand Down Expand Up @@ -875,3 +877,26 @@ def _get_table_input(table: TableTypeDef) -> TableInputTypeDef:
returned by get_table() method.
"""
return {k: v for k, v in table.items() if k in TableInputTypeDef.__annotations__}

@available
def run_query_with_partitions_limit_catching(self, sql: str) -> str:
    """Execute ``sql``, translating Athena's partition-limit error into a sentinel.

    The statement is run with ``catch_partitions_limit=True`` so the cursor
    does not retry when TOO_MANY_OPEN_PARTITIONS occurs (retrying after a
    partially-written table can raise ICEBERG_FILESYSTEM_ERROR instead).

    :param sql: the SQL statement to execute.
    :return: ``"TOO_MANY_OPEN_PARTITIONS"`` when Athena's open-partition
        limit was hit, ``"SUCCESS"`` otherwise.
    :raises OperationalError: re-raised for any other operational failure.
    """
    conn = self.connections.get_thread_connection()
    cursor = conn.handle.cursor()
    try:
        cursor.execute(sql, catch_partitions_limit=True)
    except OperationalError as e:
        LOGGER.debug(f"CAUGHT EXCEPTION: {e}")
        # Hitting the partition limit is an expected condition: report it to
        # the calling Jinja macro so it can fall back to batched inserts.
        if "TOO_MANY_OPEN_PARTITIONS" in str(e):
            return "TOO_MANY_OPEN_PARTITIONS"
        # Bare `raise` re-raises the active exception without resetting
        # its traceback (preferred over `raise e`).
        raise
    return "SUCCESS"

@available
def format_partition_keys(self, partition_keys: List[str]) -> str:
    """Return the partition keys rendered as a comma-separated SQL list.

    Each key is first normalized via :meth:`format_one_partition_key` so
    Iceberg hidden-partitioning transforms are expanded.
    """
    formatted_keys = (self.format_one_partition_key(key) for key in partition_keys)
    return ", ".join(formatted_keys)

@available
def format_one_partition_key(self, partition_key: str) -> str:
    """Normalize a single partition key for use in SQL.

    If the key uses an Iceberg hidden-partitioning transform such as
    ``day(col)``, rewrite it as the equivalent ``date_trunc`` expression;
    otherwise return the key lowercased.
    """
    key = partition_key.lower()
    transform = re.match(r"(hour|day|month|year)\((.+)\)", key)
    if transform:
        unit, column = transform.group(1), transform.group(2)
        return f"date_trunc('{unit}', {column})"
    return key
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
{% macro get_partition_batches(sql) -%}
    {#- Compute the distinct partition tuples produced by `sql` and group them
        into batches of at most `partitions_limit` partitions (default 100,
        Athena's per-statement open-partition cap). Returns a list of strings,
        each an OR-joined set of parenthesized partition predicates suitable
        for a WHERE clause. -#}
    {%- set partitioned_by = config.get('partitioned_by') -%}
    {%- set athena_partitions_limit = config.get('partitions_limit', 100) | int -%}
    {%- set partitioned_keys = adapter.format_partition_keys(partitioned_by) -%}
    {% do log('PARTITIONED KEYS: ' ~ partitioned_keys) %}

    {% call statement('get_partitions', fetch_result=True) %}
        select distinct {{ partitioned_keys }} from ({{ sql }}) order by {{ partitioned_keys }};
    {% endcall %}

    {%- set table = load_result('get_partitions').table -%}
    {%- set rows = table.rows -%}
    {%- set partitions = {} -%}
    {% do log('TOTAL PARTITIONS TO PROCESS: ' ~ rows | length) %}
    {%- set partitions_batches = [] -%}

    {%- for row in rows -%}
        {%- set single_partition = [] -%}
        {%- for col in row -%}
            {#- Render each column value as a SQL literal of the matching type. -#}
            {%- set column_type = adapter.convert_type(table, loop.index0) -%}
            {%- if column_type == 'integer' -%}
                {%- set value = col | string -%}
            {%- elif column_type == 'string' -%}
                {#- Double embedded single quotes so values like "O'Brien"
                    still produce a valid SQL string literal. -#}
                {%- set value = "'" + (col | replace("'", "''")) + "'" -%}
            {%- elif column_type == 'date' -%}
                {%- set value = "DATE'" + col | string + "'" -%}
            {%- elif column_type == 'timestamp' -%}
                {%- set value = "TIMESTAMP'" + col | string + "'" -%}
            {%- else -%}
                {%- do exceptions.raise_compiler_error('Need to add support for column type ' + column_type) -%}
            {%- endif -%}
            {%- set partition_key = adapter.format_one_partition_key(partitioned_by[loop.index0]) -%}
            {%- do single_partition.append(partition_key + '=' + value) -%}
        {%- endfor -%}

        {%- set single_partition_expression = single_partition | join(' and ') -%}

        {#- Integer-divide the row index to assign this partition to a batch. -#}
        {%- set batch_number = (loop.index0 / athena_partitions_limit) | int -%}
        {% if not batch_number in partitions %}
            {% do partitions.update({batch_number: []}) %}
        {% endif %}

        {%- do partitions[batch_number].append('(' + single_partition_expression + ')') -%}
        {#- Emit a batch once it is full, or on the final row. -#}
        {%- if partitions[batch_number] | length == athena_partitions_limit or loop.last -%}
            {%- do partitions_batches.append(partitions[batch_number] | join(' or ')) -%}
        {%- endif -%}
    {%- endfor -%}

    {{ return(partitions_batches) }}

{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,42 @@
{% endmacro %}

{% macro incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation, statement_name="main") %}
    {#- Insert the rows of `tmp_relation` into `target_relation`. If Athena
        rejects the single insert for opening too many partitions, re-run the
        insert in partition batches produced by get_partition_batches().
        NOTE: the pasted diff interleaved the pre- and post-change lines of
        this macro; this is the coherent post-change version. -#}
    {%- set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%}
    {%- if not dest_columns -%}
        {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
    {%- endif -%}
    {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

    {%- set insert_full -%}
    insert into {{ target_relation }} ({{ dest_cols_csv }})
    (
        select {{ dest_cols_csv }}
        from {{ tmp_relation }}
    );
    {%- endset -%}

    {%- set query_result = adapter.run_query_with_partitions_limit_catching(insert_full) -%}
    {%- do log('QUERY RESULT: ' ~ query_result) -%}
    {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
        {% set partitions_batches = get_partition_batches(tmp_relation) %}
        {% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}
        {%- for batch in partitions_batches -%}
            {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}
            {%- set insert_batch_partitions -%}
            insert into {{ target_relation }} ({{ dest_cols_csv }})
            (
                select {{ dest_cols_csv }}
                from {{ tmp_relation }}
                where {{ batch }}
            );
            {%- endset -%}
            {%- do run_query(insert_batch_partitions) -%}
        {%- endfor -%}
    {%- endif -%}
    SELECT 'SUCCESSFULLY INSERTED DATA IN {{ target_relation }}'
{%- endmacro %}


{% macro delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %}
{%- set partitioned_keys = partitioned_by | tojson | replace('\"', '') | replace('[', '') | replace(']', '') -%}
{% call statement('get_partitions', fetch_result=True) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

{% set lf_tags_config = config.get('lf_tags_config') %}
{% set lf_grants = config.get('lf_grants') %}
{% set partitioned_by = config.get('partitioned_by', default=none) %}
{% set partitioned_by = config.get('partitioned_by') %}
{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}
Expand All @@ -24,16 +24,18 @@

{% set to_drop = [] %}
{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) -%}
{%- do safe_create_table_as(False, target_relation, sql) -%}
{% set build_sql = "select 'SUCCESSFULLY CREATED TABLE {{ target_relation }} from scratch'" -%}
{% elif existing_relation.is_view or should_full_refresh() %}
{% do drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) -%}
{%- do safe_create_table_as(False, target_relation, sql) -%}
{% set build_sql = "select 'SUCCESSFULLY RECREATED TABLE {{ target_relation }}'" -%}
{% elif partitioned_by is not none and strategy == 'insert_overwrite' %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% if tmp_relation is not none %}
{% do drop_relation(tmp_relation) %}
{% endif %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{%- do safe_create_table_as(True, tmp_relation, sql) -%}
{% do delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %}
{% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
{% do to_drop.append(tmp_relation) %}
Expand All @@ -42,7 +44,7 @@
{% if tmp_relation is not none %}
{% do drop_relation(tmp_relation) %}
{% endif %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{%- do safe_create_table_as(True, tmp_relation, sql) -%}
{% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
{% do to_drop.append(tmp_relation) %}
{% elif strategy == 'merge' and table_type == 'iceberg' %}
Expand All @@ -67,7 +69,7 @@
{% if tmp_relation is not none %}
{% do drop_relation(tmp_relation) %}
{% endif %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{%- do safe_create_table_as(True, tmp_relation, sql) -%}
{% set build_sql = iceberg_merge(on_schema_change, tmp_relation, target_relation, unique_key, incremental_predicates, existing_relation, delete_condition) %}
{% do to_drop.append(tmp_relation) %}
{% endif %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,33 +73,67 @@
{%- endfor -%}
{%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns_wo_keys) -%}
{%- set src_cols_csv = src_columns_quoted | join(', ') -%}
merge into {{ target_relation }} as target using {{ tmp_relation }} as src
on (
{%- for key in unique_key_cols %}
target.{{ key }} = src.{{ key }} {{ "and " if not loop.last }}
{%- endfor %}
)
{% if incremental_predicates is not none -%}
and (
{%- for inc_predicate in incremental_predicates %}
{{ inc_predicate }} {{ "and " if not loop.last }}
{%- endfor %}
)
{%- endif %}
{% if delete_condition is not none -%}
when matched and ({{ delete_condition }})
then delete
{%- endif %}
when matched
then update set
{%- for col in update_columns %}
{%- if merge_update_columns_rules and col.name in merge_update_columns_rules %}
{{ get_update_statement(col, merge_update_columns_rules[col.name], loop.last) }}
{%- else -%}
{{ get_update_statement(col, merge_update_columns_default_rule, loop.last) }}
{%- endif -%}
{%- endfor %}
when not matched
then insert ({{ dest_cols_csv }})
values ({{ src_cols_csv }});

{%- set src_part -%}
merge into {{ target_relation }} as target using {{ tmp_relation }} as src
{%- endset -%}

{%- set merge_part -%}
on (
{%- for key in unique_key_cols -%}
target.{{ key }} = src.{{ key }}
{{ " and " if not loop.last }}
{%- endfor -%}
{% if incremental_predicates is not none -%}
and (
{%- for inc_predicate in incremental_predicates %}
{{ inc_predicate }} {{ "and " if not loop.last }}
{%- endfor %}
)
{%- endif %}
)
{% if delete_condition is not none -%}
when matched and ({{ delete_condition }})
then delete
{%- endif %}
when matched
then update set
{%- for col in update_columns %}
{%- if merge_update_columns_rules and col.name in merge_update_columns_rules %}
{{ get_update_statement(col, merge_update_columns_rules[col.name], loop.last) }}
{%- else -%}
{{ get_update_statement(col, merge_update_columns_default_rule, loop.last) }}
{%- endif -%}
{%- endfor %}
when not matched
then insert ({{ dest_cols_csv }})
values ({{ src_cols_csv }})
{%- endset -%}

{%- set merge_full -%}
{{ src_part }}
{{ merge_part }}
{%- endset -%}

{%- set query_result = adapter.run_query_with_partitions_limit_catching(merge_full) -%}
{%- do log('QUERY RESULT: ' ~ query_result) -%}
{%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
{% set partitions_batches = get_partition_batches(tmp_relation) %}
{% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}
{%- for batch in partitions_batches -%}
{%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}
{%- set src_batch_part -%}
merge into {{ target_relation }} as target
using (select * from {{ tmp_relation }} where {{ batch }}) as src
{%- endset -%}
{%- set merge_batch -%}
{{ src_batch_part }}
{{ merge_part }}
{%- endset -%}
{%- do run_query(merge_batch) -%}
{%- endfor -%}

{%- endif -%}

SELECT 'SUCCESSFULLY INSERTED DATA IN {{ target_relation }}'
{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,42 @@
as
{{ sql }}
{% endmacro %}

{% macro create_table_as_with_partitions(temporary, relation, sql) -%}
{#- Create `relation` in partition-sized steps to stay under Athena's
    open-partition limit: first create an empty table with the right schema,
    then insert the data one partition batch at a time. -#}

{% set partitions_batches = get_partition_batches(sql) %}
{% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}

{#- `limit 0` appended to the CTAS yields the schema with no rows, so the
    create itself opens no partitions. -#}
{%- do log('CREATE EMPTY TABLE: ' ~ relation) -%}
{%- set create_empty_table_query -%}
{{ create_table_as(temporary, relation, sql) }}
limit 0
{%- endset -%}
{%- do run_query(create_empty_table_query) -%}
{%- set dest_columns = adapter.get_columns_in_relation(relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

{#- Each batch is an OR-joined predicate covering at most
    `partitions_limit` partitions; insert them one statement at a time. -#}
{%- for batch in partitions_batches -%}
{%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}

{%- set insert_batch_partitions -%}
insert into {{ relation }} ({{ dest_cols_csv }})
select {{ dest_cols_csv }}
from ({{ sql }})
where {{ batch }}
{%- endset -%}

{%- do run_query(insert_batch_partitions) -%}
{%- endfor -%}

select 'SUCCESSFULLY CREATED TABLE {{ relation }}'

{%- endmacro %}

{% macro safe_create_table_as(temporary, relation, sql) -%}
    {#- Run a plain CTAS first; when Athena reports TOO_MANY_OPEN_PARTITIONS,
        fall back to creating the table in partition batches. -#}
    {%- set ctas_sql = create_table_as(temporary, relation, sql) -%}
    {%- set query_result = adapter.run_query_with_partitions_limit_catching(ctas_sql) -%}
    {%- do log('QUERY RESULT: ' ~ query_result) -%}
    {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
        {%- do create_table_as_with_partitions(temporary, relation, sql) -%}
    {%- endif -%}
{%- endmacro %}
Loading