Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed table materialization #163

Merged
merged 3 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbt/adapters/clickhouse/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ClickHouseCredentials(Credentials):
check_exchange: bool = True
custom_settings: Optional[Dict[str, Any]] = None
use_lw_deletes: bool = False
local_suffix: str = 'local'

@property
def type(self):
Expand Down
6 changes: 6 additions & 0 deletions dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ def get_clickhouse_cluster_name(self):
if conn.credentials.cluster:
return f'"{conn.credentials.cluster}"'

@available.parse(lambda *a, **k: {})
def get_clickhouse_local_suffix(self):
    """Return the suffix configured for local table names, if any.

    Reads ``local_suffix`` from the credentials of the connection returned
    by ``self.connections.get_if_exists()``. A falsy suffix (for example an
    empty string) yields ``None``; otherwise the suffix is returned
    formatted as a string.
    """
    credentials = self.connections.get_if_exists().credentials
    suffix = credentials.local_suffix
    if not suffix:
        return None
    return f'{suffix}'

@available
def clickhouse_db_engine_clause(self):
conn = self.connections.get_if_exists()
Expand Down
101 changes: 101 additions & 0 deletions dbt/include/clickhouse/macros/materializations/distributed_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
{% materialization distributed_table, adapter='clickhouse' %}
  {#-
    Materializes a model as two relations:
      * a "local" table named <model name> + <local_suffix>, which stores the data, and
      * a Distributed table named after the model, which points at the local table.

    Flow: build a temporary view from the model SQL, create an empty local table
    with the view's columns, swap it in for any pre-existing local table (atomic
    exchange when supported, otherwise rename via an intermediate relation), then
    insert the model SQL through the distributed relation.
  -#}
  {%- set local_suffix = adapter.get_clickhouse_local_suffix() -%}
  {#- The adapter returns none when no suffix is configured; fail with a clear
      message instead of an opaque error from the string concatenation below. -#}
  {% if local_suffix is none %}
    {% do exceptions.raise_compiler_error('local_suffix should be defined for using distributed materializations, current is None') %}
  {% endif %}

  {%- set existing_relation = load_cached_relation(this) -%}
  {%- set target_relation = this.incorporate(type='table') -%}

  {% set existing_relation_local = existing_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if existing_relation is not none else none %}
  {% set target_relation_local = target_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if target_relation is not none else none %}

  {%- set backup_relation = none -%}
  {%- set preexisting_backup_relation = none -%}
  {%- set preexisting_intermediate_relation = none -%}

  {% if existing_relation_local is not none %}
    {%- set backup_relation_type = existing_relation_local.type -%}
    {%- set backup_relation = make_backup_relation(target_relation_local, backup_relation_type) -%}
    {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
    {#- The local table is the one that gets swapped, so its exchange capability decides the path. -#}
    {% if not existing_relation_local.can_exchange %}
      {%- set intermediate_relation = make_intermediate_relation(target_relation_local) -%}
      {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%}
    {% endif %}
  {% endif %}
  {% set view_relation = default__make_temp_relation(target_relation, '__dbt_tmp') %}
  -- drop the temp relations if they exist already in the database
  {{ drop_relation_if_exists(preexisting_intermediate_relation) }}
  {{ drop_relation_if_exists(preexisting_backup_relation) }}
  {{ drop_relation_if_exists(view_relation) }}

  {% set grant_config = config.get('grants') %}

  {{ run_hooks(pre_hooks, inside_transaction=False) }}

  {#- The view exists only to expose the column names and types of the model SQL. -#}
  {% call statement('main') %}
    {{ create_view_as(view_relation, sql) }}
  {% endcall %}

  {{ run_hooks(pre_hooks, inside_transaction=True) }}

  {% if backup_relation is none %}
    {#- First run: create the local table, then the distributed table on top of it. -#}
    {% do run_query(create_empty_table_from_view(target_relation_local, view_relation)) or '' %}
    {% do run_query(create_distributed_table(target_relation, target_relation_local)) or '' %}
  {% elif existing_relation_local.can_exchange %}
    -- We can do an atomic exchange, so no need for an intermediate
    {#- NOTE(review): run_query issues the DDL itself, so the enclosing
        statement('main') body renders empty; confirm the wrapper is needed. -#}
    {% call statement('main') -%}
      {% do run_query(create_empty_table_from_view(backup_relation, view_relation)) or '' %}
    {%- endcall %}
    {#- Swap the new empty table with the existing LOCAL table. Exchanging with the
        distributed relation would put a plain table under the distributed name and
        leave the old rows in the local table. -#}
    {% do exchange_tables_atomic(backup_relation, existing_relation_local) %}
  {% else %}
    {% do run_query(create_empty_table_from_view(intermediate_relation, view_relation)) or '' %}
    {{ adapter.rename_relation(existing_relation_local, backup_relation) }}
    {{ adapter.rename_relation(intermediate_relation, target_relation_local) }}
  {% endif %}
  {#- Data is written through the distributed relation. -#}
  {% do run_query(clickhouse__insert_into(target_relation, sql)) or '' %}
  {{ drop_relation_if_exists(view_relation) }}
  -- cleanup
  {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

  {% do persist_docs(target_relation, model) %}
  {{ run_hooks(post_hooks, inside_transaction=True) }}
  {{ adapter.commit() }}
  {{ drop_relation_if_exists(backup_relation) }}
  {{ run_hooks(post_hooks, inside_transaction=False) }}
  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}

{% macro create_distributed_table(relation, local_relation) %}
  {#-
    Renders the DDL for a Distributed table `relation` that points at
    `local_relation` (looked up by relation.schema and local_relation.name) on
    the configured cluster. The column structure is copied from the local table
    via `AS SELECT ... LIMIT 0`.

    Config read: `sharding_key` (optional) - expression appended as the sharding
    key of the Distributed engine. It is omitted when unset or blank.
  -#}
  {%- set cluster = adapter.get_clickhouse_cluster_name() -%}
  {#- The adapter returns none when no cluster is configured; slicing none would fail obscurely. -#}
  {%- if cluster is none -%}
    {%- do exceptions.raise_compiler_error('Cluster name should be defined for using distributed materializations, current is None') -%}
  {%- endif -%}
  {#- The adapter returns the name wrapped in double quotes; strip them for use inside a string literal. -#}
  {%- set cluster = cluster[1:-1] -%}
  {%- set sharding = config.get('sharding_key') -%}

  CREATE TABLE {{ relation }} {{ on_cluster_clause() }}
  ENGINE = Distributed('{{ cluster }}', '{{ relation.schema }}', '{{ local_relation.name }}'
  {#- Emit the sharding key only when one is configured (the check used to be inverted). -#}
  {%- if sharding is not none and (sharding | string | trim) != '' %}
    , {{ sharding }}
  {%- endif %}
  )
  AS SELECT * FROM {{ local_relation }} LIMIT 0
{% endmacro %}

{% macro create_empty_table_from_view(relation, view_relation) -%}
  {#-
    Renders a CREATE TABLE statement for `relation` (rendered without the
    database part) whose column list is "<name> <data_type>" for every column
    the adapter reports for `view_relation`. Engine, order by, primary key,
    partition by and model settings come from the model's config through the
    shared clause macros. An optional `sql_header` config is emitted first.
  -#}
  {%- set sql_header = config.get('sql_header', none) -%}
  {%- set column_defs = [] -%}
  {%- for column in adapter.get_columns_in_relation(view_relation) | list -%}
    {%- do column_defs.append(column.name + ' ' + column.data_type) -%}
  {%- endfor %}
  {{ sql_header if sql_header is not none }}

  create table {{ relation.include(database=False) }}
  {{ on_cluster_clause() }} (
    {{ column_defs | join(', ') }}
  )

  {{ engine_clause() }}
  {{ order_cols(label="order by") }}
  {{ primary_key_clause(label="primary key") }}
  {{ partition_cols(label="partition by") }}
  {{ adapter.get_model_settings(model) }}
{%- endmacro %}