diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e23f02c..b99ab58e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +### Release [1.8.6], 2024-12-05 + +### Improvement +* Today, on mv model creation, the target table is being populated with the historical data based on the query provided in the mv creation. This catchup mechanism is now behind a config flag and enabled by default (as is today). ([#399](https://github.com/ClickHouse/dbt-clickhouse/pull/399)) + ### Release [1.8.5], 2024-11-19 ### New Features diff --git a/README.md b/README.md index 7855fbb2..eeb8d11e 100644 --- a/README.md +++ b/README.md @@ -264,7 +264,22 @@ select a,b,c from {{ source('raw', 'table_2') }} > > When updating a model with multiple materialized views (MVs), especially when renaming one of the MV names, dbt-clickhouse does not automatically drop the old MV. Instead, > you will encounter the following warning: `Warning - Table was detected with the same pattern as model name but was not found in this run. In case it is a renamed mv that was previously part of this model, drop it manually (!!!) ` - + +## Data catchup +Currently, when creating a materialized view (MV), the target table is first populated with historical data before the MV itself is created. + +In other words, dbt-clickhouse initially creates the target table and preloads it with historical data based on the query defined for the MV. Only after this step is the MV created. + +If you prefer not to preload historical data during MV creation, you can disable this behavior by setting the catchup config to False: + +```python +{{config( + materialized='materialized_view', + engine='MergeTree()', + order_by='(id)', + catchup=False +)}} +``` # Dictionary materializations (experimental) diff --git a/dbt/adapters/clickhouse/__version__.py b/dbt/adapters/clickhouse/__version__.py index 61aaff6b..f22f38bf 100644 --- a/dbt/adapters/clickhouse/__version__.py +++ b/dbt/adapters/clickhouse/__version__.py @@ -1 +1 @@ -version = '1.8.5' +version = '1.8.6' diff --git a/dbt/include/clickhouse/macros/materializations/materialized_view.sql b/dbt/include/clickhouse/macros/materializations/materialized_view.sql index 4583918b..8cb6ea04 100644 --- a/dbt/include/clickhouse/macros/materializations/materialized_view.sql +++ b/dbt/include/clickhouse/macros/materializations/materialized_view.sql @@ -50,7 +50,8 @@ {% if backup_relation is none %} {{ log('Creating new materialized view ' + target_relation.name )}} - {{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql, views) }} + {% set catchup_data = config.get("catchup", True) %} + {{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql, views, catchup_data) }} {% elif existing_relation.can_exchange %} {{ log('Replacing existing materialized view ' + target_relation.name) }} -- in this section, we look for mvs that has the same pattern as this model, but for some reason, @@ -132,9 +133,15 @@ 2. Create a materialized view using the SQL in the model that inserts data into the table creating during step 1 #} -{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql, views) -%} +{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql, views, catchup=True ) -%} {% call statement('main') %} + {% if catchup == True %} {{ get_create_table_as_sql(False, relation, sql) }} + {% else %} + {{ log('Catchup data config was set to false, skipping mv-target-table initial insertion ')}} + {% set has_contract = config.get('contract').enforced %} + {{ create_table_or_empty(False, relation, sql, has_contract) }} + {% endif %} {% endcall %} {%- set cluster_clause = on_cluster_clause(relation) -%} {%- set mv_relation = relation.derivative('_mv', 'materialized_view') -%} diff --git a/tests/integration/adapter/materialized_view/test_materialized_view.py b/tests/integration/adapter/materialized_view/test_materialized_view.py index 9c9ffdb6..b8cb8214 100644 --- a/tests/integration/adapter/materialized_view/test_materialized_view.py +++ b/tests/integration/adapter/materialized_view/test_materialized_view.py @@ -28,10 +28,11 @@ materialized='materialized_view', engine='MergeTree()', order_by='(id)', - schema='custom_schema', + schema='catchup' if var('run_type', '') == 'catchup' else 'custom_schema', + **({'catchup': False} if var('run_type', '') == 'catchup' else {}) ) }} -{% if var('run_type', '') == '' %} +{% if var('run_type', '') in ['', 'catchup'] %} select id, name, @@ -60,74 +61,6 @@ {% endif %} """ -MULTIPLE_MV_MODEL = """ -{{ config( - materialized='materialized_view', - engine='MergeTree()', - order_by='(id)', - schema='custom_schema_for_multiple_mv', -) }} - -{% if var('run_type', '') == '' %} - ---mv1:begin -select - id, - name, - case - when name like 'Dade' then 'crash_override' - when name like 'Kate' then 'acid burn' - else 'N/A' - end as hacker_alias -from {{ source('raw', 'people') }} -where department = 'engineering' ---mv1:end - -union all - ---mv2:begin -select - id, - name, - -- sales people are not cool enough to have a hacker alias - 'N/A' as hacker_alias -from {{ source('raw', 'people') }} -where department = 'sales' ---mv2:end - -{% elif var('run_type', '') == 'extended_schema' %} - ---mv1:begin -select - id, - name, - case - -- Dade wasn't always known as 'crash override'! - when name like 'Dade' and age = 11 then 'zero cool' - when name like 'Dade' and age != 11 then 'crash override' - when name like 'Kate' then 'acid burn' - else 'N/A' - end as hacker_alias -from {{ source('raw', 'people') }} -where department = 'engineering' ---mv1:end - -union all - ---mv2:begin -select - id, - name, - -- sales people are not cool enough to have a hacker alias - 'N/A' as hacker_alias -from {{ source('raw', 'people') }} -where department = 'sales' ---mv2:end - -{% endif %} -""" - - SEED_SCHEMA_YML = """ version: 2 @@ -197,116 +130,30 @@ def test_create(self, project): result = project.run_sql(f"select count(*) from {schema}.hackers", fetch="all") assert result[0][0] == 4 - -class TestUpdateMV: - @pytest.fixture(scope="class") - def seeds(self): - """ - we need a base table to pull from - """ - return { - "people.csv": PEOPLE_SEED_CSV, - "schema.yml": SEED_SCHEMA_YML, - } - - @pytest.fixture(scope="class") - def models(self): - return { - "hackers.sql": MV_MODEL, - } - - def test_update_incremental(self, project): - schema = quote_identifier(project.test_schema + "_custom_schema") - # create our initial materialized view - run_dbt(["seed"]) - run_dbt() - - # re-run dbt but this time with the new MV SQL - run_vars = {"run_type": "extended_schema"} - run_dbt(["run", "--vars", json.dumps(run_vars)]) - - project.run_sql( - f""" - insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") - values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware'); - """ - ) - - # assert that we now have both of Dade's aliases in our hackers table - result = project.run_sql( - f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all" - ) - assert len(result) == 2 - - def test_update_full_refresh(self, project): - schema = quote_identifier(project.test_schema + "_custom_schema") - # create our initial materialized view - run_dbt(["seed"]) - run_dbt() - - # re-run dbt but this time with the new MV SQL - run_vars = {"run_type": "extended_schema"} - run_dbt(["run", "--full-refresh", "--vars", json.dumps(run_vars)]) - - project.run_sql( - f""" - insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") - values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware'); - """ - ) - - # assert that we now have both of Dade's aliases in our hackers table - result = project.run_sql( - f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all" - ) - assert len(result) == 2 - - -class TestMultipleMV: - @pytest.fixture(scope="class") - def seeds(self): - """ - we need a base table to pull from - """ - return { - "people.csv": PEOPLE_SEED_CSV, - "schema.yml": SEED_SCHEMA_YML, - } - - @pytest.fixture(scope="class") - def models(self): - return { - "hackers.sql": MULTIPLE_MV_MODEL, - } - - def test_create(self, project): + def test_disabled_catchup(self, project): """ 1. create a base table via dbt seed - 2. create a model as a materialized view, selecting from the table created in (1) + 2. create a model with catchup disabled as a materialized view, selecting from the table created in (1) 3. insert data into the base table and make sure it's there in the target table created in (2) """ - schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv") + schema = quote_identifier(project.test_schema + "_catchup") results = run_dbt(["seed"]) assert len(results) == 1 columns = project.run_sql("DESCRIBE TABLE people", fetch="all") assert columns[0][1] == "Int32" - # create the model - run_dbt(["run"]) + # create the model with catchup disabled + run_vars = {"run_type": "catchup"} + run_dbt(["run", "--vars", json.dumps(run_vars)]) + # check that we only have the new row, without the historical data assert len(results) == 1 columns = project.run_sql(f"DESCRIBE TABLE {schema}.hackers", fetch="all") assert columns[0][1] == "Int32" - columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv1", fetch="all") - assert columns[0][1] == "Int32" - - columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv2", fetch="all") + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv", fetch="all") assert columns[0][1] == "Int32" - with pytest.raises(Exception): - columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv", fetch="all") - check_relation_types( project.adapter, { @@ -318,25 +165,16 @@ def test_create(self, project): # insert some data and make sure it reaches the target table project.run_sql( f""" - insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") - values (4000,'Dave',40,'sales'), (9999,'Eugene',40,'engineering'); - """ + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") + values (1232,'Dade',16,'engineering'), (9999,'eugene',40,'malware'); + """ ) - result = project.run_sql(f"select * from {schema}.hackers order by id", fetch="all") - assert result == [ - (1000, 'Alfie', 'N/A'), - (1231, 'Dade', 'crash_override'), - (2000, 'Bill', 'N/A'), - (3000, 'Charlie', 'N/A'), - (4000, 'Dave', 'N/A'), - (6666, 'Ksenia', 'N/A'), - (8888, 'Kate', 'acid burn'), - (9999, 'Eugene', 'N/A'), - ] + result = project.run_sql(f"select count(*) from {schema}.hackers", fetch="all") + assert result[0][0] == 1 -class TestUpdateMultipleMV: +class TestUpdateMV: @pytest.fixture(scope="class") def seeds(self): """ @@ -350,11 +188,11 @@ def seeds(self): @pytest.fixture(scope="class") def models(self): return { - "hackers.sql": MULTIPLE_MV_MODEL, + "hackers.sql": MV_MODEL, } def test_update_incremental(self, project): - schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv") + schema = quote_identifier(project.test_schema + "_custom_schema") # create our initial materialized view run_dbt(["seed"]) run_dbt() @@ -372,15 +210,12 @@ def test_update_incremental(self, project): # assert that we now have both of Dade's aliases in our hackers table result = project.run_sql( - f"select distinct hacker_alias from {schema}.hackers where name = 'Dade' order by hacker_alias", - fetch="all", + f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all" ) assert len(result) == 2 - assert result[0][0] == "crash_override" - assert result[1][0] == "zero cool" def test_update_full_refresh(self, project): - schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv") + schema = quote_identifier(project.test_schema + "_custom_schema") # create our initial materialized view run_dbt(["seed"]) run_dbt() @@ -398,10 +233,6 @@ def test_update_full_refresh(self, project): # assert that we now have both of Dade's aliases in our hackers table result = project.run_sql( - f"select distinct hacker_alias from {schema}.hackers where name = 'Dade' order by hacker_alias", - fetch="all", + f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all" ) - print(result) assert len(result) == 2 - assert result[0][0] == "crash override" - assert result[1][0] == "zero cool" diff --git a/tests/integration/adapter/materialized_view/test_multiple_materialized_views.py b/tests/integration/adapter/materialized_view/test_multiple_materialized_views.py new file mode 100644 index 00000000..9a2d9850 --- /dev/null +++ b/tests/integration/adapter/materialized_view/test_multiple_materialized_views.py @@ -0,0 +1,248 @@ +""" +test materialized view creation. This is ClickHouse specific, which has a significantly different implementation +of materialized views from PostgreSQL or Oracle +""" + +import json + +import pytest +from dbt.tests.util import check_relation_types, run_dbt + +from dbt.adapters.clickhouse.query import quote_identifier + +PEOPLE_SEED_CSV = """ +id,name,age,department +1231,Dade,33,engineering +6666,Ksenia,48,engineering +8888,Kate,50,engineering +1000,Alfie,10,sales +2000,Bill,20,sales +3000,Charlie,30,sales +""".lstrip() + +# This model is parameterized, in a way, by the "run_type" dbt project variable +# This is to be able to switch between different model definitions within +# the same test run and allow us to test the evolution of a materialized view + +MULTIPLE_MV_MODEL = """ +{{ config( + materialized='materialized_view', + engine='MergeTree()', + order_by='(id)', + schema='custom_schema_for_multiple_mv', +) }} + +{% if var('run_type', '') == '' %} + +--mv1:begin +select + id, + name, + case + when name like 'Dade' then 'crash_override' + when name like 'Kate' then 'acid burn' + else 'N/A' + end as hacker_alias +from {{ source('raw', 'people') }} +where department = 'engineering' +--mv1:end + +union all + +--mv2:begin +select + id, + name, + -- sales people are not cool enough to have a hacker alias + 'N/A' as hacker_alias +from {{ source('raw', 'people') }} +where department = 'sales' +--mv2:end + +{% elif var('run_type', '') == 'extended_schema' %} + +--mv1:begin +select + id, + name, + case + -- Dade wasn't always known as 'crash override'! + when name like 'Dade' and age = 11 then 'zero cool' + when name like 'Dade' and age != 11 then 'crash override' + when name like 'Kate' then 'acid burn' + else 'N/A' + end as hacker_alias +from {{ source('raw', 'people') }} +where department = 'engineering' +--mv1:end + +union all + +--mv2:begin +select + id, + name, + -- sales people are not cool enough to have a hacker alias + 'N/A' as hacker_alias +from {{ source('raw', 'people') }} +where department = 'sales' +--mv2:end + +{% endif %} +""" + + +SEED_SCHEMA_YML = """ +version: 2 + +sources: + - name: raw + schema: "{{ target.schema }}" + tables: + - name: people +""" + + +class TestMultipleMV: + @pytest.fixture(scope="class") + def seeds(self): + """ + we need a base table to pull from + """ + return { + "people.csv": PEOPLE_SEED_CSV, + "schema.yml": SEED_SCHEMA_YML, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "hackers.sql": MULTIPLE_MV_MODEL, + } + + def test_create(self, project): + """ + 1. create a base table via dbt seed + 2. create a model as a materialized view, selecting from the table created in (1) + 3. insert data into the base table and make sure it's there in the target table created in (2) + """ + schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv") + results = run_dbt(["seed"]) + assert len(results) == 1 + columns = project.run_sql("DESCRIBE TABLE people", fetch="all") + assert columns[0][1] == "Int32" + + # create the model + run_dbt(["run"]) + assert len(results) == 1 + + columns = project.run_sql(f"DESCRIBE TABLE {schema}.hackers", fetch="all") + assert columns[0][1] == "Int32" + + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv1", fetch="all") + assert columns[0][1] == "Int32" + + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv2", fetch="all") + assert columns[0][1] == "Int32" + + with pytest.raises(Exception): + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv", fetch="all") + + check_relation_types( + project.adapter, + { + "hackers_mv": "view", + "hackers": "table", + }, + ) + + # insert some data and make sure it reaches the target table + project.run_sql( + f""" + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") + values (4000,'Dave',40,'sales'), (9999,'Eugene',40,'engineering'); + """ + ) + + result = project.run_sql(f"select * from {schema}.hackers order by id", fetch="all") + assert result == [ + (1000, 'Alfie', 'N/A'), + (1231, 'Dade', 'crash_override'), + (2000, 'Bill', 'N/A'), + (3000, 'Charlie', 'N/A'), + (4000, 'Dave', 'N/A'), + (6666, 'Ksenia', 'N/A'), + (8888, 'Kate', 'acid burn'), + (9999, 'Eugene', 'N/A'), + ] + + +class TestUpdateMultipleMV: + @pytest.fixture(scope="class") + def seeds(self): + """ + we need a base table to pull from + """ + return { + "people.csv": PEOPLE_SEED_CSV, + "schema.yml": SEED_SCHEMA_YML, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "hackers.sql": MULTIPLE_MV_MODEL, + } + + def test_update_incremental(self, project): + schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv") + # create our initial materialized view + run_dbt(["seed"]) + run_dbt() + + # re-run dbt but this time with the new MV SQL + run_vars = {"run_type": "extended_schema"} + run_dbt(["run", "--vars", json.dumps(run_vars)]) + + project.run_sql( + f""" + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") + values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware'); + """ + ) + + # assert that we now have both of Dade's aliases in our hackers table + result = project.run_sql( + f"select distinct hacker_alias from {schema}.hackers where name = 'Dade' order by hacker_alias", + fetch="all", + ) + assert len(result) == 2 + assert result[0][0] == "crash_override" + assert result[1][0] == "zero cool" + + def test_update_full_refresh(self, project): + schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv") + # create our initial materialized view + run_dbt(["seed"]) + run_dbt() + + # re-run dbt but this time with the new MV SQL + run_vars = {"run_type": "extended_schema"} + run_dbt(["run", "--full-refresh", "--vars", json.dumps(run_vars)]) + + project.run_sql( + f""" + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") + values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware'); + """ + ) + + # assert that we now have both of Dade's aliases in our hackers table + result = project.run_sql( + f"select distinct hacker_alias from {schema}.hackers where name = 'Dade' order by hacker_alias", + fetch="all", + ) + print(result) + assert len(result) == 2 + assert result[0][0] == "crash override" + assert result[1][0] == "zero cool"