Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 1 6 2 #219

Merged
merged 2 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
### Release [1.6.2], 2023-12-06
#### Bug Fix
- The dbt `on_schema_change` configuration value for incremental models was effectively being ignored. This has been fixed
with a very limited implementation. Closes https://github.com/ClickHouse/dbt-clickhouse/issues/199. Because of the way that
ORDER BY/SORT BY/PARTITION BY/PRIMARY KEYS work in ClickHouse, plus the complexities of correctly transforming ClickHouse data types,
`sync_all_columns` is not currently supported (although an implementation that works for non-key columns is theoretically possible,
such an enhancement is not currently planned). Accordingly, only `ignore`, `fail`, and `append_new_columns` values are supported
for `on_schema_change`. It is also not currently supported for Distributed tables.

Note that actually appending new columns requires a fallback to the `legacy` incremental strategy, which is quite inefficient,
so while theoretically possible, using `append_new_columns` is not recommended except for very small data volumes.

### Release [1.6.1], 2023-12-04
#### Bug Fixes
- Identifier quoting was disabled for tables/databases etc. This would cause failures for schemas or tables using reserved words
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = '1.6.1'
version = '1.6.2'
24 changes: 24 additions & 0 deletions dbt/adapters/clickhouse/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
schema_change_fail_error = """
The source and target schemas on this incremental model are out of sync.
They can be reconciled in several ways:
- set the `on_schema_change` config to `append_new_columns`. (ClickHouse does not support `sync_all_columns`)
- Re-run the incremental model with `full_refresh: True` to update the target schema.
- update the schema manually and re-run the process.

Additional troubleshooting context:
Source columns not in target: {0}
Target columns not in source: {1}
New column types: {2}
"""

schema_change_datatype_error = """
The source and target schemas on this incremental model contain different data types. This is not supported.

Changed column types: {0}
"""

schema_change_missing_source_error = """
The target schema in on this incremental model contains a column not in the source schema. This is not supported.

Source columns not in target: {0}
"""
40 changes: 39 additions & 1 deletion dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@
from dbt.adapters.clickhouse.cache import ClickHouseRelationsCache
from dbt.adapters.clickhouse.column import ClickHouseColumn
from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager
from dbt.adapters.clickhouse.errors import (
schema_change_datatype_error,
schema_change_fail_error,
schema_change_missing_source_error,
)
from dbt.adapters.clickhouse.logger import logger
from dbt.adapters.clickhouse.query import quote_identifier
from dbt.adapters.clickhouse.relation import ClickHouseRelation
from dbt.adapters.clickhouse.util import compare_versions
from dbt.adapters.clickhouse.util import NewColumnDataType, compare_versions

GET_CATALOG_MACRO_NAME = 'get_catalog'
LIST_SCHEMAS_MACRO_NAME = 'list_schemas'
Expand Down Expand Up @@ -151,6 +156,39 @@ def calculate_incremental_strategy(self, strategy: str) -> str:
strategy = 'legacy'
return strategy

@available.parse_none
def check_incremental_schema_changes(
self, on_schema_change, existing, target_sql
) -> List[ClickHouseColumn]:
if on_schema_change not in ('fail', 'ignore', 'append_new_columns'):
raise DbtRuntimeError(
"Only `fail`, `ignore`, and `append_new_columns` supported for `on_schema_change`"
)
source = self.get_columns_in_relation(existing)
source_map = {column.name: column for column in source}
target = self.get_column_schema_from_query(target_sql)
target_map = {column.name: column for column in source}
source_not_in_target = [column for column in source if column.name not in target_map.keys()]
target_not_in_source = [column for column in target if column.name not in source_map.keys()]
new_column_data_types = []
for target_column in target:
source_column = source_map.get(target_column.name)
if source_column and source_column.dtype != target_column.dtype:
new_column_data_types.append(
NewColumnDataType(source_column.name, target_column.dtype)
)
if new_column_data_types:
raise DbtRuntimeError(schema_change_datatype_error.format(new_column_data_types))
if source_not_in_target:
raise DbtRuntimeError(schema_change_missing_source_error.format(source_not_in_target))
if target_not_in_source and on_schema_change == 'fail':
raise DbtRuntimeError(
schema_change_fail_error.format(
source_not_in_target, target_not_in_source, new_column_data_types
)
)
return target_not_in_source

