diff --git a/.gitignore b/.gitignore
index 9caf202a6..4c05634f3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,4 @@ test/integration/.user.yml
 .DS_Store
 .vscode
 *.log
+logs/
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7405e9e0e..c336aa508 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,14 @@
-## dbt-spark 0.21.0 (Release TBD)
+## dbt-spark 0.21.1 (Release TBD)
+
+### Fixes
+- Fix `--store-failures` for tests, by suppressing irrelevant error in `comment_clause()` macro ([#232](https://github.com/dbt-labs/dbt-spark/issues/232), [#233](https://github.com/dbt-labs/dbt-spark/pull/233))
+- Add support for `on_schema_change` config in incremental models: `ignore`, `fail`, `append_new_columns`. For `sync_all_columns`, removing columns is not supported by Apache Spark or Delta Lake ([#198](https://github.com/dbt-labs/dbt-spark/issues/198), [#226](https://github.com/dbt-labs/dbt-spark/issues/226), [#229](https://github.com/dbt-labs/dbt-spark/pull/229))
+- Add `persist_docs` call to incremental model ([#224](https://github.com/dbt-labs/dbt-spark/issues/224), [#234](https://github.com/dbt-labs/dbt-spark/pull/234))
+
+### Contributors
+- [@binhnefits](https://github.com/binhnefits) ([#234](https://github.com/dbt-labs/dbt-spark/pull/234))
+
+## dbt-spark 0.21.0 (October 4, 2021)
 
 ### Fixes
 - Enhanced get_columns_in_relation method to handle a bug in open source deltalake which doesnt return schema details in `show table extended in databasename like '*'` query output. This impacts dbt snapshots if file format is open source deltalake ([#207](https://github.com/dbt-labs/dbt-spark/pull/207))
diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql
index fcdc46c6d..ee59b8131 100644
--- a/dbt/include/spark/macros/adapters.sql
+++ b/dbt/include/spark/macros/adapters.sql
@@ -32,7 +32,7 @@
       {%- if raw_relation -%}
       comment '{{ model.description | replace("'", "\\'") }}'
       {% endif %}
-  {%- else -%}
+  {%- elif raw_persist_docs -%}
     {{ exceptions.raise_compiler_error("Invalid value provided for 'persist_docs'. Expected dict but got value: " ~ raw_persist_docs) }}
   {% endif %}
 {%- endmacro -%}
@@ -125,7 +125,7 @@
 
 {% macro spark__get_columns_in_relation(relation) -%}
   {% call statement('get_columns_in_relation', fetch_result=True) %}
-      describe extended {{ relation }}
+      describe extended {{ relation.include(schema=(schema is not none)) }}
   {% endcall %}
   {% do return(load_result('get_columns_in_relation').table) %}
 {% endmacro %}
@@ -194,3 +194,48 @@
   {% endfor %}
   {% endif %}
 {% endmacro %}
+
+
+{% macro spark__make_temp_relation(base_relation, suffix) %}
+    {% set tmp_identifier = base_relation.identifier ~ suffix %}
+    {% set tmp_relation = base_relation.incorporate(path = {
+        "identifier": tmp_identifier,
+        "schema": None
+    }) -%}
+
+    {% do return(tmp_relation) %}
+{% endmacro %}
+
+
+{% macro spark__alter_column_type(relation, column_name, new_column_type) -%}
+  {% call statement('alter_column_type') %}
+    alter table {{ relation }} alter column {{ column_name }} type {{ new_column_type }};
+  {% endcall %}
+{% endmacro %}
+
+
+{% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
+
+  {% if remove_columns %}
+    {% set platform_name = 'Delta Lake' if relation.is_delta else 'Apache Spark' %}
+    {{ exceptions.raise_compiler_error(platform_name + ' does not support dropping columns from tables') }}
+  {% endif %}
+
+  {% if add_columns is none %}
+    {% set add_columns = [] %}
+  {% endif %}
+
+  {% set sql -%}
+
+     alter {{ relation.type }} {{ relation }}
+
+       {% if add_columns %} add columns {% endif %}
+            {% for column in add_columns %}
+               {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
+            {% endfor %}
+
+  {%- endset -%}
+
+  {% do run_query(sql) %}
+
+{% endmacro %}
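Note: the new `spark__alter_relation_add_remove_columns` macro only ever emits `add columns` DDL, since column drops are rejected up front with a compiler error. As a rough illustration, when two new string columns appear (as in the fixtures below), the rendered statement would look approximately like this; the schema and table name here are hypothetical:

```sql
-- Approximate output of spark__alter_relation_add_remove_columns
-- for two newly appended columns; relation name is hypothetical.
alter table dev.incremental_append_new_columns
  add columns
    field3 varchar(10),
    field4 varchar(10)
```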
diff --git a/dbt/include/spark/macros/materializations/incremental/incremental.sql b/dbt/include/spark/macros/materializations/incremental/incremental.sql
index b11990b36..72b4d2516 100644
--- a/dbt/include/spark/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental/incremental.sql
@@ -11,6 +11,8 @@
   {%- set partition_by = config.get('partition_by', none) -%}
 
   {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
+
+  {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
 
   {% set target_relation = this %}
   {% set existing_relation = load_relation(this) %}
@@ -31,6 +33,7 @@
     {% set build_sql = create_table_as(False, target_relation, sql) %}
   {% else %}
     {% do run_query(create_table_as(True, tmp_relation, sql)) %}
+    {% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
     {% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %}
   {% endif %}
 
@@ -38,6 +41,8 @@
     {{ build_sql }}
   {%- endcall -%}
 
+  {% do persist_docs(target_relation, model) %}
+
   {{ run_hooks(post_hooks) }}
 
   {{ return({'relations': [target_relation]}) }}
diff --git a/test/custom/incremental_on_schema_change/models/incremental_append_new_columns.sql b/test/custom/incremental_on_schema_change/models/incremental_append_new_columns.sql
new file mode 100644
index 000000000..86f6c7c42
--- /dev/null
+++ b/test/custom/incremental_on_schema_change/models/incremental_append_new_columns.sql
@@ -0,0 +1,28 @@
+{{
+    config(
+        materialized='incremental',
+        on_schema_change='append_new_columns'
+    )
+}}
+
+{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}
+
+WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
+
+{% if is_incremental() %}
+
+SELECT id,
+       cast(field1 as {{string_type}}) as field1,
+       cast(field2 as {{string_type}}) as field2,
+       cast(field3 as {{string_type}}) as field3,
+       cast(field4 as {{string_type}}) as field4
+FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )
+
+{% else %}
+
+SELECT id,
+       cast(field1 as {{string_type}}) as field1,
+       cast(field2 as {{string_type}}) as field2
+FROM source_data where id <= 3
+
+{% endif %}
\ No newline at end of file
diff --git a/test/custom/incremental_on_schema_change/models/incremental_append_new_columns_target.sql b/test/custom/incremental_on_schema_change/models/incremental_append_new_columns_target.sql
new file mode 100644
index 000000000..55ed7b2c5
--- /dev/null
+++ b/test/custom/incremental_on_schema_change/models/incremental_append_new_columns_target.sql
@@ -0,0 +1,19 @@
+{{
+    config(materialized='table')
+}}
+
+{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}
+
+with source_data as (
+
+    select * from {{ ref('model_a') }}
+
+)
+
+select id
+       ,cast(field1 as {{string_type}}) as field1
+       ,cast(field2 as {{string_type}}) as field2
+       ,cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as {{string_type}}) AS field3
+       ,cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as {{string_type}}) AS field4
+
+from source_data
\ No newline at end of file
diff --git a/test/custom/incremental_on_schema_change/models/incremental_fail.sql b/test/custom/incremental_on_schema_change/models/incremental_fail.sql
new file mode 100644
index 000000000..939fc20c2
--- /dev/null
+++ b/test/custom/incremental_on_schema_change/models/incremental_fail.sql
@@ -0,0 +1,18 @@
+{{
+    config(
+        materialized='incremental',
+        on_schema_change='fail'
+    )
+}}
+
+WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
+
+{% if is_incremental() %}
+
+SELECT id, field1, field2 FROM source_data
+
+{% else %}
+
+SELECT id, field1, field3 FROM source_data
+
+{% endif %}
\ No newline at end of file
diff --git a/test/custom/incremental_on_schema_change/models/incremental_ignore.sql b/test/custom/incremental_on_schema_change/models/incremental_ignore.sql
new file mode 100644
index 000000000..98f0a74a8
--- /dev/null
+++ b/test/custom/incremental_on_schema_change/models/incremental_ignore.sql
@@ -0,0 +1,18 @@
+{{
+    config(
+        materialized='incremental',
+        on_schema_change='ignore'
+    )
+}}
+
+WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
+
+{% if is_incremental() %}
+
+SELECT id, field1, field2, field3, field4 FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )
+
+{% else %}
+
+SELECT id, field1, field2 FROM source_data LIMIT 3
+
+{% endif %}
\ No newline at end of file
diff --git a/test/custom/incremental_on_schema_change/models/incremental_ignore_target.sql b/test/custom/incremental_on_schema_change/models/incremental_ignore_target.sql
new file mode 100644
index 000000000..92d4564e0
--- /dev/null
+++ b/test/custom/incremental_on_schema_change/models/incremental_ignore_target.sql
@@ -0,0 +1,15 @@
+{{
+    config(materialized='table')
+}}
+
+with source_data as (
+
+    select * from {{ ref('model_a') }}
+
+)
+
+select id
+       ,field1
+       ,field2
+
+from source_data
\ No newline at end of file
diff --git a/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns.sql b/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns.sql
new file mode 100644
index 000000000..2c5a461e5
--- /dev/null
+++ b/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns.sql
@@ -0,0 +1,30 @@
+{{
+    config(
+        materialized='incremental',
+        on_schema_change='sync_all_columns'
+
+    )
+}}
+
+WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
+
+{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}
+
+{% if is_incremental() %}
+
+SELECT id,
+       cast(field1 as {{string_type}}) as field1,
+       cast(field3 as {{string_type}}) as field3, -- to validate new fields
+       cast(field4 as {{string_type}}) AS field4 -- to validate new fields
+
+FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )
+
+{% else %}
+
+select id,
+       cast(field1 as {{string_type}}) as field1,
+       cast(field2 as {{string_type}}) as field2
+
+from source_data where id <= 3
+
+{% endif %}
\ No newline at end of file
diff --git a/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql b/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql
new file mode 100644
index 000000000..56591eb22
--- /dev/null
+++ b/test/custom/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql
@@ -0,0 +1,20 @@
+{{
+    config(materialized='table')
+}}
+
+with source_data as (
+
+    select * from {{ ref('model_a') }}
+
+)
+
+{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}
+
+select id
+       ,cast(field1 as {{string_type}}) as field1
+       --,field2
+       ,cast(case when id <= 3 then null else field3 end as {{string_type}}) as field3
+       ,cast(case when id <= 3 then null else field4 end as {{string_type}}) as field4
+
+from source_data
+order by id
\ No newline at end of file
diff --git a/test/custom/incremental_on_schema_change/models/model_a.sql b/test/custom/incremental_on_schema_change/models/model_a.sql
new file mode 100644
index 000000000..2a0b2ddaf
--- /dev/null
+++ b/test/custom/incremental_on_schema_change/models/model_a.sql
@@ -0,0 +1,22 @@
+{{
+    config(materialized='table')
+}}
+
+with source_data as (
+
+    select 1 as id, 'aaa' as field1, 'bbb' as field2, 111 as field3, 'TTT' as field4
+    union all select 2 as id, 'ccc' as field1, 'ddd' as field2, 222 as field3, 'UUU' as field4
+    union all select 3 as id, 'eee' as field1, 'fff' as field2, 333 as field3, 'VVV' as field4
+    union all select 4 as id, 'ggg' as field1, 'hhh' as field2, 444 as field3, 'WWW' as field4
+    union all select 5 as id, 'iii' as field1, 'jjj' as field2, 555 as field3, 'XXX' as field4
+    union all select 6 as id, 'kkk' as field1, 'lll' as field2, 666 as field3, 'YYY' as field4
+
+)
+
+select id
+       ,field1
+       ,field2
+       ,field3
+       ,field4
+
+from source_data
\ No newline at end of file
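Note: given `model_a`'s six rows, the target fixtures above encode the expected outcome: rows seeded before the schema change (id <= 3) carry nulls in the appended columns. A hypothetical spot-check after two runs of the `append_new_columns` model (the schema name `dev` is assumed):

```sql
-- Hypothetical verification query; schema name assumed.
select id, field3, field4
from dev.incremental_append_new_columns
order by id;
-- expected: ids 1-3 -> null, null        (predate the schema change)
--           ids 4-6 -> '444'/'WWW', '555'/'XXX', '666'/'YYY'
```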
diff --git a/test/custom/incremental_on_schema_change/test_incremental_on_schema_change.py b/test/custom/incremental_on_schema_change/test_incremental_on_schema_change.py
new file mode 100644
index 000000000..e259e5c95
--- /dev/null
+++ b/test/custom/incremental_on_schema_change/test_incremental_on_schema_change.py
@@ -0,0 +1,156 @@
+from test.custom.base import DBTSparkIntegrationTest, use_profile
+import dbt.exceptions
+
+
+class TestIncrementalOnSchemaChange(DBTSparkIntegrationTest):
+    @property
+    def schema(self):
+        return "incremental_on_schema_change"
+
+    @property
+    def models(self):
+        return "models"
+
+    @property
+    def project_config(self):
+        return {
+            "config-version": 2,
+            "test-paths": ["tests"]
+        }
+
+    def run_twice_and_assert(
+        self, include, compare_source, compare_target
+    ):
+
+        # dbt run (twice)
+        run_args = ['run']
+        if include:
+            run_args.extend(('--models', include))
+        results_one = self.run_dbt(run_args)
+        results_two = self.run_dbt(run_args)
+
+        self.assertEqual(len(results_one), 3)
+        self.assertEqual(len(results_two), 3)
+
+        self.assertTablesEqual(compare_source, compare_target)
+
+    def run_incremental_ignore(self):
+        select = 'model_a incremental_ignore incremental_ignore_target'
+        compare_source = 'incremental_ignore'
+        compare_target = 'incremental_ignore_target'
+        self.run_twice_and_assert(select, compare_source, compare_target)
+
+    def run_incremental_append_new_columns(self):
+        select = 'model_a incremental_append_new_columns incremental_append_new_columns_target'
+        compare_source = 'incremental_append_new_columns'
+        compare_target = 'incremental_append_new_columns_target'
+        self.run_twice_and_assert(select, compare_source, compare_target)
+
+    def run_incremental_fail_on_schema_change(self):
+        select = 'model_a incremental_fail'
+        results_one = self.run_dbt(['run', '--models', select, '--full-refresh'])
+        results_two = self.run_dbt(['run', '--models', select], expect_pass=False)
+        self.assertIn('Compilation Error', results_two[1].message)
+
+    def run_incremental_sync_all_columns(self):
+        # this doesn't work on Delta today
+        select = 'model_a incremental_sync_all_columns incremental_sync_all_columns_target'
+        compare_source = 'incremental_sync_all_columns'
+        compare_target = 'incremental_sync_all_columns_target'
+        results_one = self.run_dbt(['run', '--models', select, '--full-refresh'])
+        results_two = self.run_dbt(['run', '--models', select], expect_pass=False)
+        self.assertIn('Compilation Error', results_two[1].message)
+
+
+class TestApacheSparkAppend(TestIncrementalOnSchemaChange):
+
+    @property
+    def project_config(self):
+        return {
+            "config-version": 2,
+            "test-paths": ["tests"],
+            "models": {
+                "+incremental_strategy": "append",
+            }
+        }
+
+    # only 'ignore' and 'fail' are supported
+
+    @use_profile('apache_spark')
+    def test__apache_spark__run_incremental_ignore(self):
+        self.run_incremental_ignore()
+
+    @use_profile('apache_spark')
+    def test__apache_spark__run_incremental_fail_on_schema_change(self):
+        self.run_incremental_fail_on_schema_change()
+
+
+class TestApacheSparkInsertOverwrite(TestIncrementalOnSchemaChange):
+
+    @property
+    def project_config(self):
+        return {
+            "config-version": 2,
+            "test-paths": ["tests"],
+            "models": {
+                "+file_format": "parquet",
+                "+partition_by": "id",
+                "+incremental_strategy": "insert_overwrite",
+            }
+        }
+
+    # only 'ignore' and 'fail' are supported
+
+    @use_profile('apache_spark')
+    def test__apache_spark__run_incremental_ignore(self):
+        self.run_incremental_ignore()
+
+    @use_profile('apache_spark')
+    def test__apache_spark__run_incremental_fail_on_schema_change(self):
+        self.run_incremental_fail_on_schema_change()
+
+
+class TestDeltaOnSchemaChange(TestIncrementalOnSchemaChange):
+    @property
+    def project_config(self):
+        return {
+            "config-version": 2,
+            "test-paths": ["tests"],
+            "models": {
+                "+file_format": "delta",
+                "+incremental_strategy": "merge",
+                "+unique_key": "id",
+            }
+        }
+
+    @use_profile('databricks_cluster')
+    def test__databricks_cluster__run_incremental_ignore(self):
+        self.run_incremental_ignore()
+
+    @use_profile('databricks_cluster')
+    def test__databricks_cluster__run_incremental_fail_on_schema_change(self):
+        self.run_incremental_fail_on_schema_change()
+
+    @use_profile('databricks_cluster')
+    def test__databricks_cluster__run_incremental_append_new_columns(self):
+        self.run_incremental_append_new_columns()
+
+    @use_profile('databricks_cluster')
+    def test__databricks_cluster__run_incremental_sync_all_columns(self):
+        self.run_incremental_sync_all_columns()
+
+    @use_profile('databricks_sql_endpoint')
+    def test__databricks_sql_endpoint__run_incremental_ignore(self):
+        self.run_incremental_ignore()
+
+    @use_profile('databricks_sql_endpoint')
+    def test__databricks_sql_endpoint__run_incremental_fail_on_schema_change(self):
+        self.run_incremental_fail_on_schema_change()
+
+    @use_profile('databricks_sql_endpoint')
+    def test__databricks_sql_endpoint__run_incremental_append_new_columns(self):
+        self.run_incremental_append_new_columns()
+
+    @use_profile('databricks_sql_endpoint')
+    def test__databricks_sql_endpoint__run_incremental_sync_all_columns(self):
+        self.run_incremental_sync_all_columns()
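Note: these cases run through the repo's pytest-based integration harness. Something like the following should exercise the Apache Spark append variant locally, assuming a working `apache_spark` target in `test/integration/.user.yml`; the exact invocation may differ depending on your harness setup:

```shell
# Hypothetical local invocation; profile and harness setup assumed.
python -m pytest -v \
  test/custom/incremental_on_schema_change/test_incremental_on_schema_change.py::TestApacheSparkAppend
```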
diff --git a/test/custom/persist_docs/models/incremental_delta_model.sql b/test/custom/persist_docs/models/incremental_delta_model.sql
new file mode 100644
index 000000000..c3f325ea3
--- /dev/null
+++ b/test/custom/persist_docs/models/incremental_delta_model.sql
@@ -0,0 +1,2 @@
+{{ config(materialized='incremental', file_format='delta') }}
+select 1 as id, 'Joe' as name
diff --git a/test/custom/persist_docs/models/schema.yml b/test/custom/persist_docs/models/schema.yml
index 78dcda799..2639037ba 100644
--- a/test/custom/persist_docs/models/schema.yml
+++ b/test/custom/persist_docs/models/schema.yml
@@ -69,3 +69,27 @@ models:
           --
           /* comment */
           Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting
+
+  - name: incremental_delta_model
+    description: |
+      Incremental model description "with double quotes"
+      and with 'single quotes' as welll as other;
+      '''abc123'''
+      reserved -- characters
+      --
+      /* comment */
+      Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting
+    columns:
+      - name: id
+        description: |
+          id Column description "with double quotes"
+          and with 'single quotes' as welll as other;
+          '''abc123'''
+          reserved -- characters
+          --
+          /* comment */
+          Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting
+      - name: name
+        description: |
+          Some stuff here and then a call to
+          {{ doc('my_fun_doc')}}
diff --git a/test/custom/persist_docs/test_persist_docs.py b/test/custom/persist_docs/test_persist_docs.py
index d9acf70d1..64c540854 100644
--- a/test/custom/persist_docs/test_persist_docs.py
+++ b/test/custom/persist_docs/test_persist_docs.py
@@ -42,7 +42,11 @@ def test_delta_comments(self):
         self.run_dbt(['seed'])
         self.run_dbt(['run'])
 
-        for table in ['table_delta_model', 'seed']:
+        for table, whatis in [
+            ('table_delta_model', 'Table'),
+            ('seed', 'Seed'),
+            ('incremental_delta_model', 'Incremental')
+        ]:
             results = self.run_sql(
                 'describe extended {schema}.{table}'.format(schema=self.unique_schema(), table=table),
                 fetch='all'
@@ -50,7 +54,6 @@ def test_delta_comments(self):
 
             for result in results:
                 if result[0] == 'Comment':
-                    whatis = 'Seed' if table == 'seed' else 'Table'
                     assert result[1].startswith(f'{whatis} model description')
                 if result[0] == 'id':
                     assert result[2].startswith('id Column description')
diff --git a/test/custom/store_failures/models/schema.yml b/test/custom/store_failures/models/schema.yml
new file mode 100644
index 000000000..be559b206
--- /dev/null
+++ b/test/custom/store_failures/models/schema.yml
@@ -0,0 +1,9 @@
+version: 2
+
+models:
+  - name: view_model
+    columns:
+      - name: id
+        tests:
+          - unique
+          - not_null
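Note: with `store_failures` enabled, dbt persists each failing test's query results as a table instead of only counting them. For context, dbt's built-in `unique` test compiles to roughly the query below; `view_model` (next file) yields one failing row for `unique` (the duplicated id 1) and one for `not_null`:

```sql
-- Approximate shape of dbt's compiled `unique` test on view_model.id;
-- the {schema} placeholder stands in for the test schema.
select
    id as unique_field,
    count(*) as n_records
from {schema}.view_model
where id is not null
group by id
having count(*) > 1
```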
diff --git a/test/custom/store_failures/models/view_model.sql b/test/custom/store_failures/models/view_model.sql
new file mode 100644
index 000000000..2ff36b4e2
--- /dev/null
+++ b/test/custom/store_failures/models/view_model.sql
@@ -0,0 +1,5 @@
+select 1 as id
+union all
+select 1 as id
+union all
+select null as id
diff --git a/test/custom/store_failures/test_store_failures.py b/test/custom/store_failures/test_store_failures.py
new file mode 100644
index 000000000..7a4aae7d8
--- /dev/null
+++ b/test/custom/store_failures/test_store_failures.py
@@ -0,0 +1,51 @@
+from test.custom.base import DBTSparkIntegrationTest, use_profile
+
+class TestStoreFailures(DBTSparkIntegrationTest):
+    @property
+    def schema(self):
+        return "store_failures"
+
+    @property
+    def models(self):
+        return "models"
+
+    @property
+    def project_config(self):
+        return {
+            'config-version': 2,
+            'tests': {
+                '+store_failures': True,
+                '+severity': 'warn',
+            }
+        }
+
+    def test_store_failures(self):
+        self.run_dbt(['run'])
+        results = self.run_dbt(['test', '--store-failures'], strict=False)
+
+class TestStoreFailuresApacheSpark(TestStoreFailures):
+
+    @use_profile("apache_spark")
+    def test_store_failures_apache_spark(self):
+        self.test_store_failures()
+
+class TestStoreFailuresDelta(TestStoreFailures):
+
+    @property
+    def project_config(self):
+        return {
+            'config-version': 2,
+            'tests': {
+                '+store_failures': True,
+                '+severity': 'warn',
+                '+file_format': 'delta',
+            }
+        }
+
+    @use_profile("databricks_cluster")
+    def test_store_failures_databricks_cluster(self):
+        self.test_store_failures()
+
+    @use_profile("databricks_sql_endpoint")
+    def test_store_failures_databricks_sql_endpoint(self):
+        self.test_store_failures()
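Note: the manual workflow this test automates looks roughly like the commands below. By default dbt writes stored failures to an audit schema suffixed `_dbt_test__audit`, one table per test:

```shell
# Typical manual equivalent (profile/target assumed to be configured):
dbt run
dbt test --store-failures
# failing rows should land in e.g. <schema>_dbt_test__audit.unique_view_model_id
```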