diff --git a/dbt/include/clickhouse/macros/materializations/materialized_view.sql b/dbt/include/clickhouse/macros/materializations/materialized_view.sql
new file mode 100644
index 00000000..f3c66cfd
--- /dev/null
+++ b/dbt/include/clickhouse/macros/materializations/materialized_view.sql
@@ -0,0 +1,120 @@
+{#-
+  Create or replace a materialized view in ClickHouse.
+  This builds both a target table named after the model and a `<model>_mv`
+  materialized view that writes to it; both relations are returned to dbt.
+-#}
+{%- materialization materialized_view, adapter='clickhouse' -%}
+
+  {%- set target_relation = this.incorporate(type='table') -%}
+  {%- set mv_name = target_relation.name + '_mv' -%}{# NOTE(review): bare name, used below without a schema qualifier; confirm it resolves correctly for models in a custom schema #}
+  {%- set target_mv = api.Relation.create(identifier=mv_name, schema=schema, database=database, type='materializedview') -%}
+  {%- set cluster_clause = on_cluster_clause(target_relation) -%}
+
+  {# look for an existing relation for the target table and create backup (and, without exchange support, intermediate) relations if necessary #}
+  {%- set existing_relation = load_cached_relation(this) -%}
+  {%- set backup_relation = none -%}
+  {%- set preexisting_backup_relation = none -%}
+  {%- set preexisting_intermediate_relation = none -%}
+  {% if existing_relation is not none %}
+    {%- set backup_relation_type = existing_relation.type -%}
+    {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
+    {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
+    {% if not existing_relation.can_exchange %}
+      {%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
+      {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%}
+    {% endif %}
+  {% endif %}
+
+  {% set grant_config = config.get('grants') %}
+
+  {{ run_hooks(pre_hooks, inside_transaction=False) }}
+
+  -- drop the temp relations if they exist already in the database
+  {{ drop_relation_if_exists(preexisting_intermediate_relation) }}
+  {{ drop_relation_if_exists(preexisting_backup_relation) }}
+
+  -- `BEGIN` happens here:
+  {{ run_hooks(pre_hooks, inside_transaction=True) }}
+
+  {% if backup_relation is none %}{# no existing relation was found: create the target table and the view #}
+    {{ log('Creating new materialized view ' + target_relation.name) }}
+    {% call statement('main') -%}
+      {{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql) }}
+    {%- endcall %}
+  {% elif existing_relation.can_exchange %}{# build the new table under the backup name, then swap it in via exchange_tables_atomic #}
+    {{ log('Replacing existing materialized view ' + target_relation.name) }}
+    {% call statement('drop existing materialized view') %}
+      drop view if exists {{ mv_name }} {{ cluster_clause }}
+    {% endcall %}
+    {% call statement('main') -%}
+      {{ get_create_table_as_sql(False, backup_relation, sql) }}
+    {%- endcall %}
+    {% do exchange_tables_atomic(backup_relation, existing_relation) %}
+    {% call statement('create new materialized view') %}
+      {{ clickhouse__create_mv_sql(mv_name, existing_relation.name, cluster_clause, sql) }}
+    {% endcall %}
+  {% else %}{# exchange is not available: delegate to the rename-based replacement macro #}
+    {{ log('Replacing existing materialized view ' + target_relation.name) }}
+    {{ clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql) }}
+  {% endif %}
+
+  -- apply grants and persist docs on the target table
+  {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
+  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
+
+  {% do persist_docs(target_relation, model) %}
+
+  {{ run_hooks(post_hooks, inside_transaction=True) }}
+
+  {{ adapter.commit() }}
+
+  {{ drop_relation_if_exists(backup_relation) }}{# the backup relation holds the replaced table at this point and is no longer needed #}
+
+  {{ run_hooks(post_hooks, inside_transaction=False) }}
+
+  {{ return({'relations': [target_relation, target_mv]}) }}
+
+{%- endmaterialization -%}
+
+
+{#
+  There are two steps to creating a materialized view:
+  1. Create a new table based on the SQL in the model
+  2. 
Create a materialized view using the SQL in the model that inserts
+     data into the table created during step 1
+#}
+{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql) -%}{#- executes step 1 itself (statement 'create_target_table') and renders the step 2 SQL for the caller to run -#}
+  {% call statement('create_target_table') %}
+    {{ get_create_table_as_sql(False, relation, sql) }}
+  {% endcall %}
+  {%- set cluster_clause = on_cluster_clause(relation) -%}
+  {%- set mv_name = relation.name + '_mv' -%}
+  {{ clickhouse__create_mv_sql(mv_name, relation.name, cluster_clause, sql) }}
+{%- endmacro %}
+
+
+{% macro clickhouse__create_mv_sql(relation_name, target_table, cluster_clause, sql) -%}{#- renders the CREATE statement only; the caller has to wrap it in a statement call to run it -#}
+  create materialized view if not exists {{ relation_name }} {{ cluster_clause }}
+  to {{ target_table }}
+  as {{ sql }}
+{%- endmacro %}
+
+
+{% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql) %}
+  {# drop existing materialized view while we recreate the target table #}
+  {%- set cluster_clause = on_cluster_clause(target_relation) -%}
+  {%- set mv_name = target_relation.name + '_mv' -%}
+  {% call statement('drop existing mv') -%}
+    drop view if exists {{ mv_name }} {{ cluster_clause }}
+  {%- endcall %}
+
+  {# recreate the target table under the intermediate name, then rename: existing -> backup, intermediate -> target #}
+  {% call statement('main') -%}
+    {{ get_create_table_as_sql(False, intermediate_relation, sql) }}
+  {%- endcall %}
+  {{ adapter.rename_relation(existing_relation, backup_relation) }}
+  {{ adapter.rename_relation(intermediate_relation, target_relation) }}
+
+  {# now that the target table is recreated, run the CREATE for the new view; only rendering the SQL here would never execute it #}
+  {% call statement('create new materialized view') %}{{ clickhouse__create_mv_sql(mv_name, target_relation.name, cluster_clause, sql) }}{% endcall %}
+{% endmacro %}
diff --git a/tests/integration/adapter/test_materialized_view.py b/tests/integration/adapter/test_materialized_view.py
new file mode 100644
index 00000000..23452c40
--- /dev/null
+++ b/tests/integration/adapter/test_materialized_view.py
@@ -0,0 +1,164 @@
+"""
+test materialized view creation
+"""
+
+import json
+
+import pytest
+from 
dbt.tests.util import check_relation_types, run_dbt + +PEOPLE_SEED_CSV = """ +id,name,age,department +1231,Dade,33,engineering +6666,Ksenia,48,engineering +8888,Kate,50,engineering +""".lstrip() + +# This model is parameterized, in a way, by the "run_type" dbt project variable +# This is to be able to switch between different model definitions within +# the same test run and allow us to test the evolution of a materialized view +MV_MODEL = """ +{{ config( + materialized='materialized_view', + engine='MergeTree()', + order_by='(id)', +) }} + +{% if var('run_type', '') == '' %} +select + id, + name, + case + when name like 'Dade' then 'crash_override' + when name like 'Kate' then 'acid burn' + else 'N/A' + end as hacker_alias +from {{ source('raw', 'people') }} +where department = 'engineering' + +{% else %} + +select + id, + name, + case + -- Dade wasn't always known as 'crash override'! + when name like 'Dade' and age = 11 then 'zero cool' + when name like 'Dade' and age != 11 then 'crash override' + when name like 'Kate' then 'acid burn' + else 'N/A' + end as hacker_alias +from {{ source('raw', 'people') }} +where department = 'engineering' + +{% endif %} +""" + + +SEED_SCHEMA_YML = """ +version: 2 + +sources: + - name: raw + schema: "{{ target.schema }}" + tables: + - name: people +""" + + +class TestBasicMV: + @pytest.fixture(scope="class") + def seeds(self): + """ + we need a base table to pull from + """ + return { + "people.csv": PEOPLE_SEED_CSV, + "schema.yml": SEED_SCHEMA_YML, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "hackers.sql": MV_MODEL, + } + + def test_create(self, project): + """ + 1. create a base table via dbt seed + 2. create a model as a materialized view, selecting from the table created in (1) + 3. 
insert data into the base table and make sure it's there in the target table created in (2) + """ + results = run_dbt(["seed"]) + assert len(results) == 1 + columns = project.run_sql("DESCRIBE TABLE people", fetch="all") + assert columns[0][1] == "Int32" + + # create the model + results = run_dbt() + assert len(results) == 1 + + columns = project.run_sql("DESCRIBE TABLE hackers", fetch="all") + assert columns[0][1] == "Int32" + + columns = project.run_sql("DESCRIBE hackers_mv", fetch="all") + assert columns[0][1] == "Int32" + + check_relation_types( + project.adapter, + { + "hackers_mv": "view", + "hackers": "table", + }, + ) + + # insert some data and make sure it reaches the target table + project.run_sql( + f""" + insert into {project.test_schema}.people ("id", "name", "age", "department") + values (1232,'Dade',16,'engineering'), (9999,'eugene',40,'malware'); + """ + ) + + result = project.run_sql("select count(*) from hackers", fetch="all") + assert result[0][0] == 4 + + +class TestUpdateMV: + @pytest.fixture(scope="class") + def seeds(self): + """ + we need a base table to pull from + """ + return { + "people.csv": PEOPLE_SEED_CSV, + "schema.yml": SEED_SCHEMA_YML, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "hackers.sql": MV_MODEL, + } + + def test_update(self, project): + # create our initial materialized view + run_dbt(["seed"]) + run_dbt() + + # re-run dbt but this time with the new MV SQL + run_vars = {"run_type": "extended_schema"} + run_dbt(["run", "--vars", json.dumps(run_vars)]) + + project.run_sql( + f""" + insert into {project.test_schema}.people ("id", "name", "age", "department") + values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware'); + """ + ) + + # assert that we now have both of Dade's aliases in our hackers table + result = project.run_sql( + "select distinct hacker_alias from hackers where name = 'Dade'", fetch="all" + ) + assert len(result) == 2