From 24e295aef491052b5171e96ad3424474744947dd Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Thu, 14 Oct 2021 14:01:49 +0200 Subject: [PATCH 1/2] Add support for on_schema_change --- .gitignore | 1 + dbt/include/spark/macros/adapters.sql | 47 +++++- .../incremental/incremental.sql | 3 + .../models/incremental_append_new_columns.sql | 28 ++++ .../incremental_append_new_columns_target.sql | 19 +++ .../models/incremental_fail.sql | 18 ++ .../models/incremental_ignore.sql | 18 ++ .../models/incremental_ignore_target.sql | 15 ++ .../models/incremental_sync_all_columns.sql | 30 ++++ .../incremental_sync_all_columns_target.sql | 20 +++ .../models/model_a.sql | 22 +++ .../test_incremental_on_schema_change.py | 157 ++++++++++++++++++ 12 files changed, 377 insertions(+), 1 deletion(-) create mode 100644 test/custom/incremental_on_schema_change/models/incremental_append_new_columns.sql create mode 100644 test/custom/incremental_on_schema_change/models/incremental_append_new_columns_target.sql create mode 100644 test/custom/incremental_on_schema_change/models/incremental_fail.sql create mode 100644 test/custom/incremental_on_schema_change/models/incremental_ignore.sql create mode 100644 test/custom/incremental_on_schema_change/models/incremental_ignore_target.sql create mode 100644 test/custom/incremental_on_schema_change/models/incremental_sync_all_columns.sql create mode 100644 test/custom/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql create mode 100644 test/custom/incremental_on_schema_change/models/model_a.sql create mode 100644 test/custom/incremental_on_schema_change/test_incremental_on_schema_change.py diff --git a/.gitignore b/.gitignore index 9caf202a6..4c05634f3 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ test/integration/.user.yml .DS_Store .vscode *.log +logs/ \ No newline at end of file diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index b966e9aa6..ee59b8131 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -125,7 +125,7 @@ {% macro spark__get_columns_in_relation(relation) -%} {% call statement('get_columns_in_relation', fetch_result=True) %} - describe extended {{ relation }} + describe extended {{ relation.include(schema=(schema is not none)) }} {% endcall %} {% do return(load_result('get_columns_in_relation').table) %} {% endmacro %} @@ -194,3 +194,48 @@ {% endfor %} {% endif %} {% endmacro %} + + +{% macro spark__make_temp_relation(base_relation, suffix) %} + {% set tmp_identifier = base_relation.identifier ~ suffix %} + {% set tmp_relation = base_relation.incorporate(path = { + "identifier": tmp_identifier, + "schema": None + }) -%} + + {% do return(tmp_relation) %} +{% endmacro %} + + +{% macro spark__alter_column_type(relation, column_name, new_column_type) -%} + {% call statement('alter_column_type') %} + alter table {{ relation }} alter column {{ column_name }} type {{ new_column_type }}; + {% endcall %} +{% endmacro %} + + +{% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %} + + {% if remove_columns %} + {% set platform_name = 'Delta Lake' if relation.is_delta else 'Apache Spark' %} + {{ exceptions.raise_compiler_error(platform_name + ' does not support dropping columns from tables') }} + {% endif %} + + {% if add_columns is none %} + {% set add_columns = [] %} + {% endif %} + + {% set sql -%} + + alter {{ relation.type }} {{ relation }} + + {% if add_columns %} add columns {% endif %} + {% for column in add_columns %} + {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }} + {% endfor %} + + {%- endset -%} + + {% do run_query(sql) %} + +{% endmacro %} diff --git a/dbt/include/spark/macros/materializations/incremental/incremental.sql b/dbt/include/spark/macros/materializations/incremental/incremental.sql index b11990b36..55bd3174f 100644 --- a/dbt/include/spark/macros/materializations/incremental/incremental.sql +++ b/dbt/include/spark/macros/materializations/incremental/incremental.sql @@ -11,6 +11,8 @@ {%- set partition_by = config.get('partition_by', none) -%} {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} + + {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %} {% set target_relation = this %} {% set existing_relation = load_relation(this) %} @@ -31,6 +33,7 @@ {% set build_sql = create_table_as(False, target_relation, sql) %} {% else %} {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %} {% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %} {% endif %} diff --git a/test/custom/incremental_on_schema_change/models/incremental_append_new_columns.sql b/test/custom/incremental_on_schema_change/models/incremental_append_new_columns.sql new file mode 100644 index 000000000..86f6c7c42 --- /dev/null +++ b/test/custom/incremental_on_schema_change/models/incremental_append_new_columns.sql @@ -0,0 +1,28 @@ +{{ + config( + materialized='incremental', + on_schema_change='append_new_columns' + ) +}} + +{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %} + +WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) + +{% if is_incremental() %} + +SELECT id, + cast(field1 as {{string_type}}) as field1, + cast(field2 as {{string_type}}) as field2, + cast(field3 as {{string_type}}) as field3, + cast(field4 as {{string_type}}) as field4 +FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) + +{% else %} + +SELECT id, + cast(field1 as {{string_type}}) as field1, + cast(field2 as {{string_type}}) as field2 +FROM source_data where id <= 3 + +{% endif %} \ No newline at end of file diff --git a/test/custom/incremental_on_schema_change/models/incremental_append_new_columns_target.sql b/test/custom/incremental_on_schema_change/models/incremental_append_new_columns_target.sql new file mode 100644 index 000000000..55ed7b2c5 --- /dev/null +++ b/test/custom/incremental_on_schema_change/models/incremental_append_new_columns_target.sql @@ -0,0 +1,19 @@ +{{ + config(materialized='table') +}} + +{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %} + +with source_data as ( + + select * from {{ ref('model_a') }} + +) + +select id + ,cast(field1 as {{string_type}}) as field1 + ,cast(field2 as {{string_type}}) as field2 + ,cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as {{string_type}}) AS field3 + ,cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as {{string_type}}) AS field4 + +from source_data \ No newline at end of file diff --git a/test/custom/incremental_on_schema_change/models/incremental_fail.sql b/test/custom/incremental_on_schema_change/models/incremental_fail.sql new file mode 100644 index 000000000..939fc20c2 --- /dev/null +++ b/test/custom/incremental_on_schema_change/models/incremental_fail.sql @@ -0,0 +1,18 @@ +{{ + config( + materialized='incremental', + on_schema_change='fail' + ) +}} + +WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) + +{% if is_incremental() %} + +SELECT id, field1, field2 FROM source_data + +{% else %} + +SELECT id, field1, field3 FROm source_data + +{% endif %} \ No newline at end of file diff --git a/test/custom/incremental_on_schema_change/models/incremental_ignore.sql b/test/custom/incremental_on_schema_change/models/incremental_ignore.sql new file mode 100644 index 000000000..98f0a74a8 --- /dev/null +++ b/test/custom/incremental_on_schema_change/models/incremental_ignore.sql @@ -0,0 +1,18 @@ +{{ + config( + materialized='incremental', + on_schema_change='ignore' + ) +}} + +WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) + +{% if is_incremental() %} + +SELECT id, field1, field2, field3, field4 FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) + +{% else %} + +SELECT id, field1, field2 FROM source_data LIMIT 3 + +{% endif %} \ No newline at end of file diff --git a/test/custom/incremental_on_schema_change/models/incremental_ignore_target.sql b/test/custom/incremental_on_schema_change/models/incremental_ignore_target.sql new file mode 100644 index 000000000..92d4564e0 --- /dev/null +++ b/test/custom/incremental_on_schema_change/models/incremental_ignore_target.sql @@ -0,0 +1,15 @@ +{{ + config(materialized='table') +}} + +with source_data as ( + + select * from {{ ref('model_a') }} + +) + +select id + ,field1 + ,field2 + +from source_data \ No newline at end of file diff --git a/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns.sql b/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns.sql new file mode 100644 index 000000000..2c5a461e5 --- /dev/null +++ b/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns.sql @@ -0,0 +1,30 @@ +{{ + config( + materialized='incremental', + on_schema_change='sync_all_columns' + + ) +}} + +WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) + +{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %} + +{% if is_incremental() %} + +SELECT id, + cast(field1 as {{string_type}}) as field1, + cast(field3 as {{string_type}}) as field3, -- to validate new fields + cast(field4 as {{string_type}}) AS field4 -- to validate new fields + +FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) + +{% else %} + +select id, + cast(field1 as {{string_type}}) as field1, + cast(field2 as {{string_type}}) as field2 + +from source_data where id <= 3 + +{% endif %} \ No newline at end of file diff --git a/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql b/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql new file mode 100644 index 000000000..56591eb22 --- /dev/null +++ b/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql @@ -0,0 +1,20 @@ +{{ + config(materialized='table') +}} + +with source_data as ( + + select * from {{ ref('model_a') }} + +) + +{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %} + +select id + ,cast(field1 as {{string_type}}) as field1 + --,field2 + ,cast(case when id <= 3 then null else field3 end as {{string_type}}) as field3 + ,cast(case when id <= 3 then null else field4 end as {{string_type}}) as field4 + +from source_data +order by id \ No newline at end of file diff --git a/test/custom/incremental_on_schema_change/models/model_a.sql b/test/custom/incremental_on_schema_change/models/model_a.sql new file mode 100644 index 000000000..2a0b2ddaf --- /dev/null +++ b/test/custom/incremental_on_schema_change/models/model_a.sql @@ -0,0 +1,22 @@ +{{ + config(materialized='table') +}} + +with source_data as ( + + select 1 as id, 'aaa' as field1, 'bbb' as field2, 111 as field3, 'TTT' as field4 + union all select 2 as id, 'ccc' as field1, 'ddd' as field2, 222 as field3, 'UUU' as field4 + union all select 3 as id, 'eee' as field1, 'fff' as field2, 333 as field3, 'VVV' as field4 + union all select 4 as id, 'ggg' as field1, 'hhh' as field2, 444 as field3, 'WWW' as field4 + union all select 5 as id, 'iii' as field1, 'jjj' as field2, 555 as field3, 'XXX' as field4 + union all select 6 as id, 'kkk' as field1, 'lll' as field2, 666 as field3, 'YYY' as field4 + +) + +select id + ,field1 + ,field2 + ,field3 + ,field4 + +from source_data \ No newline at end of file diff --git a/test/custom/incremental_on_schema_change/test_incremental_on_schema_change.py b/test/custom/incremental_on_schema_change/test_incremental_on_schema_change.py new file mode 100644 index 000000000..e259e5c95 --- /dev/null +++ b/test/custom/incremental_on_schema_change/test_incremental_on_schema_change.py @@ -0,0 +1,157 @@ +from cProfile import run +from test.custom.base import DBTSparkIntegrationTest, use_profile +import dbt.exceptions + + +class TestIncrementalOnSchemaChange(DBTSparkIntegrationTest): + @property + def schema(self): + return "incremental_on_schema_change" + + @property + def models(self): + return "models" + + @property + def project_config(self): + return { + "config-version": 2, + "test-paths": ["tests"] + } + + def run_twice_and_assert( + self, include, compare_source, compare_target + ): + + # dbt run (twice) + run_args = ['run'] + if include: + run_args.extend(('--models', include)) + results_one = self.run_dbt(run_args) + results_two = self.run_dbt(run_args) + + self.assertEqual(len(results_one), 3) + self.assertEqual(len(results_two), 3) + + self.assertTablesEqual(compare_source, compare_target) + + def run_incremental_ignore(self): + select = 'model_a incremental_ignore incremental_ignore_target' + compare_source = 'incremental_ignore' + compare_target = 'incremental_ignore_target' + self.run_twice_and_assert(select, compare_source, compare_target) + + def run_incremental_append_new_columns(self): + select = 'model_a incremental_append_new_columns incremental_append_new_columns_target' + compare_source = 'incremental_append_new_columns' + compare_target = 'incremental_append_new_columns_target' + self.run_twice_and_assert(select, compare_source, compare_target) + + def run_incremental_fail_on_schema_change(self): + select = 'model_a incremental_fail' + results_one = self.run_dbt(['run', '--models', select, '--full-refresh']) + results_two = self.run_dbt(['run', '--models', select], expect_pass = False) + self.assertIn('Compilation Error', results_two[1].message) + + def run_incremental_sync_all_columns(self): + # this doesn't work on Delta today + select = 'model_a incremental_sync_all_columns incremental_sync_all_columns_target' + compare_source = 'incremental_sync_all_columns' + compare_target = 'incremental_sync_all_columns_target' + results_one = self.run_dbt(['run', '--models', select, '--full-refresh']) + results_two = self.run_dbt(['run', '--models', select], expect_pass = False) + self.assertIn('Compilation Error', results_two[1].message) + + +class TestApacheSparkAppend(TestIncrementalOnSchemaChange): + + @property + def project_config(self): + return { + "config-version": 2, + "test-paths": ["tests"], + "models": { + "+incremental_strategy": "append", + } + } + + # only 'ignore' and 'fail' are supported + + @use_profile('apache_spark') + def test__apache_spark__run_incremental_ignore(self): + self.run_incremental_ignore() + + @use_profile('apache_spark') + def test__apache_spark__run_incremental_fail_on_schema_change(self): + self.run_incremental_fail_on_schema_change() + + +class TestApacheSparkInsertOverwrite(TestIncrementalOnSchemaChange): + + @property + def project_config(self): + return { + "config-version": 2, + "test-paths": ["tests"], + "models": { + "+file_format": "parquet", + "+partition_by": "id", + "+incremental_strategy": "insert_overwrite", + } + } + + # only 'ignore' and 'fail' are supported + + @use_profile('apache_spark') + def test__apache_spark__run_incremental_ignore(self): + self.run_incremental_ignore() + + @use_profile('apache_spark') + def test__apache_spark__run_incremental_fail_on_schema_change(self): + self.run_incremental_fail_on_schema_change() + + +class TestDeltaOnSchemaChange(TestIncrementalOnSchemaChange): + @property + def project_config(self): + return { + "config-version": 2, + "test-paths": ["tests"], + "models": { + "+file_format": "delta", + "+incremental_strategy": "merge", + "+unique_key": "id", + } + } + + @use_profile('databricks_cluster') + def test__databricks_cluster__run_incremental_ignore(self): + self.run_incremental_ignore() + + @use_profile('databricks_cluster') + def test__databricks_cluster__run_incremental_fail_on_schema_change(self): + self.run_incremental_fail_on_schema_change() + + @use_profile('databricks_cluster') + def test__databricks_cluster__run_incremental_append_new_columns(self): + self.run_incremental_append_new_columns() + + @use_profile('databricks_cluster') + def test__databricks_cluster__run_incremental_sync_all_columns(self): + self.run_incremental_sync_all_columns() + + @use_profile('databricks_sql_endpoint') + def test__databricks_sql_endpoint__run_incremental_ignore(self): + self.run_incremental_ignore() + + @use_profile('databricks_sql_endpoint') + def test__databricks_sql_endpoint__run_incremental_fail_on_schema_change(self): + self.run_incremental_fail_on_schema_change() + + @use_profile('databricks_sql_endpoint') + def test__databricks_sql_endpoint__run_incremental_append_new_columns(self): + self.run_incremental_append_new_columns() + + @use_profile('databricks_sql_endpoint') + def test__databricks_sql_endpoint__run_incremental_sync_all_columns(self): + self.run_incremental_sync_all_columns() From b0c7eaeafe6107823409617b98d8b22426ce6c7f Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Fri, 15 Oct 2021 17:09:49 +0200 Subject: [PATCH 2/2] Add changelog note --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 240c6810f..d821c39c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Fixes - Fix `--store-failures` for tests, by suppressing irrelevant error in `comment_clause()` macro ([#232](https://github.com/dbt-labs/dbt-spark/issues/232), [#233](https://github.com/dbt-labs/dbt-spark/pull/233)) +- Add support for `on_schema_change` config in incremental models: `ignore`, `fail`, `append_new_columns`. For `sync_all_columns`, removing columns is not supported by Apache Spark or Delta Lake ([#198](https://github.com/dbt-labs/dbt-spark/issues/198), [#226](https://github.com/dbt-labs/dbt-spark/issues/226), [#229](https://github.com/dbt-labs/dbt-spark/pull/229)) ## dbt-spark 0.21.0 (October 4, 2021)