From b1f34c2b69d54c85ef7977022cca2de9c0b1b571 Mon Sep 17 00:00:00 2001 From: gladkikh Date: Mon, 19 Jun 2023 19:36:23 +0300 Subject: [PATCH 1/3] distributed table materialization --- dbt/adapters/clickhouse/credentials.py | 1 + dbt/adapters/clickhouse/impl.py | 8 +- .../materializations/distributed_table.sql | 101 ++++++++++++++++++ 3 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 dbt/include/clickhouse/macros/materializations/distributed_table.sql diff --git a/dbt/adapters/clickhouse/credentials.py b/dbt/adapters/clickhouse/credentials.py index 178625f1..4ccf2d63 100644 --- a/dbt/adapters/clickhouse/credentials.py +++ b/dbt/adapters/clickhouse/credentials.py @@ -32,6 +32,7 @@ class ClickHouseCredentials(Credentials): check_exchange: bool = True custom_settings: Optional[Dict[str, Any]] = None use_lw_deletes: bool = False + local_suffix: str = 'local' @property def type(self): diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index 7bbc2569..f482ddb4 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -74,6 +74,12 @@ def get_clickhouse_cluster_name(self): if conn.credentials.cluster: return f'"{conn.credentials.cluster}"' + @available.parse(lambda *a, **k: {}) + def get_clickhouse_local_suffix(self): + conn = self.connections.get_if_exists() + if conn.credentials.local_suffix: + return f'{conn.credentials.local_suffix}' + @available def clickhouse_db_engine_clause(self): conn = self.connections.get_if_exists() @@ -150,7 +156,7 @@ def s3source_clause( if path: if bucket and path and not bucket.endswith('/') and not bucket.startswith('/'): path = f'/{path}' - url = f'{url}{path}' + url = f'{url}{path}'.replace('//', '/') if not url.startswith('http'): url = f'https://{url}' access = '' diff --git a/dbt/include/clickhouse/macros/materializations/distributed_table.sql b/dbt/include/clickhouse/macros/materializations/distributed_table.sql new file mode 100644 index 00000000..87a3a7fd --- /dev/null +++ b/dbt/include/clickhouse/macros/materializations/distributed_table.sql @@ -0,0 +1,101 @@ +{% materialization distributed_table, adapter='clickhouse' %} + {%- set local_suffix = adapter.get_clickhouse_local_suffix() -%} + + {%- set existing_relation = load_cached_relation(this) -%} + {%- set target_relation = this.incorporate(type='table') -%} + + {% set existing_relation_local = existing_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if existing_relation is not none else none %} + {% set target_relation_local = target_relation.incorporate(path={"identifier": model['name'] + local_suffix}) if target_relation is not none else none %} + + {%- set backup_relation = none -%} + {%- set preexisting_backup_relation = none -%} + {%- set preexisting_intermediate_relation = none -%} + + {% if existing_relation_local is not none %} + {%- set backup_relation_type = existing_relation_local.type -%} + {%- set backup_relation = make_backup_relation(target_relation_local, backup_relation_type) -%} + {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} + {% if not existing_relation.can_exchange %} + {%- set intermediate_relation = make_intermediate_relation(target_relation_local) -%} + {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%} + {% endif %} + {% endif %} + {% set view_relation = default__make_temp_relation(target_relation, '__dbt_tmp') %} + -- drop the temp relations if they exist already in the database + {{ drop_relation_if_exists(preexisting_intermediate_relation) }} + {{ drop_relation_if_exists(preexisting_backup_relation) }} + {{ drop_relation_if_exists(view_relation) }} + + {% set grant_config = config.get('grants') %} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + {% call statement('main') %} + {{ create_view_as(view_relation, sql) }} + {% endcall %} + + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% if backup_relation is none %} + {% do run_query(create_empty_table_from_view(target_relation_local, view_relation)) or '' %} + {% do run_query(create_distributed_table(target_relation, target_relation_local)) or '' %} + {% elif existing_relation.can_exchange %} + -- We can do an atomic exchange, so no need for an intermediate + {% call statement('main') -%} + {% do run_query(create_empty_table_from_view(backup_relation, view_relation)) or '' %} + {%- endcall %} + {% do exchange_tables_atomic(backup_relation, existing_relation) %} + {% else %} + {% do run_query(create_empty_table_from_view(intermediate_relation, view_relation)) or '' %} + {{ adapter.rename_relation(existing_relation_local, backup_relation) }} + {{ adapter.rename_relation(intermediate_relation, target_relation_local) }} + {% endif %} + {% do run_query(clickhouse__insert_into(target_relation, sql)) or '' %} + {{ drop_relation_if_exists(view_relation) }} + -- cleanup + {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + {{ run_hooks(post_hooks, inside_transaction=True) }} + {{ adapter.commit() }} + {{ drop_relation_if_exists(backup_relation) }} + {{ run_hooks(post_hooks, inside_transaction=False) }} + {{ return({'relations': [target_relation]}) }} + +{% endmaterialization %} + +{% macro create_distributed_table(relation, local_relation) %} + {%- set cluster = adapter.get_clickhouse_cluster_name()[1:-1] -%} + {%- set sharding = config.get('sharding_key') -%} + + CREATE TABLE {{ relation }} {{ on_cluster_clause() }} + ENGINE = Distributed('{{ cluster}}', '{{ relation.schema }}', '{{ local_relation.name }}' + {% if sharding is none %} + , {{ sharding }} + {% endif %} + ) + AS SELECT * FROM {{ local_relation}} LIMIT 0 + {% endmacro %} + +{% macro create_empty_table_from_view(relation, view_relation) -%} + {%- set sql_header = config.get('sql_header', none) -%} + {%- set columns = adapter.get_columns_in_relation(view_relation) | list -%} + + {%- set col_list = [] -%} + {% for col in columns %} + {{col_list.append(col.name + ' ' + col.data_type) or '' }} + {% endfor %} + {{ sql_header if sql_header is not none }} + + create table {{ relation.include(database=False) }} + {{ on_cluster_clause() }} ( + {{col_list | join(', ')}} + ) + + {{ engine_clause() }} + {{ order_cols(label="order by") }} + {{ primary_key_clause(label="primary key") }} + {{ partition_cols(label="partition by") }} + {{ adapter.get_model_settings(model) }} +{%- endmacro %} From 45792140aea095e8d401412a4b84db8498430996 Mon Sep 17 00:00:00 2001 From: gladkikh Date: Mon, 19 Jun 2023 19:39:16 +0300 Subject: [PATCH 2/3] fix rebase --- dbt/adapters/clickhouse/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index f482ddb4..83a32d39 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -156,7 +156,7 @@ def s3source_clause( if path: if bucket and path and not bucket.endswith('/') and not bucket.startswith('/'): path = f'/{path}' - url = f'{url}{path}'.replace('//', '/') + url = f'{url}{path}' if not url.startswith('http'): url = f'https://{url}' access = '' From c280bfe5f2c60ad6a9f97fc16766d7dac0b87b20 Mon Sep 17 00:00:00 2001 From: gladkikh Date: Wed, 21 Jun 2023 13:21:30 +0300 Subject: [PATCH 3/3] PR fixes --- .../macros/materializations/distributed_table.sql | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/dbt/include/clickhouse/macros/materializations/distributed_table.sql b/dbt/include/clickhouse/macros/materializations/distributed_table.sql index 87a3a7fd..44eeb740 100644 --- a/dbt/include/clickhouse/macros/materializations/distributed_table.sql +++ b/dbt/include/clickhouse/macros/materializations/distributed_table.sql @@ -37,16 +37,16 @@ {{ run_hooks(pre_hooks, inside_transaction=True) }} {% if backup_relation is none %} - {% do run_query(create_empty_table_from_view(target_relation_local, view_relation)) or '' %} + {% do run_query(create_empty_table_from_relation(target_relation_local, view_relation)) or '' %} {% do run_query(create_distributed_table(target_relation, target_relation_local)) or '' %} {% elif existing_relation.can_exchange %} -- We can do an atomic exchange, so no need for an intermediate {% call statement('main') -%} - {% do run_query(create_empty_table_from_view(backup_relation, view_relation)) or '' %} + {% do run_query(create_empty_table_from_relation(backup_relation, view_relation)) or '' %} {%- endcall %} {% do exchange_tables_atomic(backup_relation, existing_relation) %} {% else %} - {% do run_query(create_empty_table_from_view(intermediate_relation, view_relation)) or '' %} + {% do run_query(create_empty_table_from_relation(intermediate_relation, view_relation)) or '' %} {{ adapter.rename_relation(existing_relation_local, backup_relation) }} {{ adapter.rename_relation(intermediate_relation, target_relation_local) }} {% endif %} @@ -69,18 +69,17 @@ {%- set cluster = adapter.get_clickhouse_cluster_name()[1:-1] -%} {%- set sharding = config.get('sharding_key') -%} - CREATE TABLE {{ relation }} {{ on_cluster_clause() }} + CREATE TABLE {{ relation }} {{ on_cluster_clause() }} AS {{ local_relation }} ENGINE = Distributed('{{ cluster}}', '{{ relation.schema }}', '{{ local_relation.name }}' - {% if sharding is none %} + {% if sharding is not none %} , {{ sharding }} {% endif %} ) - AS SELECT * FROM {{ local_relation}} LIMIT 0 {% endmacro %} -{% macro create_empty_table_from_view(relation, view_relation) -%} +{% macro create_empty_table_from_relation(relation, source_relation) -%} {%- set sql_header = config.get('sql_header', none) -%} - {%- set columns = adapter.get_columns_in_relation(view_relation) | list -%} + {%- set columns = adapter.get_columns_in_relation(source_relation) | list -%} {%- set col_list = [] -%} {% for col in columns %}