@available.parse_none
def s3source_clause(
self,
Expand Down
8 changes: 8 additions & 0 deletions dbt/adapters/clickhouse/util.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from dataclasses import dataclass

from dbt.exceptions import DbtRuntimeError


Expand All @@ -11,3 +13,9 @@ def compare_versions(v1: str, v2: str) -> int:
except ValueError:
raise DbtRuntimeError("Version must consist of only numbers separated by '.'")
return 0


@dataclass
class NewColumnDataType:
column_name: str
new_type: str
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,23 @@
{% endcall %}

{% else %}
{% set schema_changes = none %}
{% set column_changes = none %}
{% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% if on_schema_change != 'ignore' %}
{%- set schema_changes = check_for_schema_changes(existing_relation, target_relation) -%}
{% if schema_changes['schema_changed'] and incremental_strategy in ('append', 'delete_insert') %}
{% set incremental_strategy = 'legacy' %}
{% do log('Schema changes detected, switching to legacy incremental strategy') %}
{%- if on_schema_change != 'ignore' %}
{%- set column_changes = adapter.check_incremental_schema_changes(on_schema_change, existing_relation, sql) -%}
{%- if column_changes %}
{%- if incremental_strategy in ('append', 'delete_insert') %}
{% set incremental_strategy = 'legacy' %}
{{ log('Schema changes detected, switching to legacy incremental strategy') }}
{%- endif %}
{% endif %}
{% endif %}
{% if incremental_strategy != 'delete_insert' and incremental_predicates %}
{% do exceptions.raise_compiler_error('Cannot apply incremental predicates with ' + incremental_strategy + ' strategy.') %}
{% endif %}
{% if incremental_strategy == 'legacy' %}
{% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, schema_changes, unique_key) %}
{% do clickhouse__incremental_legacy(existing_relation, intermediate_relation, column_changes, unique_key) %}
{% set need_swap = true %}
{% elif incremental_strategy == 'delete_insert' %}
{% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates) %}
Expand Down Expand Up @@ -109,32 +111,7 @@

{%- endmaterialization %}


{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}

{%- set schema_changes_dict = check_for_schema_changes(source_relation, target_relation) -%}
{% if not schema_changes_dict['schema_changed'] %}
{{ return }}
{% endif %}

{% if on_schema_change == 'fail' %}
{% set fail_msg %}
The source and target schemas on this incremental model are out of sync!
They can be reconciled in several ways:
- set the `on_schema_change` config to either append_new_columns or sync_all_columns, depending on your situation.
- Re-run the incremental model with `full_refresh: True` to update the target schema.
- update the schema manually and re-run the process.
{% endset %}
{% do exceptions.raise_compiler_error(fail_msg) %}
{{ return }}
{% endif %}

{% do sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %}

{% endmacro %}


