Skip to content

Commit

Permalink
Distributed table materialization (#163)
Browse files Browse the repository at this point in the history
* distributed table materialization

* fix rebase

* PR fixes
  • Loading branch information
gladkikhtutu authored Jun 26, 2023
1 parent a5ce195 commit 95a4b21
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 0 deletions.
1 change: 1 addition & 0 deletions dbt/adapters/clickhouse/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ClickHouseCredentials(Credentials):
check_exchange: bool = True
custom_settings: Optional[Dict[str, Any]] = None
use_lw_deletes: bool = False
local_suffix: str = 'local'

@property
def type(self):
Expand Down
6 changes: 6 additions & 0 deletions dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ def get_clickhouse_cluster_name(self):
if conn.credentials.cluster:
return f'"{conn.credentials.cluster}"'

@available.parse(lambda *a, **k: {})
def get_clickhouse_local_suffix(self):
    """Return the suffix configured for local tables.

    Reads ``local_suffix`` from the credentials of the current connection.
    Returns ``None`` when the suffix is empty or unset.
    """
    # NOTE(review): presumably the lambda passed to ``available.parse`` stands
    # in for this method while dbt parses the project -- confirm against dbt.
    credentials = self.connections.get_if_exists().credentials
    suffix = credentials.local_suffix
    if not suffix:
        return None
    return f'{suffix}'

@available
def clickhouse_db_engine_clause(self):
conn = self.connections.get_if_exists()
Expand Down
100 changes: 100 additions & 0 deletions dbt/include/clickhouse/macros/materializations/distributed_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
{% materialization distributed_table, adapter='clickhouse' %}
  {#-
    Materializes the model as two relations:
      * a "local" table whose identifier is the model name followed by the
        adapter's local suffix, created by create_empty_table_from_relation
        with the columns of a temporary view built from the model SQL;
      * a Distributed table carrying the model's own name, created on top of
        the local table by create_distributed_table.
    Rows are loaded by inserting the model SQL into the Distributed table.

    Three paths are handled:
      1. no existing relation: create the local and the Distributed table;
      2. existing relation that supports exchange: build the new local table
         under the backup name and atomically exchange it with the old one;
      3. otherwise: build an intermediate table and swap it in with renames.

    Returns a dict with the Distributed relation under 'relations'.
  -#}
  {%- set local_suffix = adapter.get_clickhouse_local_suffix() -%}

  {%- set existing_relation = load_cached_relation(this) -%}
  {%- set target_relation = this.incorporate(type='table') -%}

  {#- The local relations share schema/database with the model; only the identifier differs. -#}
  {% set existing_relation_local = existing_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if existing_relation is not none else none %}
  {% set target_relation_local = target_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if target_relation is not none else none %}

  {%- set backup_relation = none -%}
  {%- set preexisting_backup_relation = none -%}
  {%- set preexisting_intermediate_relation = none -%}

  {% if existing_relation_local is not none %}
    {%- set backup_relation_type = existing_relation_local.type -%}
    {%- set backup_relation = make_backup_relation(target_relation_local, backup_relation_type) -%}
    {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
    {#- The intermediate relation is only needed for the rename-based swap. -#}
    {% if not existing_relation.can_exchange %}
      {%- set intermediate_relation = make_intermediate_relation(target_relation_local) -%}
      {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%}
    {% endif %}
  {% endif %}
  {% set view_relation = default__make_temp_relation(target_relation, '__dbt_tmp') %}
  -- drop the temp relations if they exist already in the database
  {{ drop_relation_if_exists(preexisting_intermediate_relation) }}
  {{ drop_relation_if_exists(preexisting_backup_relation) }}
  {{ drop_relation_if_exists(view_relation) }}

  {% set grant_config = config.get('grants') %}

  {{ run_hooks(pre_hooks, inside_transaction=False) }}

  {#- The temporary view exposes the model's column names and types to create_empty_table_from_relation. -#}
  {% call statement('main') %}
    {{ create_view_as(view_relation, sql) }}
  {% endcall %}

  {{ run_hooks(pre_hooks, inside_transaction=True) }}

  {% if backup_relation is none %}
    {% do run_query(create_empty_table_from_relation(target_relation_local, view_relation)) or '' %}
    {% do run_query(create_distributed_table(target_relation, target_relation_local)) or '' %}
  {% elif existing_relation.can_exchange %}
    -- We can do an atomic exchange, so no need for an intermediate
    {% call statement('main') -%}
      {% do run_query(create_empty_table_from_relation(backup_relation, view_relation)) or '' %}
    {%- endcall %}
    {#- Exchange with the existing *local* table. The backup relation is
        derived from the local relation, so exchanging with the Distributed
        table would replace it with a plain table and the cleanup below
        would then drop the Distributed table instead of the old data. -#}
    {% do exchange_tables_atomic(backup_relation, existing_relation_local) %}
  {% else %}
    {% do run_query(create_empty_table_from_relation(intermediate_relation, view_relation)) or '' %}
    {{ adapter.rename_relation(existing_relation_local, backup_relation) }}
    {{ adapter.rename_relation(intermediate_relation, target_relation_local) }}
  {% endif %}
  {#- Load the data through the Distributed table. -#}
  {% do run_query(clickhouse__insert_into(target_relation, sql)) or '' %}
  {{ drop_relation_if_exists(view_relation) }}
  -- cleanup
  {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

  {% do persist_docs(target_relation, model) %}
  {{ run_hooks(post_hooks, inside_transaction=True) }}
  {{ adapter.commit() }}
  {#- After the swap the backup relation holds the previous local table. -#}
  {{ drop_relation_if_exists(backup_relation) }}
  {{ run_hooks(post_hooks, inside_transaction=False) }}
  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}

{% macro create_distributed_table(relation, local_relation) %}
  {#-
    Renders the DDL that creates `relation` as a Distributed table with the
    structure of `local_relation`, routing to the table named
    `local_relation.name` in schema `relation.schema` on the configured cluster.

    Args:
      relation: the Distributed relation to create.
      local_relation: the underlying local relation.

    Config:
      sharding_key: optional expression appended as the sharding argument of
        the Distributed engine; ignored when unset or blank.

    Raises a compiler error when the adapter has no cluster configured.
  -#}
  {%- set quoted_cluster = adapter.get_clickhouse_cluster_name() -%}
  {%- if quoted_cluster is none -%}
    {#- Fail with a clear message instead of a TypeError from slicing None. -#}
    {%- do exceptions.raise_compiler_error('A cluster must be configured to use the distributed_table materialization, but none is set') -%}
  {%- endif -%}
  {#- The adapter returns the name wrapped in double quotes; strip them because
      the engine argument is rendered inside single quotes below. -#}
  {%- set cluster = quoted_cluster[1:-1] -%}
  {%- set sharding = config.get('sharding_key') -%}

  CREATE TABLE {{ relation }} {{ on_cluster_clause() }} AS {{ local_relation }}
  ENGINE = Distributed('{{ cluster }}', '{{ relation.schema }}', '{{ local_relation.name }}'
  {#- A blank sharding key would render a dangling comma, so treat it as absent. -#}
  {% if sharding is not none and sharding | string | trim != '' %}
    , {{ sharding }}
  {% endif %}
  )
{% endmacro %}

{% macro create_empty_table_from_relation(relation, source_relation) -%}
  {#-
    Renders a CREATE TABLE statement for `relation` whose column list
    (name and data type) is copied from `source_relation`. The engine,
    order by, primary key, partition by and settings clauses come from the
    current model's configuration. An optional `sql_header` config value is
    emitted before the statement.
  -#}
  {%- set header = config.get('sql_header', none) -%}
  {%- set source_columns = adapter.get_columns_in_relation(source_relation) | list -%}

  {#- Collect one "<name> <type>" definition per source column. -#}
  {%- set column_defs = [] -%}
  {% for column in source_columns %}
    {% do column_defs.append(column.name + ' ' + column.data_type) %}
  {% endfor %}
  {% if header is not none %}{{ header }}{% endif %}

  create table {{ relation.include(database=False) }}
  {{ on_cluster_clause() }} (
    {{ column_defs | join(', ') }}
  )

  {{ engine_clause() }}
  {{ order_cols(label="order by") }}
  {{ primary_key_clause(label="primary key") }}
  {{ partition_cols(label="partition by") }}
  {{ adapter.get_model_settings(model) }}
{%- endmacro %}

0 comments on commit 95a4b21

Please sign in to comment.