Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Multipartitioning external tables #7

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion example_project/models/sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,75 @@ sources:
- name: user
data_type: 'STRING'
- name: version
data_type: 'BIGINT'
data_type: 'BIGINT'

- name: aws_public_blockchain
database: awscatalog
quoting:
database: false
schema: false
identifier: false
describe: "https://github.com/aws-samples/digital-assets-examples/blob/main/analytics/"
external:
location: "s3://aws-public-blockchain/v1.0"
row_format: "serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"
partitions:
- name: coin
data_type: string
path_macro: value_only
vals: [
'btc',
'eth'
]
- name: transactions
data_type: string
path_macro: value_only
vals: [
'/transactions/date=2024-09-19',
'/transactions/date=2024-09-20'
]
columns:
- name: gas
data_type: bigint
- name: hash
data_type: string
- name: input
data_type: string
- name: nonce
data_type: bigint
- name: value
data_type: double
- name: block_number
data_type: bigint
- name: block_hash
data_type: string
- name: transaction_index
data_type: bigint
- name: from_address
data_type: string
- name: to_address
data_type: string
- name: gas_price
data_type: bigint
- name: receipt_cumulative_gas_used
data_type: bigint
- name: receipt_gas_used
data_type: bigint
- name: receipt_contract_address
data_type: string
- name: receipt_status
data_type: bigint
- name: receipt_effective_gas_price
data_type: bigint
- name: transaction_type
data_type: int
- name: max_fee_per_gas
data_type: bigint
- name: max_priority_fee_per_gas
data_type: bigint
- name: block_timestamp
data_type: timestamp
- name: date
data_type: string
- name: last_modified
data_type: timestamp
95 changes: 62 additions & 33 deletions macros/refresh_external_tables.sql
Original file line number Diff line number Diff line change
@@ -1,38 +1,67 @@
{% macro athena__refresh_external_table(source_node) %}
{# https://docs.aws.amazon.com/athena/latest/ug/partitions.html #}
{%- set partitions = source_node.external.partitions -%}
{%- set hive_compatible_partitions = source_node.external.get('hive_compatible_partitions', false) -%}
{%- if partitions -%}
{%- if hive_compatible_partitions -%}
{% set ddl -%}
msck repair table {{source(source_node.source_name, source_node.name).render_hive()}}
{%- endset %}
{{ return([ddl]) }}
{% else %}
{# https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html #}
{%- set part_len = partitions|length -%}
{%- set finals = [] -%}
{%- for partition in partitions -%}
{%- if partition.vals.macro -%}
{%- set vals = dbt_external_tables.render_from_context(partition.vals.macro, **partition.vals.args) -%}
{%- elif partition.vals is string -%}
{%- set vals = [partition.vals] -%}
{%- else -%}
{%- set vals = partition.vals -%}
{%- endif -%}
{%- for val in vals -%}
{%- set partition_parts = [{'name': partition.name, 'value': val}] -%}
{%- set path_parts = [dbt_external_tables.render_from_context(partition.path_macro, partition.name, val)] -%}
{%- set construct = {
'partition_by': partition_parts,
'path': path_parts | join('/')
} -%}
{% do finals.append(construct) %}
{%- endfor -%}
{# https://docs.aws.amazon.com/athena/latest/ug/partitions.html #}

{%- set athena_partitioning = [{'partition_by': [], 'path': ''}] -%}
{%- set list_partitions = [] -%}
{%- set finals = [] -%}

{%- set partitions = source_node.external.partitions -%}
{%- set hive_compatible_partitions = source_node.external.get('hive_compatible_partitions', false) -%}

{%- if partitions -%}
{%- if hive_compatible_partitions -%}
{% set ddl -%}
msck repair table {{source(source_node.source_name, source_node.name).render_hive()}}
{%- endset %}
{{ return([ddl]) }}
{% else %}
{# https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html #}
{%- set part_len = partitions|length -%}
{%- for partition in partitions -%}
{%- if not loop.first -%}
{%- set athena_partitioning = list_partitions -%}
{%- set list_partitions = [] -%}
{%- endif -%}
{%- for preexisting in athena_partitioning -%}
{%- if partition.vals.macro -%}
{%- set vals = dbt_external_tables.render_from_context(partition.vals.macro, **partition.vals.args) -%}
{%- elif partition.vals is string -%}
{%- set vals = [partition.vals] -%}
{%- else -%}
{%- set vals = partition.vals -%}
{%- endif -%}

{# Allow the use of custom 'key' in path_macro (path.sql) #}
{# By default, take value from source node 'external.partitions.name' from raw yml #}
{# Useful if the data in s3 is saved with a prefix/suffix path 'path_macro_key' other than 'external.partitions.name' #}
{%- if partition.path_macro_key -%}
{%- set path_macro_key = partition.path_macro_key -%}
{%- else -%}
{%- set path_macro_key = partition.name -%}
{%- endif -%}

{%- for val in vals -%}
{# For each preexisting item, add a new one #}
{%- set partition_parts = [] -%}
{%- for prexist_part in preexisting.partition_by -%}
{%- do partition_parts.append(prexist_part) -%}
{%- endfor -%}
{%- do partition_parts.append({'name': partition.name, 'value': val}) -%}
{# Concatenate path #}
{%- set path_parts = preexisting.path ~ render_from_context(partition.path_macro, path_macro_key, val) -%}
{%- do list_partitions.append({'partition_by': partition_parts, 'path': path_parts}) -%}
{%- endfor -%}
{%- set ddl = dbt_external_tables.redshift_alter_table_add_partitions(source_node, finals) -%}
{{ return(ddl) }}
{% endif %}
{%- endfor -%}
{%- if loop.last -%}
{%- for construct in list_partitions -%}
{%- do finals.append(construct) -%}
{%- endfor -%}
{%- endif -%}
{%- endfor -%}
{%- set ddl = dbt_external_tables.redshift_alter_table_add_partitions(source_node, finals) -%}
{{ return(ddl) }}
{% endif %}
{% else %}
{% do return([]) %}
{% endif %}
{% endmacro %}