{% macro clickhouse__incremental_legacy(existing_relation, intermediate_relation, on_schema_change, unique_key, is_distributed=False) %}
{% macro clickhouse__incremental_legacy(existing_relation, intermediate_relation, column_changes, unique_key, is_distributed=False) %}
{% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_new_data'}) %}
{{ drop_relation_if_exists(new_data_relation) }}

Expand All @@ -143,10 +120,17 @@

-- First create a temporary table for all of the new data
{% if is_distributed %}
{% if column_changes %}
{% do exceptions.raise_compiler_error('Schema changes not supported with Distributed tables ') %}
{% endif %}
-- Need to use distributed table to have data on all shards
{%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%}
{%- set inserting_relation = distributed_new_data_relation -%}
{{ create_distributed_local_table(distributed_new_data_relation, new_data_relation, existing_relation, sql) }}
{% elif column_changes %}
{% call statement('create_new_data_temp') %}
{{ get_create_table_as_sql(False, new_data_relation, sql) }}
{% endcall %}
{% else %}
{% call statement('create_new_data_temp') %}
{{ get_create_table_as_sql(False, new_data_relation, sql) }}
Expand All @@ -168,11 +152,11 @@

-- Insert all the existing rows into the new temporary table, ignoring any rows that have keys in the "new data"
-- table.
{%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{%- set source_columns = adapter.get_columns_in_relation(existing_relation) -%}
{%- set source_columns_csv = source_columns | map(attribute='quoted') | join(', ') -%}
{% call statement('insert_existing_data') %}
insert into {{ inserted_relation }} ({{ dest_cols_csv }})
select {{ dest_cols_csv }}
insert into {{ inserted_relation }} ({{ source_columns_csv }})
select {{ source_columns_csv }}
from {{ existing_relation }}
where ({{ unique_key }}) not in (
select {{ unique_key }}
Expand All @@ -182,9 +166,15 @@
{% endcall %}

-- Insert all of the new data into the temporary table
{% if column_changes %}
{%- set dest_columns = adapter.get_columns_in_relation(new_data_relation) -%}
{%- set dest_columns_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
{% else %}
{%- set dest_columns_csv = source_columns_csv %}
{% endif %}
{% call statement('insert_new_data') %}
insert into {{ inserted_relation }} ({{ dest_cols_csv }})
select {{ dest_cols_csv }}
insert into {{ inserted_relation }} ({{ dest_columns_csv }})
select {{ dest_columns_csv }}
from {{ inserting_relation }}
{{ adapter.get_model_query_settings(model) }}
{% endcall %}
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/adapter/basic/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class TestIncremental(BaseIncremental):


incremental_not_schema_change_sql = """
{{ config(materialized="incremental", unique_key="user_id_current_time",on_schema_change="sync_all_columns") }}
{{ config(materialized="incremental", unique_key="user_id_current_time",on_schema_change="append_new_columns") }}
select
toString(1) || '-' || toString(now64()) as user_id_current_time,
{% if is_incremental() %}
Expand Down
71 changes: 71 additions & 0 deletions tests/integration/adapter/incremental/test_schema_change.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import pytest
from dbt.tests.util import run_dbt, run_dbt_and_capture

schema_change_sql = """
{{
config(
materialized='incremental',
unique_key='col_1',
on_schema_change='%schema_change%'
)
}}

{% if not is_incremental() %}
select
number as col_1,
number + 1 as col_2
from numbers(3)
{% else %}
select
number as col_1,
number + 1 as col_2,
number + 2 as col_3
from numbers(2, 3)
{% endif %}
"""


class TestOnSchemaChange:
@pytest.fixture(scope="class")
def models(self):
return {
"schema_change_ignore.sql": schema_change_sql.replace("%schema_change%", "ignore"),
"schema_change_fail.sql": schema_change_sql.replace("%schema_change%", "fail"),
"schema_change_append.sql": schema_change_sql.replace(
"%schema_change%", "append_new_columns"
),
}

def test_ignore(self, project):
run_dbt(["run", "--select", "schema_change_ignore"])
result = project.run_sql("select * from schema_change_ignore order by col_1", fetch="all")
assert len(result) == 3
assert result[0][1] == 1
run_dbt(["run", "--select", "schema_change_ignore"])
result = project.run_sql("select * from schema_change_ignore", fetch="all")
assert len(result) == 5

def test_fail(self, project):
run_dbt(["run", "--select", "schema_change_fail"])
result = project.run_sql("select * from schema_change_fail order by col_1", fetch="all")
assert len(result) == 3
assert result[0][1] == 1
_, log_output = run_dbt_and_capture(
[
"run",
"--select",
"schema_change_fail",
],
expect_pass=False,
)
assert 'out of sync' in log_output.lower()

def test_append(self, project):
run_dbt(["run", "--select", "schema_change_append"])
result = project.run_sql("select * from schema_change_append order by col_1", fetch="all")
assert len(result) == 3
assert result[0][1] == 1
run_dbt(["--debug", "run", "--select", "schema_change_append"])
result = project.run_sql("select * from schema_change_append order by col_1", fetch="all")
assert result[0][2] == 0
assert result[3][2] == 5