Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make mv data catchup configurable #399

Merged
merged 6 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
### Release [1.8.6], 2024-12-05

### Improvement
* Previously, when a materialized view (MV) model was created, the target table was always populated with historical data based on the query provided in the MV definition. This catchup mechanism is now controlled by the `catchup` config flag, which is enabled by default (preserving the existing behavior). ([#399](https://github.com/ClickHouse/dbt-clickhouse/pull/399))

### Release [1.8.5], 2024-11-19

### New Features
Expand Down
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,22 @@ select a,b,c from {{ source('raw', 'table_2') }}
>
> When updating a model with multiple materialized views (MVs), especially when renaming one of the MV names, dbt-clickhouse does not automatically drop the old MV. Instead,
> you will encounter the following warning: `Warning - Table <previous table name> was detected with the same pattern as model name <your model name> but was not found in this run. In case it is a renamed mv that was previously part of this model, drop it manually (!!!) `


## Data catchup
Currently, when creating a materialized view (MV), the target table is first populated with historical data before the MV itself is created.

In other words, dbt-clickhouse initially creates the target table and preloads it with historical data based on the query defined for the MV. Only after this step is the MV created.

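If you prefer not to preload historical data during MV creation, you can disable this behavior by setting the `catchup` config to `False`. The initial insertion into the target table is then skipped, so the table only receives rows that are inserted into the source after the MV has been created: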

```python
{{config(
materialized='materialized_view',
engine='MergeTree()',
order_by='(id)',
catchup=False
)}}
```


# Dictionary materializations (experimental)
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = '1.8.5'
version = '1.8.6'
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@

{% if backup_relation is none %}
{{ log('Creating new materialized view ' + target_relation.name )}}
{{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql, views) }}
{% set catchup_data = config.get("catchup", True) %}
{{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql, views, catchup_data) }}
{% elif existing_relation.can_exchange %}
{{ log('Replacing existing materialized view ' + target_relation.name) }}
-- in this section, we look for MVs that have the same pattern as this model, but for some reason,
Expand Down Expand Up @@ -132,9 +133,15 @@
2. Create a materialized view using the SQL in the model that inserts
data into the table created during step 1
#}
{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql, views) -%}
{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql, views, catchup=True ) -%}
{% call statement('main') %}
{% if catchup == True %}
{{ get_create_table_as_sql(False, relation, sql) }}
{% else %}
{{ log('Catchup data config was set to false, skipping mv-target-table initial insertion ')}}
{% set has_contract = config.get('contract').enforced %}
{{ create_table_or_empty(False, relation, sql, has_contract) }}
{% endif %}
{% endcall %}
{%- set cluster_clause = on_cluster_clause(relation) -%}
{%- set mv_relation = relation.derivative('_mv', 'materialized_view') -%}
Expand Down
213 changes: 22 additions & 191 deletions tests/integration/adapter/materialized_view/test_materialized_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
materialized='materialized_view',
engine='MergeTree()',
order_by='(id)',
schema='custom_schema',
schema='catchup' if var('run_type', '') == 'catchup' else 'custom_schema',
**({'catchup': False} if var('run_type', '') == 'catchup' else {})
) }}

{% if var('run_type', '') == '' %}
{% if var('run_type', '') in ['', 'catchup'] %}
select
id,
name,
Expand Down Expand Up @@ -60,74 +61,6 @@
{% endif %}
"""

MULTIPLE_MV_MODEL = """
{{ config(
materialized='materialized_view',
engine='MergeTree()',
order_by='(id)',
schema='custom_schema_for_multiple_mv',
) }}

{% if var('run_type', '') == '' %}

--mv1:begin
select
id,
name,
case
when name like 'Dade' then 'crash_override'
when name like 'Kate' then 'acid burn'
else 'N/A'
end as hacker_alias
from {{ source('raw', 'people') }}
where department = 'engineering'
--mv1:end

union all

--mv2:begin
select
id,
name,
-- sales people are not cool enough to have a hacker alias
'N/A' as hacker_alias
from {{ source('raw', 'people') }}
where department = 'sales'
--mv2:end

{% elif var('run_type', '') == 'extended_schema' %}

--mv1:begin
select
id,
name,
case
-- Dade wasn't always known as 'crash override'!
when name like 'Dade' and age = 11 then 'zero cool'
when name like 'Dade' and age != 11 then 'crash override'
when name like 'Kate' then 'acid burn'
else 'N/A'
end as hacker_alias
from {{ source('raw', 'people') }}
where department = 'engineering'
--mv1:end

union all

--mv2:begin
select
id,
name,
-- sales people are not cool enough to have a hacker alias
'N/A' as hacker_alias
from {{ source('raw', 'people') }}
where department = 'sales'
--mv2:end

{% endif %}
"""


SEED_SCHEMA_YML = """
version: 2

Expand Down Expand Up @@ -197,116 +130,30 @@ def test_create(self, project):
result = project.run_sql(f"select count(*) from {schema}.hackers", fetch="all")
assert result[0][0] == 4


class TestUpdateMV:
@pytest.fixture(scope="class")
def seeds(self):
"""
we need a base table to pull from
"""
return {
"people.csv": PEOPLE_SEED_CSV,
"schema.yml": SEED_SCHEMA_YML,
}

@pytest.fixture(scope="class")
def models(self):
return {
"hackers.sql": MV_MODEL,
}

def test_update_incremental(self, project):
schema = quote_identifier(project.test_schema + "_custom_schema")
# create our initial materialized view
run_dbt(["seed"])
run_dbt()

# re-run dbt but this time with the new MV SQL
run_vars = {"run_type": "extended_schema"}
run_dbt(["run", "--vars", json.dumps(run_vars)])

project.run_sql(
f"""
insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department")
values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware');
"""
)

# assert that we now have both of Dade's aliases in our hackers table
result = project.run_sql(
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all"
)
assert len(result) == 2

def test_update_full_refresh(self, project):
schema = quote_identifier(project.test_schema + "_custom_schema")
# create our initial materialized view
run_dbt(["seed"])
run_dbt()

# re-run dbt but this time with the new MV SQL
run_vars = {"run_type": "extended_schema"}
run_dbt(["run", "--full-refresh", "--vars", json.dumps(run_vars)])

project.run_sql(
f"""
insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department")
values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware');
"""
)

# assert that we now have both of Dade's aliases in our hackers table
result = project.run_sql(
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all"
)
assert len(result) == 2


class TestMultipleMV:
@pytest.fixture(scope="class")
def seeds(self):
"""
we need a base table to pull from
"""
return {
"people.csv": PEOPLE_SEED_CSV,
"schema.yml": SEED_SCHEMA_YML,
}

@pytest.fixture(scope="class")
def models(self):
return {
"hackers.sql": MULTIPLE_MV_MODEL,
}

def test_create(self, project):
def test_disabled_catchup(self, project):
"""
1. create a base table via dbt seed
2. create a model as a materialized view, selecting from the table created in (1)
2. create a model with catchup disabled as a materialized view, selecting from the table created in (1)
3. insert data into the base table and make sure it's there in the target table created in (2)
"""
schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv")
schema = quote_identifier(project.test_schema + "_catchup")
results = run_dbt(["seed"])
assert len(results) == 1
columns = project.run_sql("DESCRIBE TABLE people", fetch="all")
assert columns[0][1] == "Int32"

# create the model
run_dbt(["run"])
# create the model with catchup disabled
run_vars = {"run_type": "catchup"}
run_dbt(["run", "--vars", json.dumps(run_vars)])
# check that we only have the new row, without the historical data
assert len(results) == 1

columns = project.run_sql(f"DESCRIBE TABLE {schema}.hackers", fetch="all")
assert columns[0][1] == "Int32"

columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv1", fetch="all")
assert columns[0][1] == "Int32"

columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv2", fetch="all")
columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv", fetch="all")
assert columns[0][1] == "Int32"

with pytest.raises(Exception):
columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv", fetch="all")

check_relation_types(
project.adapter,
{
Expand All @@ -318,25 +165,16 @@ def test_create(self, project):
# insert some data and make sure it reaches the target table
project.run_sql(
f"""
insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department")
values (4000,'Dave',40,'sales'), (9999,'Eugene',40,'engineering');
"""
insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department")
values (1232,'Dade',16,'engineering'), (9999,'eugene',40,'malware');
"""
)

result = project.run_sql(f"select * from {schema}.hackers order by id", fetch="all")
assert result == [
(1000, 'Alfie', 'N/A'),
(1231, 'Dade', 'crash_override'),
(2000, 'Bill', 'N/A'),
(3000, 'Charlie', 'N/A'),
(4000, 'Dave', 'N/A'),
(6666, 'Ksenia', 'N/A'),
(8888, 'Kate', 'acid burn'),
(9999, 'Eugene', 'N/A'),
]
result = project.run_sql(f"select count(*) from {schema}.hackers", fetch="all")
assert result[0][0] == 1


class TestUpdateMultipleMV:
class TestUpdateMV:
@pytest.fixture(scope="class")
def seeds(self):
"""
Expand All @@ -350,11 +188,11 @@ def seeds(self):
@pytest.fixture(scope="class")
def models(self):
return {
"hackers.sql": MULTIPLE_MV_MODEL,
"hackers.sql": MV_MODEL,
}

def test_update_incremental(self, project):
schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv")
schema = quote_identifier(project.test_schema + "_custom_schema")
# create our initial materialized view
run_dbt(["seed"])
run_dbt()
Expand All @@ -372,15 +210,12 @@ def test_update_incremental(self, project):

# assert that we now have both of Dade's aliases in our hackers table
result = project.run_sql(
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade' order by hacker_alias",
fetch="all",
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all"
)
assert len(result) == 2
assert result[0][0] == "crash_override"
assert result[1][0] == "zero cool"

def test_update_full_refresh(self, project):
schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv")
schema = quote_identifier(project.test_schema + "_custom_schema")
# create our initial materialized view
run_dbt(["seed"])
run_dbt()
Expand All @@ -398,10 +233,6 @@ def test_update_full_refresh(self, project):

# assert that we now have both of Dade's aliases in our hackers table
result = project.run_sql(
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade' order by hacker_alias",
fetch="all",
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all"
)
print(result)
assert len(result) == 2
assert result[0][0] == "crash override"
assert result[1][0] == "zero cool"
Loading
Loading