From 8ba1fc1b7ae1dc25746bc2b31451cc9c06382899 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Tue, 12 Jan 2021 13:28:25 +0100 Subject: [PATCH 01/15] 1. partitionOverwriteMode = DYNAMIC iff insert_overwrite + partition_by --- .../spark/macros/materializations/incremental.sql | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql index 000659a8f..921c165d5 100644 --- a/dbt/include/spark/macros/materializations/incremental.sql +++ b/dbt/include/spark/macros/materializations/incremental.sql @@ -100,16 +100,10 @@ {% do dbt_spark_validate_merge(file_format) %} {% endif %} - {% if config.get('partition_by') %} - {% call statement() %} - set spark.sql.sources.partitionOverwriteMode = DYNAMIC - {% endcall %} + {% if strategy == 'insert_overwrite' and config.get('partition_by') %} + set spark.sql.sources.partitionOverwriteMode = DYNAMIC {% endif %} - {% call statement() %} - set spark.sql.hive.convertMetastoreParquet = false - {% endcall %} - {{ run_hooks(pre_hooks) }} {% if existing_relation is none %} From 6de3d070d4e77487ecde6d7bb14fe2056beb6fb4 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Tue, 12 Jan 2021 13:31:04 +0100 Subject: [PATCH 02/15] 2. Error if insert_overwrite + delta --- .../macros/materializations/incremental.sql | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql index 921c165d5..b824c482a 100644 --- a/dbt/include/spark/macros/materializations/incremental.sql +++ b/dbt/include/spark/macros/materializations/incremental.sql @@ -37,6 +37,18 @@ Invalid incremental strategy provided: {{ strategy }} You can only choose this strategy when file_format is set to 'delta' {%- endset %} + + {% set invalid_insert_overwrite_delta_msg -%} + Invalid incremental strategy provided: {{ strategy }} + You cannot use this strategy when file_format is set to 'delta' + Use the `merge` strategy instead + {%- endset %} + + {% set invalid_insert_overwrite_endpoint_msg -%} + Invalid incremental strategy provided: {{ strategy }} + You cannot use this strategy when connecting via endpoint + Use `incremental_strategy: merge` with `file_format: delta` instead + {%- endset %} {% if strategy not in ['merge', 'insert_overwrite'] %} {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} @@ -44,6 +56,12 @@ {% if strategy == 'merge' and file_format != 'delta' %} {% do exceptions.raise_compiler_error(invalid_merge_msg) %} {% endif %} + {% if strategy == 'insert_overwrite' and file_format == 'delta' %} + {% do exceptions.raise_compiler_error(invalid_insert_overwrite_delta_msg) %} + {% endif %} + {% if strategy == 'insert_overwrite' and target.endpoint %} + {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %} + {% endif %} {% endif %} {% do return(strategy) %} From fb6a4bb6096bd0512771c6b7c9db01d758f4f7b3 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Tue, 12 Jan 2021 13:47:59 +0100 Subject: [PATCH 03/15] 3. 
Append-only insert if no partitions --- README.md | 3 ++- .../spark/macros/materializations/incremental.sql | 5 ++++- test/integration/spark-thrift.dbtspec | 11 ----------- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 7d309ece9..9297c9833 100644 --- a/README.md +++ b/README.md @@ -168,7 +168,8 @@ The following configurations can be supplied to models run with the dbt-spark pl **Incremental Models** To use incremental models, specify a `partition_by` clause in your model config. The default incremental strategy used is `insert_overwrite`, which will overwrite the partitions included in your query. Be sure to re-select _all_ of the relevant -data for a partition when using the `insert_overwrite` strategy. If a `partition_by` config is not specified, dbt will overwrite the entire table as an atomic operation, replacing it with new data of the same schema. This is analogous to `truncate` + `insert`. +data for a partition when using the `insert_overwrite` strategy. If a `partition_by` config is not specified, dbt will simply +append new data to the model, without overwriting any existing data. ``` {{ config( diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql index b824c482a..dd159cf12 100644 --- a/dbt/include/spark/macros/materializations/incremental.sql +++ b/dbt/include/spark/macros/materializations/incremental.sql @@ -1,8 +1,11 @@ {% macro get_insert_overwrite_sql(source_relation, target_relation) %} + {%- set cols = config.get('partition_by', validator=validation.any[list, basestring]) -%} + {%- set insert = 'insert overwrite' if cols is not none else 'insert into' -%} + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} - insert overwrite table {{ target_relation }} + {{ insert }} table {{ target_relation }} {{ partition_cols(label="partition") }} select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }} diff --git a/test/integration/spark-thrift.dbtspec b/test/integration/spark-thrift.dbtspec index 58f5a9065..85b843f37 100644 --- a/test/integration/spark-thrift.dbtspec +++ b/test/integration/spark-thrift.dbtspec @@ -7,17 +7,6 @@ target: connect_retries: 5 connect_timeout: 60 schema: "analytics_{{ var('_dbt_random_suffix') }}" -projects: - - overrides: incremental - paths: - "models/incremental.sql": - materialized: incremental - body: "select * from {{ source('raw', 'seed') }}" - facts: - base: - rowcount: 10 - added: - rowcount: 20 sequences: test_dbt_empty: empty test_dbt_base: base From 6f7e1f2f4241e1890c5f236f8338469392a6d7f1 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Tue, 12 Jan 2021 14:08:05 +0100 Subject: [PATCH 04/15] 4. Append-only merge if no unique_key --- README.md | 3 +- .../macros/materializations/incremental.sql | 32 ++++++++----------- .../integration/spark-databricks-http.dbtspec | 11 ------- .../spark-databricks-odbc-cluster.dbtspec | 10 ------ ...spark-databricks-odbc-sql-endpoint.dbtspec | 10 ------ 5 files changed, 15 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index 9297c9833..e9cee6ade 100644 --- a/README.md +++ b/README.md @@ -192,7 +192,8 @@ where date_day::date >= '2019-01-01' group by 1 ``` -The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). It also requires you to specify a `unique key` to match existing records. 
+The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). If a `unique key` is specified, the statement will match existing records and overwrite them with new values. If a `unique key` config is not specified, dbt will simply +append new data to the model, without overwriting any existing data. (For atomic replacement of an entire Delta table, use the `'table'` materialization instead.) ``` {{ config( diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql index dd159cf12..dde4bdd0f 100644 --- a/dbt/include/spark/macros/materializations/incremental.sql +++ b/dbt/include/spark/macros/materializations/incremental.sql @@ -70,23 +70,21 @@ {% do return(strategy) %} {% endmacro %} -{% macro dbt_spark_validate_merge(file_format) %} - {% set invalid_file_format_msg -%} - You can only choose the 'merge' incremental_strategy when file_format is set to 'delta' - {%- endset %} - - {% if file_format != 'delta' %} - {% do exceptions.raise_compiler_error(invalid_file_format_msg) %} - {% endif %} - -{% endmacro %} - {% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %} {# ignore dest_columns - we will just use `*` #} + + {% set merge_condition %} + {% if unique_key %} + on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} + {% else %} + on false + {% endif %} + {% endset %} + merge into {{ target }} as DBT_INTERNAL_DEST using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE - on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} + {{ merge_condition }} when matched then update set * when not matched then insert * {% endmacro %} @@ -106,9 +104,10 @@ {% materialization incremental, adapter='spark' -%} {#-- Validate early so we don't run SQL if the file_format is invalid --#} - {% set file_format = dbt_spark_validate_get_file_format() -%} + {%- set file_format = dbt_spark_validate_get_file_format() -%} {#-- Validate early so we don't run SQL if the strategy is invalid --#} - {% set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%} + {%- set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%} + {%- set unique_key = config.get('unique_key', none) -%} {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} @@ -116,11 +115,6 @@ {% set existing_relation = load_relation(this) %} {% set tmp_relation = make_temp_relation(this) %} - {% if strategy == 'merge' %} - {%- set unique_key = config.require('unique_key') -%} - {% do dbt_spark_validate_merge(file_format) %} - {% endif %} - {% if strategy == 'insert_overwrite' and config.get('partition_by') %} set spark.sql.sources.partitionOverwriteMode = DYNAMIC {% endif %} diff --git a/test/integration/spark-databricks-http.dbtspec b/test/integration/spark-databricks-http.dbtspec index c20e4242b..67342da39 100644 --- a/test/integration/spark-databricks-http.dbtspec +++ b/test/integration/spark-databricks-http.dbtspec @@ -9,16 +9,6 @@ target: connect_retries: 5 connect_timeout: 60 projects: - - overrides: incremental - paths: - "models/incremental.sql": - materialized: incremental - body: "select * from {{ source('raw', 'seed') }}" - facts: - base: - rowcount: 10 - added: - rowcount: 20 - overrides: snapshot_strategy_check_cols dbt_project_yml: &file_format_delta # we're going to UPDATE the seed tables as part of testing, so we must make them delta format @@ -40,4 +30,3 @@ sequences: test_dbt_data_test: data_test 
test_dbt_ephemeral_data_tests: data_test_ephemeral_models test_dbt_schema_test: schema_test - diff --git a/test/integration/spark-databricks-odbc-cluster.dbtspec b/test/integration/spark-databricks-odbc-cluster.dbtspec index 8dc4975ea..b320dc3a4 100644 --- a/test/integration/spark-databricks-odbc-cluster.dbtspec +++ b/test/integration/spark-databricks-odbc-cluster.dbtspec @@ -10,16 +10,6 @@ target: connect_retries: 5 connect_timeout: 60 projects: - - overrides: incremental - paths: - "models/incremental.sql": - materialized: incremental - body: "select * from {{ source('raw', 'seed') }}" - facts: - base: - rowcount: 10 - added: - rowcount: 20 - overrides: snapshot_strategy_check_cols dbt_project_yml: &file_format_delta # we're going to UPDATE the seed tables as part of testing, so we must make them delta format diff --git a/test/integration/spark-databricks-odbc-sql-endpoint.dbtspec b/test/integration/spark-databricks-odbc-sql-endpoint.dbtspec index 0251cb312..0aa7be765 100644 --- a/test/integration/spark-databricks-odbc-sql-endpoint.dbtspec +++ b/test/integration/spark-databricks-odbc-sql-endpoint.dbtspec @@ -10,16 +10,6 @@ target: connect_retries: 5 connect_timeout: 60 projects: - - overrides: incremental - paths: - "models/incremental.sql": - materialized: incremental - body: "select * from {{ source('raw', 'seed') }}" - facts: - base: - rowcount: 10 - added: - rowcount: 20 - overrides: snapshot_strategy_check_cols dbt_project_yml: &file_format_delta # we're going to UPDATE the seed tables as part of testing, so we must make them delta format From b60d7c2712ba18425b1e1bbfc69c2ff520261938 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Wed, 13 Jan 2021 17:30:00 +0100 Subject: [PATCH 05/15] Revert 3. Full table insert_overwrite if no partitions --- dbt/include/spark/macros/materializations/incremental.sql | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql index dde4bdd0f..d912a45f9 100644 --- a/dbt/include/spark/macros/materializations/incremental.sql +++ b/dbt/include/spark/macros/materializations/incremental.sql @@ -1,11 +1,8 @@ {% macro get_insert_overwrite_sql(source_relation, target_relation) %} - - {%- set cols = config.get('partition_by', validator=validation.any[list, basestring]) -%} - {%- set insert = 'insert overwrite' if cols is not none else 'insert into' -%} {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} - {{ insert }} table {{ target_relation }} + insert overwrite table {{ target_relation }} {{ partition_cols(label="partition") }} select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }} From c8e3770e077e8c54026156b14e61133ef59fa7ff Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Wed, 13 Jan 2021 17:30:14 +0100 Subject: [PATCH 06/15] Introduce 'append' strategy as default --- CHANGELOG.md | 4 ++ README.md | 49 +++++++++++++------ .../macros/materializations/incremental.sql | 31 +++++++++--- 3 files changed, 60 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e6e1560c0..31499aa9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ### Breaking changes - Users of the `http` and `thrift` connection methods need to install extra requirements: `pip install dbt-spark[PyHive]` ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/109), 
[#126](https://github.com/fishtown-analytics/dbt-spark/pull/126)) +- Incremental models have `incremental_strategy: append` by default. This strategy adds new records +without updating or overwriting existing records. For that, use `merge` or `insert_overwrite` instead, depending +on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error +if you configure a strategy that is not supported for a given file format or connection. ([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141)) ### Under the hood - Enable `CREATE OR REPLACE` support when using Delta. Instead of dropping and recreating the table, it will keep the existing table, and add a new version as supported by Delta. This will ensure that the table stays available when running the pipeline, and you can track the history. diff --git a/README.md b/README.md index e9cee6ade..38a1e8ec4 100644 --- a/README.md +++ b/README.md @@ -161,50 +161,67 @@ The following configurations can be supplied to models run with the dbt-spark pl | partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `partition_1` | | clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `cluster_1` | | buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` | -| incremental_strategy | The strategy to use for incremental models (`insert_overwrite` or `merge`). Note `merge` requires `file_format` = `delta` and `unique_key` to be specified. | Optional (default: `insert_overwrite`) | `merge` | +| incremental_strategy | The strategy to use for incremental models (`append`, `insert_overwrite`, or `merge`). | Optional (default: `append`) | `merge` | | persist_docs | Whether dbt should include the model description as a table `comment` | Optional | `{'relation': true}` | **Incremental Models** -To use incremental models, specify a `partition_by` clause in your model config. The default incremental strategy used is `insert_overwrite`, which will overwrite the partitions included in your query. Be sure to re-select _all_ of the relevant -data for a partition when using the `insert_overwrite` strategy. If a `partition_by` config is not specified, dbt will simply -append new data to the model, without overwriting any existing data. +dbt has a number of ways to build models incrementally, called "incremental strategies." Some strategies depend on certain file formats, connection types, and other model configurations: +- `append` (default): Insert new records without updating or overwriting any existing data. +- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta`. Not available on Databricks SQL Endpoints. For atomic replacement of Delta tables, use the `table` materialization.] +- `merge`: Match records based on a `unique_key`; update old records, insert new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.) [Requires `file_format: delta`. Available only on Databricks Runtime.] 
+Examples: + +```sql +{{ config( + materialized='incremental', + incremental_strategy='append' +) }} + + +-- All rows returned by this query will be appended to the existing table + +select * from {{ ref('events') }} +{% if is_incremental() %} + where event_ts > (select max(event_ts) from {{ this }}) +{% endif %} ``` + +```sql {{ config( materialized='incremental', + incremental_strategy='merge', partition_by=['date_day'], file_format='parquet' ) }} -/* - Every partition returned by this query will be overwritten - when this model runs -*/ +-- Every partition returned by this query will overwrite existing partitions select date_day, count(*) as users from {{ ref('events') }} -where date_day::date >= '2019-01-01' +{% if is_incremental() %} + where date_day > (select max(date_day) from {{ this }}) +{% endif %} group by 1 ``` -The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). If a `unique key` is specified, the statement will match existing records and overwrite them with new values. If a `unique key` config is not specified, dbt will simply -append new data to the model, without overwriting any existing data. (For atomic replacement of an entire Delta table, use the `'table'` materialization instead.) - -``` +```sql {{ config( materialized='incremental', incremental_strategy='merge', - partition_by=['date_day'], + unique_key='event_id', file_format='delta' ) }} -select * -from {{ ref('events') }} +-- Existing events, matched on `event_id`, will be updated +-- New events will be appended + +select * from {{ ref('events') }} {% if is_incremental() %} where date_day > (select max(date_day) from {{ this }}) {% endif %} diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql index d912a45f9..bb6d5dc0c 100644 --- a/dbt/include/spark/macros/materializations/incremental.sql +++ b/dbt/include/spark/macros/materializations/incremental.sql @@ -8,6 +8,17 @@ {% endmacro %} + +{% macro get_insert_into_sql(source_relation, target_relation) %} + + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + insert into table {{ target_relation }} + select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }} + +{% endmacro %} + + {% macro dbt_spark_validate_get_file_format() %} {#-- Find and validate the file format #} {%- set file_format = config.get("file_format", default="parquet") -%} @@ -24,13 +35,14 @@ {% do return(file_format) %} {% endmacro %} + {% macro dbt_spark_validate_get_incremental_strategy(file_format) %} {#-- Find and validate the incremental strategy #} - {%- set strategy = config.get("incremental_strategy", default="insert_overwrite") -%} + {%- set strategy = config.get("incremental_strategy", default="append") -%} {% set invalid_strategy_msg -%} Invalid incremental strategy provided: {{ strategy }} - Expected one of: 'merge', 'insert_overwrite' + Expected one of: 'append', 'merge', 'insert_overwrite' {%- endset %} {% set invalid_merge_msg -%} @@ -41,16 +53,16 @@ {% set invalid_insert_overwrite_delta_msg -%} Invalid incremental strategy provided: {{ strategy }} You cannot use this strategy when file_format is set to 'delta' - Use the `merge` strategy instead + Use the 'append' or 'merge' strategy instead {%- endset %} {% set invalid_insert_overwrite_endpoint_msg -%} Invalid incremental strategy provided: {{ strategy }} You cannot use this 
strategy when connecting via endpoint - Use `incremental_strategy: merge` with `file_format: delta` instead + Use the 'append' or 'merge' strategy instead {%- endset %} - {% if strategy not in ['merge', 'insert_overwrite'] %} + {% if strategy not in ['append', 'merge', 'insert_overwrite'] %} {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} {%-else %} {% if strategy == 'merge' and file_format != 'delta' %} @@ -88,11 +100,14 @@ {% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %} - {%- if strategy == 'insert_overwrite' -%} + {%- if strategy == 'append' -%} + {#-- insert new records into existing table, without updating or overwriting #} + {{ get_insert_into_sql(source, target) }} + {%- elif strategy == 'insert_overwrite' -%} {#-- insert statements don't like CTEs, so support them via a temp view #} {{ get_insert_overwrite_sql(source, target) }} - {%- else -%} - {#-- merge all columns with databricks delta - schema changes are handled for us #} + {%- elif strategy == 'merge' -%} + {#-- merge all columns with databricks delta - schema changes are handled for us #} {{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }} {%- endif -%} From bc18f8022e23c2fcdba84253d39be8f071c526e2 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Thu, 14 Jan 2021 13:26:47 +0100 Subject: [PATCH 07/15] PR feedback. Reorganize macros --- CHANGELOG.md | 5 +- README.md | 4 +- .../macros/materializations/incremental.sql | 154 ------------------ .../incremental/incremental.sql | 45 +++++ .../incremental/strategies.sql | 58 +++++++ .../materializations/incremental/validate.sql | 59 +++++++ 6 files changed, 165 insertions(+), 160 deletions(-) delete mode 100644 dbt/include/spark/macros/materializations/incremental.sql create mode 100644 dbt/include/spark/macros/materializations/incremental/incremental.sql create mode 100644 dbt/include/spark/macros/materializations/incremental/strategies.sql create mode 100644 dbt/include/spark/macros/materializations/incremental/validate.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 31499aa9e..485b88e94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,7 @@ ### Breaking changes - Users of the `http` and `thrift` connection methods need to install extra requirements: `pip install dbt-spark[PyHive]` ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/109), [#126](https://github.com/fishtown-analytics/dbt-spark/pull/126)) -- Incremental models have `incremental_strategy: append` by default. This strategy adds new records -without updating or overwriting existing records. For that, use `merge` or `insert_overwrite` instead, depending -on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error -if you configure a strategy that is not supported for a given file format or connection. ([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141)) +- Incremental models have `incremental_strategy: append` by default. This strategy adds new records without updating or overwriting existing records. For that, use `merge` or `insert_overwrite` instead, depending on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error if you configure a strategy that is not supported for a given file format or connection. 
([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141)) ### Under the hood - Enable `CREATE OR REPLACE` support when using Delta. Instead of dropping and recreating the table, it will keep the existing table, and add a new version as supported by Delta. This will ensure that the table stays available when running the pipeline, and you can track the history. diff --git a/README.md b/README.md index 38a1e8ec4..09e716b5c 100644 --- a/README.md +++ b/README.md @@ -169,7 +169,7 @@ The following configurations can be supplied to models run with the dbt-spark pl dbt has a number of ways to build models incrementally, called "incremental strategies." Some strategies depend on certain file formats, connection types, and other model configurations: - `append` (default): Insert new records without updating or overwriting any existing data. -- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta`. Not available on Databricks SQL Endpoints. For atomic replacement of Delta tables, use the `table` materialization.] +- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta` or when connectinng via Databricks SQL Endpoints. For atomic replacement of Delta tables, use the `table` materialization instead.] - `merge`: Match records based on a `unique_key`; update old records, insert new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.) [Requires `file_format: delta`. Available only on Databricks Runtime.] 
Examples: @@ -177,7 +177,7 @@ Examples: ```sql {{ config( materialized='incremental', - incremental_strategy='append' + incremental_strategy='append', ) }} diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql deleted file mode 100644 index bb6d5dc0c..000000000 --- a/dbt/include/spark/macros/materializations/incremental.sql +++ /dev/null @@ -1,154 +0,0 @@ -{% macro get_insert_overwrite_sql(source_relation, target_relation) %} - - {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} - {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} - insert overwrite table {{ target_relation }} - {{ partition_cols(label="partition") }} - select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }} - -{% endmacro %} - - -{% macro get_insert_into_sql(source_relation, target_relation) %} - - {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} - {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} - insert into table {{ target_relation }} - select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }} - -{% endmacro %} - - -{% macro dbt_spark_validate_get_file_format() %} - {#-- Find and validate the file format #} - {%- set file_format = config.get("file_format", default="parquet") -%} - - {% set invalid_file_format_msg -%} - Invalid file format provided: {{ file_format }} - Expected one of: 'text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm' - {%- endset %} - - {% if file_format not in ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'] %} - {% do exceptions.raise_compiler_error(invalid_file_format_msg) %} - {% endif %} - - {% do return(file_format) %} -{% endmacro %} - - -{% macro dbt_spark_validate_get_incremental_strategy(file_format) %} - {#-- Find and validate the incremental strategy #} - {%- set strategy = config.get("incremental_strategy", default="append") -%} - - {% set invalid_strategy_msg -%} - Invalid incremental strategy provided: {{ strategy }} - Expected one of: 'append', 'merge', 'insert_overwrite' - {%- endset %} - - {% set invalid_merge_msg -%} - Invalid incremental strategy provided: {{ strategy }} - You can only choose this strategy when file_format is set to 'delta' - {%- endset %} - - {% set invalid_insert_overwrite_delta_msg -%} - Invalid incremental strategy provided: {{ strategy }} - You cannot use this strategy when file_format is set to 'delta' - Use the 'append' or 'merge' strategy instead - {%- endset %} - - {% set invalid_insert_overwrite_endpoint_msg -%} - Invalid incremental strategy provided: {{ strategy }} - You cannot use this strategy when connecting via endpoint - Use the 'append' or 'merge' strategy instead - {%- endset %} - - {% if strategy not in ['append', 'merge', 'insert_overwrite'] %} - {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} - {%-else %} - {% if strategy == 'merge' and file_format != 'delta' %} - {% do exceptions.raise_compiler_error(invalid_merge_msg) %} - {% endif %} - {% if strategy == 'insert_overwrite' and file_format == 'delta' %} - {% do exceptions.raise_compiler_error(invalid_insert_overwrite_delta_msg) %} - {% endif %} - {% if strategy == 'insert_overwrite' and target.endpoint %} - {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %} - {% endif %} - {% endif %} - - {% do return(strategy) %} -{% endmacro %} - - -{% macro 
spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %} - {# ignore dest_columns - we will just use `*` #} - - {% set merge_condition %} - {% if unique_key %} - on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} - {% else %} - on false - {% endif %} - {% endset %} - - merge into {{ target }} as DBT_INTERNAL_DEST - using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE - {{ merge_condition }} - when matched then update set * - when not matched then insert * -{% endmacro %} - - -{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %} - {%- if strategy == 'append' -%} - {#-- insert new records into existing table, without updating or overwriting #} - {{ get_insert_into_sql(source, target) }} - {%- elif strategy == 'insert_overwrite' -%} - {#-- insert statements don't like CTEs, so support them via a temp view #} - {{ get_insert_overwrite_sql(source, target) }} - {%- elif strategy == 'merge' -%} - {#-- merge all columns with databricks delta - schema changes are handled for us #} - {{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }} - {%- endif -%} - -{% endmacro %} - - -{% materialization incremental, adapter='spark' -%} - {#-- Validate early so we don't run SQL if the file_format is invalid --#} - {%- set file_format = dbt_spark_validate_get_file_format() -%} - {#-- Validate early so we don't run SQL if the strategy is invalid --#} - {%- set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%} - {%- set unique_key = config.get('unique_key', none) -%} - - {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} - - {% set target_relation = this %} - {% set existing_relation = load_relation(this) %} - {% set tmp_relation = make_temp_relation(this) %} - - {% if strategy == 'insert_overwrite' and config.get('partition_by') %} - set spark.sql.sources.partitionOverwriteMode = DYNAMIC - {% endif %} - - {{ run_hooks(pre_hooks) }} - - {% if existing_relation is none %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% elif existing_relation.is_view or full_refresh_mode %} - {% do adapter.drop_relation(existing_relation) %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% else %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} - {% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %} - {% endif %} - - {%- call statement('main') -%} - {{ build_sql }} - {%- endcall -%} - - {{ run_hooks(post_hooks) }} - - {{ return({'relations': [target_relation]}) }} - -{%- endmaterialization %} diff --git a/dbt/include/spark/macros/materializations/incremental/incremental.sql b/dbt/include/spark/macros/materializations/incremental/incremental.sql new file mode 100644 index 000000000..b11990b36 --- /dev/null +++ b/dbt/include/spark/macros/materializations/incremental/incremental.sql @@ -0,0 +1,45 @@ +{% materialization incremental, adapter='spark' -%} + + {#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#} + {%- set raw_file_format = config.get('file_format', default='parquet') -%} + {%- set raw_strategy = config.get('incremental_strategy', default='append') -%} + + {%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%} + {%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%} + + {%- set unique_key = config.get('unique_key', none) -%} + {%- set partition_by = 
config.get('partition_by', none) -%} + + {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} + + {% set target_relation = this %} + {% set existing_relation = load_relation(this) %} + {% set tmp_relation = make_temp_relation(this) %} + + {% if strategy == 'insert_overwrite' and partition_by %} + {% call statement() %} + set spark.sql.sources.partitionOverwriteMode = DYNAMIC + {% endcall %} + {% endif %} + + {{ run_hooks(pre_hooks) }} + + {% if existing_relation is none %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif existing_relation.is_view or full_refresh_mode %} + {% do adapter.drop_relation(existing_relation) %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% else %} + {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %} + {% endif %} + + {%- call statement('main') -%} + {{ build_sql }} + {%- endcall -%} + + {{ run_hooks(post_hooks) }} + + {{ return({'relations': [target_relation]}) }} + +{%- endmaterialization %} diff --git a/dbt/include/spark/macros/materializations/incremental/strategies.sql b/dbt/include/spark/macros/materializations/incremental/strategies.sql new file mode 100644 index 000000000..d3ffafc10 --- /dev/null +++ b/dbt/include/spark/macros/materializations/incremental/strategies.sql @@ -0,0 +1,58 @@ +{% macro get_insert_overwrite_sql(source_relation, target_relation) %} + + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + insert overwrite table {{ target_relation }} + {{ partition_cols(label="partition") }} + select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }} + +{% endmacro %} + + +{% macro get_insert_into_sql(source_relation, target_relation) %} + + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + insert into table {{ target_relation }} + select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }} + +{% endmacro %} + + +{% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %} + {# ignore dest_columns - we will just use `*` #} + + {% set merge_condition %} + {% if unique_key %} + on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} + {% else %} + on false + {% endif %} + {% endset %} + + merge into {{ target }} as DBT_INTERNAL_DEST + using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE + {{ merge_condition }} + when matched then update set * + when not matched then insert * +{% endmacro %} + + +{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %} + {%- if strategy == 'append' -%} + {#-- insert new records into existing table, without updating or overwriting #} + {{ get_insert_into_sql(source, target) }} + {%- elif strategy == 'insert_overwrite' -%} + {#-- insert statements don't like CTEs, so support them via a temp view #} + {{ get_insert_overwrite_sql(source, target) }} + {%- elif strategy == 'merge' -%} + {#-- merge all columns with databricks delta - schema changes are handled for us #} + {{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }} + {%- else -%} + {% set no_sql_for_strategy_msg -%} + No known SQL for the incremental strategy provided: {{ strategy }} + {%- endset %} 
+ {%- do exceptions.raise_compiler_error(no_sql_for_strategy_msg) -%} + {%- endif -%} + +{% endmacro %} diff --git a/dbt/include/spark/macros/materializations/incremental/validate.sql b/dbt/include/spark/macros/materializations/incremental/validate.sql new file mode 100644 index 000000000..400a2eee5 --- /dev/null +++ b/dbt/include/spark/macros/materializations/incremental/validate.sql @@ -0,0 +1,59 @@ +{% macro dbt_spark_validate_get_file_format(raw_file_format) %} + {#-- Validate the file format #} + + {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'] %} + + {% set invalid_file_format_msg -%} + Invalid file format provided: {{ raw_file_format }} + Expected one of: {{ accepted_formats | join(', ') }} + {%- endset %} + + {% if raw_file_format not in accepted_formats %} + {% do exceptions.raise_compiler_error(invalid_file_format_msg) %} + {% endif %} + + {% do return(raw_file_format) %} +{% endmacro %} + + +{% macro dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) %} + {#-- Validate the incremental strategy #} + + {% set invalid_strategy_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + Expected one of: 'append', 'merge', 'insert_overwrite' + {%- endset %} + + {% set invalid_merge_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + You can only choose this strategy when file_format is set to 'delta' + {%- endset %} + + {% set invalid_insert_overwrite_delta_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + You cannot use this strategy when file_format is set to 'delta' + Use the 'append' or 'merge' strategy instead + {%- endset %} + + {% set invalid_insert_overwrite_endpoint_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + You cannot use this strategy when connecting via endpoint + Use the 'append' or 'merge' strategy instead + {%- endset %} + + {% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %} + {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} + {%-else %} + {% if raw_strategy == 'merge' and file_format != 'delta' %} + {% do exceptions.raise_compiler_error(invalid_merge_msg) %} + {% endif %} + {% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %} + {% do exceptions.raise_compiler_error(invalid_insert_overwrite_delta_msg) %} + {% endif %} + {% if raw_strategy == 'insert_overwrite' and target.endpoint %} + {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %} + {% endif %} + {% endif %} + + {% do return(raw_strategy) %} +{% endmacro %} From af77fd83babacd3b29400783e97c5fbfc13fcc86 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Wed, 3 Feb 2021 14:31:41 +0100 Subject: [PATCH 08/15] Try adding custom integration tests --- dev_requirements.txt | 2 +- test/custom/__init__.py | 0 test/custom/base.py | 67 ++++++++++++++ .../data/expected_append.csv | 5 ++ .../data/expected_overwrite.csv | 3 + .../data/expected_upsert.csv | 4 + .../models/default_append.sql | 17 ++++ .../models_bad/bad_file_format.sql | 18 ++++ .../models_bad/bad_insert_overwrite_delta.sql | 19 ++++ .../models_bad/bad_merge_not_delta.sql | 18 ++++ .../models_bad/bad_strategy.sql | 18 ++++ .../models_delta/append_delta.sql | 19 ++++ .../models_delta/merge_no_key.sql | 19 ++++ .../models_delta/merge_unique_key.sql | 20 +++++ .../insert_overwrite_no_partitions.sql | 18 ++++ .../insert_overwrite_partitions.sql | 20 +++++ .../test_incremental_strategies.py | 87 +++++++++++++++++++ 17 files changed, 353 
insertions(+), 1 deletion(-) create mode 100644 test/custom/__init__.py create mode 100644 test/custom/base.py create mode 100644 test/custom/incremental_strategies/data/expected_append.csv create mode 100644 test/custom/incremental_strategies/data/expected_overwrite.csv create mode 100644 test/custom/incremental_strategies/data/expected_upsert.csv create mode 100644 test/custom/incremental_strategies/models/default_append.sql create mode 100644 test/custom/incremental_strategies/models_bad/bad_file_format.sql create mode 100644 test/custom/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql create mode 100644 test/custom/incremental_strategies/models_bad/bad_merge_not_delta.sql create mode 100644 test/custom/incremental_strategies/models_bad/bad_strategy.sql create mode 100644 test/custom/incremental_strategies/models_delta/append_delta.sql create mode 100644 test/custom/incremental_strategies/models_delta/merge_no_key.sql create mode 100644 test/custom/incremental_strategies/models_delta/merge_unique_key.sql create mode 100644 test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql create mode 100644 test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_partitions.sql create mode 100644 test/custom/incremental_strategies/test_incremental_strategies.py diff --git a/dev_requirements.txt b/dev_requirements.txt index d662a12b5..6180d33c3 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -10,6 +10,6 @@ pytest-xdist>=2.1.0,<3 flaky>=3.5.3,<4 # Test requirements -pytest-dbt-adapter==0.4.0 +git+https://github.com/fishtown-analytics/dbt-adapter-tests.git@feature/add-integration-test-tools sasl==0.2.1 thrift_sasl==0.4.1 diff --git a/test/custom/__init__.py b/test/custom/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/test/custom/base.py b/test/custom/base.py new file mode 100644 index 000000000..ed34e878c --- /dev/null +++ b/test/custom/base.py @@ -0,0 +1,67 @@ +from dbt_adapter_tests import DBTIntegrationTestBase, use_profile + +class DBTSparkIntegrationTest(DBTIntegrationTestBase): + + def apache_spark_profile(self): + return { + 'config': { + 'send_anonymous_usage_stats': False + }, + 'test': { + 'outputs': { + 'default2': { + 'type': 'spark', + 'host': 'localhost', + 'user': 'dbt', + 'method': 'thrift', + 'port': '10000', + 'connect_retries': '5', + 'connect_timeout': '60', + 'schema': self.unique_schema() + }, + 'target': 'default2' + } + } + } + + def databricks_cluster_profile(self): + return { + 'config': { + 'send_anonymous_usage_stats': False + }, + 'test': { + 'outputs': { + 'odbc': { + 'type': 'spark', + 'method': 'odbc', + 'host': os.getenv('DBT_DATABRICKS_HOST_NAME'), + 'cluster': os.getenv('DBT_DATABRICKS_CLUSTER_NAME'), + 'token': os.getenv('DBT_DATABRICKS_TOKEN'), + 'port': 443, + 'schema': self.unique_schema() + }, + 'target': 'odbc' + } + } + } + + def databricks_sql_endpoint_profile(self): + return { + 'config': { + 'send_anonymous_usage_stats': False + }, + 'test': { + 'outputs': { + 'default2': { + 'type': 'spark', + 'method': 'odbc', + 'host': os.getenv('DBT_DATABRICKS_HOST_NAME'), + 'endpoint': os.getenv('DBT_DATABRICKS_ENDPOINT'), + 'token': os.getenv('DBT_DATABRICKS_TOKEN'), + 'port': 443, + 'schema': self.unique_schema() + }, + 'target': 'default2' + } + } + } diff --git a/test/custom/incremental_strategies/data/expected_append.csv b/test/custom/incremental_strategies/data/expected_append.csv new file mode 100644 index 000000000..c96e569bd --- /dev/null +++ 
b/test/custom/incremental_strategies/data/expected_append.csv @@ -0,0 +1,5 @@ +id,msg +1,hello +2,goodbye +2,yo +3,anyway \ No newline at end of file diff --git a/test/custom/incremental_strategies/data/expected_overwrite.csv b/test/custom/incremental_strategies/data/expected_overwrite.csv new file mode 100644 index 000000000..46d8f6050 --- /dev/null +++ b/test/custom/incremental_strategies/data/expected_overwrite.csv @@ -0,0 +1,3 @@ +id,msg +2,yo +3,anyway \ No newline at end of file diff --git a/test/custom/incremental_strategies/data/expected_upsert.csv b/test/custom/incremental_strategies/data/expected_upsert.csv new file mode 100644 index 000000000..71805dfc5 --- /dev/null +++ b/test/custom/incremental_strategies/data/expected_upsert.csv @@ -0,0 +1,4 @@ +id,msg +1,hello +2,yo +3,anyway \ No newline at end of file diff --git a/test/custom/incremental_strategies/models/default_append.sql b/test/custom/incremental_strategies/models/default_append.sql new file mode 100644 index 000000000..cef0d5ace --- /dev/null +++ b/test/custom/incremental_strategies/models/default_append.sql @@ -0,0 +1,17 @@ +{{ config( + materialized = 'incremental', +) }} + +{% if not is_incremental() %} + +select 1 as id, 'hello' as msg +union all +select 2 as id, 'goodbye' as msg + +{% else %} + +select 2 as id, 'yo' as msg +union all +select 3 as id, 'anyway' as msg + +{% endif %} diff --git a/test/custom/incremental_strategies/models_bad/bad_file_format.sql b/test/custom/incremental_strategies/models_bad/bad_file_format.sql new file mode 100644 index 000000000..b0b6abd47 --- /dev/null +++ b/test/custom/incremental_strategies/models_bad/bad_file_format.sql @@ -0,0 +1,18 @@ +{{ config( + materialized = 'incremental', + file_format = 'something_else', +) }} + +{% if not is_incremental() %} + +select 1 as id, 'hello' as msg +union all +select 2 as id, 'goodbye' as msg + +{% else %} + +select 2 as id, 'yo' as msg +union all +select 3 as id, 'anyway' as msg + +{% endif %} diff --git a/test/custom/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql b/test/custom/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql new file mode 100644 index 000000000..f0d87b377 --- /dev/null +++ b/test/custom/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql @@ -0,0 +1,19 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'insert_overwrite', + file_format = 'delta', +) }} + +{% if not is_incremental() %} + +select 1 as id, 'hello' as msg +union all +select 2 as id, 'goodbye' as msg + +{% else %} + +select 2 as id, 'yo' as msg +union all +select 3 as id, 'anyway' as msg + +{% endif %} diff --git a/test/custom/incremental_strategies/models_bad/bad_merge_not_delta.sql b/test/custom/incremental_strategies/models_bad/bad_merge_not_delta.sql new file mode 100644 index 000000000..e07674c36 --- /dev/null +++ b/test/custom/incremental_strategies/models_bad/bad_merge_not_delta.sql @@ -0,0 +1,18 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'merge', +) }} + +{% if not is_incremental() %} + +select 1 as id, 'hello' as msg +union all +select 2 as id, 'goodbye' as msg + +{% else %} + +select 2 as id, 'yo' as msg +union all +select 3 as id, 'anyway' as msg + +{% endif %} diff --git a/test/custom/incremental_strategies/models_bad/bad_strategy.sql b/test/custom/incremental_strategies/models_bad/bad_strategy.sql new file mode 100644 index 000000000..256f34846 --- /dev/null +++ b/test/custom/incremental_strategies/models_bad/bad_strategy.sql @@ -0,0 +1,18 @@ +{{ config( 
+ materialized = 'incremental', + incremental_strategy = 'something_else', +) }} + +{% if not is_incremental() %} + +select 1 as id, 'hello' as msg +union all +select 2 as id, 'goodbye' as msg + +{% else %} + +select 2 as id, 'yo' as msg +union all +select 3 as id, 'anyway' as msg + +{% endif %} diff --git a/test/custom/incremental_strategies/models_delta/append_delta.sql b/test/custom/incremental_strategies/models_delta/append_delta.sql new file mode 100644 index 000000000..517c2418d --- /dev/null +++ b/test/custom/incremental_strategies/models_delta/append_delta.sql @@ -0,0 +1,19 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'append', + file_format = 'delta', +) }} + +{% if not is_incremental() %} + +select 1 as id, 'hello' as msg +union all +select 2 as id, 'goodbye' as msg + +{% else %} + +select 2 as id, 'yo' as msg +union all +select 3 as id, 'anyway' as msg + +{% endif %} diff --git a/test/custom/incremental_strategies/models_delta/merge_no_key.sql b/test/custom/incremental_strategies/models_delta/merge_no_key.sql new file mode 100644 index 000000000..3039f74ec --- /dev/null +++ b/test/custom/incremental_strategies/models_delta/merge_no_key.sql @@ -0,0 +1,19 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'merge', + file_format = 'delta', +) }} + +{% if not is_incremental() %} + +select 1 as id, 'hello' as msg +union all +select 2 as id, 'goodbye' as msg + +{% else %} + +select 2 as id, 'yo' as msg +union all +select 3 as id, 'anyway' as msg + +{% endif %} diff --git a/test/custom/incremental_strategies/models_delta/merge_unique_key.sql b/test/custom/incremental_strategies/models_delta/merge_unique_key.sql new file mode 100644 index 000000000..30f5bc062 --- /dev/null +++ b/test/custom/incremental_strategies/models_delta/merge_unique_key.sql @@ -0,0 +1,20 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'merge', + file_format = 'delta', + unique_key = 'id', +) }} + +{% if not is_incremental() %} + +select 1 as id, 'hello' as msg +union all +select 2 as id, 'goodbye' as msg + +{% else %} + +select 2 as id, 'yo' as msg +union all +select 3 as id, 'anyway' as msg + +{% endif %} diff --git a/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql b/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql new file mode 100644 index 000000000..ebe6f2f00 --- /dev/null +++ b/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql @@ -0,0 +1,18 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'insert_overwrite', +) }} + +{% if not is_incremental() %} + +select 1 as id, 'hello' as msg +union all +select 2 as id, 'goodbye' as msg + +{% else %} + +select 2 as id, 'yo' as msg +union all +select 3 as id, 'anyway' as msg + +{% endif %} diff --git a/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_partitions.sql b/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_partitions.sql new file mode 100644 index 000000000..c0663d97b --- /dev/null +++ b/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_partitions.sql @@ -0,0 +1,20 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'insert_overwrite', + partition_by = 'id', + file_format = 'parquet', +) }} + +{% if not is_incremental() %} + +select 1 as id, 'hello' as msg +union all +select 2 as id, 'goodbye' as msg + +{% else %} + +select 2 as id, 'yo' as 
msg +union all +select 3 as id, 'anyway' as msg + +{% endif %} diff --git a/test/custom/incremental_strategies/test_incremental_strategies.py b/test/custom/incremental_strategies/test_incremental_strategies.py new file mode 100644 index 000000000..5880d2fcb --- /dev/null +++ b/test/custom/incremental_strategies/test_incremental_strategies.py @@ -0,0 +1,87 @@ +from test.custom.base import DBTSparkIntegrationTest + +class TestIncrementalStrategies(DBTSparkIntegrationTest): + @property + def schema(self): + return "incremental_strategies" + + @property + def models(self): + return "models" + + def run_and_test(self): + self.run_dbt(["seed"]) + self.run_dbt(["run"]) + self.assertTablesEqual("default_append", "expected_append") + +class TestDefaultAppend(TestIncrementalStrategies): + @use_profile("apache_spark") + def test_default_append_apache_spark(self): + self.run_and_test() + + @use_profile("databricks_cluster") + def test_default_append_databricks(self): + self.run_and_test() + +class TestInsertOverwrite(TestIncrementalStrategies): + @property + def models(self): + return "models_insert_overwrite" + + def run_and_test(self): + self.run_dbt(["seed"]) + self.run_dbt(["run"]) + self.assertTablesEqual("insert_overwrite_no_partitions", "expected_overwrite") + self.assertTablesEqual("insert_overwrite_partitions", "expected_upsert") + + @use_profile("apache_spark") + def test_insert_overwrite_apache_spark(self): + self.run_and_test() + + @use_profile("databricks_cluster") + def test_insert_overwrite_databricks(self): + self.run_and_test() + +class TestDeltaStrategies(TestIncrementalStrategies): + @property + def models(self): + return "models_delta" + + def run_and_test(self): + self.run_dbt(["seed"]) + self.run_dbt(["run"]) + self.assertTablesEqual("append_delta", "expected_append") + self.assertTablesEqual("merge_no_key", "expected_append") + self.assertTablesEqual("merge_unique_key", "expected_upsert") + + @use_profile("databricks_cluster") + def test_delta_strategies_databricks(self): + self.run_and_test() + +class TestBadStrategies(TestIncrementalStrategies): + @property + def models(self): + return "models_insert_overwrite" + + def run_and_test(self): + with self.assertRaises(dbt.exceptions.Exception) as exc: + self.run_dbt(["compile"]) + message = str(exc.exception) + self.assertIn("Invalid file format provided", message) + self.assertIn("Invalid incremental strategy provided", message) + + @use_profile("apache_spark") + def test_bad_strategies_apache_spark(self): + self.run_and_test() + + @use_profile("databricks_cluster") + def test_bad_strategies_databricks(self): + self.run_and_test() + +class TestBadStrategyWithEndpoint(TestInsertOverwrite): + @use_profile("databricks_sql_endpoint") + def run_and_test(self): + with self.assertRaises(dbt.exceptions.Exception) as exc: + self.run_dbt(["compile"], "--target", "odbc-sql-endpoint") + message = str(exc.exception) + self.assertIn("Invalid incremental strategy provided", message) From e08a23e4773f850c2a5847f088fd904882161679 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Wed, 3 Feb 2021 14:32:07 +0100 Subject: [PATCH 09/15] Try updating tox, circle (WIP) --- .circleci/config.yml | 20 +++++++++++++++++--- tox.ini | 3 +++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index a0d9ac493..a44a14abe 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -60,7 +60,7 @@ jobs: - store_artifacts: path: ./logs - integration-spark-databricks-odbc: + 
integration-spark-databricks-odbc-cluster: &databricks-odbc environment: DBT_INVOCATION_ENV: circle ODBC_DRIVER: Simba # TODO: move env var to Docker image @@ -74,7 +74,18 @@ jobs: - checkout - run: name: Run integration tests - command: tox -e integration-spark-databricks-odbc-cluster,integration-spark-databricks-odbc-sql-endpoint + command: tox -e integration-spark-databricks-odbc-cluster + no_output_timeout: 1h + - store_artifacts: + path: ./logs + + integration-spark-databricks-odbc-endpoint: + <<: *databricks-odbc + steps: + - checkout + - run: + name: Run integration tests + command: tox -e integration-spark-databricks-odbc-sql-endpoint no_output_timeout: 1h - store_artifacts: path: ./logs @@ -90,6 +101,9 @@ workflows: - integration-spark-databricks-http: requires: - unit - - integration-spark-databricks-odbc: + - integration-spark-databricks-odbc-cluster: + requires: + - unit + - integration-spark-databricks-odbc-endpoint: requires: - unit diff --git a/tox.ini b/tox.ini index f865309f6..76b34f6db 100644 --- a/tox.ini +++ b/tox.ini @@ -30,6 +30,7 @@ deps = [testenv:integration-spark-databricks-odbc-cluster] basepython = python3 commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-databricks-odbc-cluster.dbtspec' + /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_cluster {posargs} -n4 test/custom/*' passenv = DBT_DATABRICKS_HOST_NAME DBT_DATABRICKS_CLUSTER_NAME DBT_DATABRICKS_TOKEN DBT_INVOCATION_ENV ODBC_DRIVER deps = -r{toxinidir}/requirements.txt @@ -39,6 +40,7 @@ deps = [testenv:integration-spark-databricks-odbc-sql-endpoint] basepython = python3 commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-databricks-odbc-sql-endpoint.dbtspec' + /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_sql_endpoint {posargs} -n4 test/custom/*' passenv = DBT_DATABRICKS_HOST_NAME DBT_DATABRICKS_ENDPOINT DBT_DATABRICKS_TOKEN DBT_INVOCATION_ENV ODBC_DRIVER deps = -r{toxinidir}/requirements.txt @@ -49,6 +51,7 @@ deps = [testenv:integration-spark-thrift] basepython = python3 commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-thrift.dbtspec' + /bin/bash -c '{envpython} -m pytest -v -m profile_apache_spark {posargs} -n4 test/custom/*' passenv = DBT_INVOCATION_ENV deps = -r{toxinidir}/requirements.txt From 9bbc61b1f2fb8ec7b7da0a17768c17c5d389b790 Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Wed, 17 Feb 2021 09:38:51 -0500 Subject: [PATCH 10/15] get tests working --- dev_requirements.txt | 2 +- test/custom/base.py | 107 ++++++++++++++++-- test/custom/conftest.py | 10 ++ .../test_incremental_strategies.py | 45 +++++--- 4 files changed, 136 insertions(+), 28 deletions(-) create mode 100644 test/custom/conftest.py diff --git a/dev_requirements.txt b/dev_requirements.txt index 6180d33c3..bbcdc9d6d 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -10,6 +10,6 @@ pytest-xdist>=2.1.0,<3 flaky>=3.5.3,<4 # Test requirements -git+https://github.com/fishtown-analytics/dbt-adapter-tests.git@feature/add-integration-test-tools +git+https://github.com/fishtown-analytics/dbt-adapter-tests.git@33872d1cc0f936677dae091c3e0b49771c280514 sasl==0.2.1 thrift_sasl==0.4.1 diff --git a/test/custom/base.py b/test/custom/base.py index ed34e878c..d0426aed1 100644 --- a/test/custom/base.py +++ b/test/custom/base.py @@ -1,7 +1,69 @@ -from dbt_adapter_tests import DBTIntegrationTestBase, use_profile +import pytest +from functools import wraps +import os +from dbt_adapter_tests import DBTIntegrationTestBase + class 
DBTSparkIntegrationTest(DBTIntegrationTestBase): - + + def get_profile(self, adapter_type): + if adapter_type == 'apache_spark': + return self.apache_spark_profile() + elif adapter_type == 'databricks_cluster': + return self.databricks_cluster_profile() + elif adapter_type == 'databricks_sql_endpoint': + return self.databricks_sql_endpoint_profile() + else: + raise ValueError('invalid adapter type {}'.format(adapter_type)) + + @staticmethod + def _profile_from_test_name(test_name): + adapter_names = ('apache_spark', 'databricks_cluster', + 'databricks_sql_endpoint') + adapters_in_name = sum(x in test_name for x in adapter_names) + if adapters_in_name != 1: + raise ValueError( + 'test names must have exactly 1 profile choice embedded, {} has {}' + .format(test_name, adapters_in_name) + ) + + for adapter_name in adapter_names: + if adapter_name in test_name: + return adapter_name + + raise ValueError( + 'could not find adapter name in test name {}'.format(test_name) + ) + + def run_sql(self, query, fetch='None', kwargs=None, connection_name=None): + if connection_name is None: + connection_name = '__test' + + if query.strip() == "": + return + + sql = self.transform_sql(query, kwargs=kwargs) + + with self.get_connection(connection_name) as conn: + cursor = conn.handle.cursor() + try: + cursor.execute(sql) + if fetch == 'one': + return cursor.fetchall()[0] + elif fetch == 'all': + return cursor.fetchall() + else: + # we have to fetch. + cursor.fetchall() + except Exception as e: + conn.handle.rollback() + conn.transaction_open = False + print(sql) + print(e) + raise + else: + conn.transaction_open = False + def apache_spark_profile(self): return { 'config': { @@ -14,13 +76,13 @@ def apache_spark_profile(self): 'host': 'localhost', 'user': 'dbt', 'method': 'thrift', - 'port': '10000', - 'connect_retries': '5', - 'connect_timeout': '60', + 'port': 10000, + 'connect_retries': 5, + 'connect_timeout': 60, 'schema': self.unique_schema() }, + }, 'target': 'default2' - } } } @@ -40,11 +102,11 @@ def databricks_cluster_profile(self): 'port': 443, 'schema': self.unique_schema() }, + }, 'target': 'odbc' - } } } - + def databricks_sql_endpoint_profile(self): return { 'config': { @@ -61,7 +123,34 @@ def databricks_sql_endpoint_profile(self): 'port': 443, 'schema': self.unique_schema() }, + }, 'target': 'default2' - } } } + + +def use_profile(profile_name): + """A decorator to declare a test method as using a particular profile. + Handles both setting the nose attr and calling self.use_profile. 
+ + Use like this: + + class TestSomething(DBIntegrationTest): + @use_profile('postgres') + def test_postgres_thing(self): + self.assertEqual(self.adapter_type, 'postgres') + + @use_profile('snowflake') + def test_snowflake_thing(self): + self.assertEqual(self.adapter_type, 'snowflake') + """ + def outer(wrapped): + @getattr(pytest.mark, 'profile_'+profile_name) + @wraps(wrapped) + def func(self, *args, **kwargs): + return wrapped(self, *args, **kwargs) + # sanity check at import time + assert DBTSparkIntegrationTest._profile_from_test_name( + wrapped.__name__) == profile_name + return func + return outer diff --git a/test/custom/conftest.py b/test/custom/conftest.py new file mode 100644 index 000000000..02248bae3 --- /dev/null +++ b/test/custom/conftest.py @@ -0,0 +1,10 @@ +def pytest_configure(config): + config.addinivalue_line( + "markers", "profile_databricks_cluster" + ) + config.addinivalue_line( + "markers", "profile_databricks_sql_endpoint" + ) + config.addinivalue_line( + "markers", "profile_apache_spark" + ) diff --git a/test/custom/incremental_strategies/test_incremental_strategies.py b/test/custom/incremental_strategies/test_incremental_strategies.py index 5880d2fcb..5ad7a3f79 100644 --- a/test/custom/incremental_strategies/test_incremental_strategies.py +++ b/test/custom/incremental_strategies/test_incremental_strategies.py @@ -1,4 +1,6 @@ -from test.custom.base import DBTSparkIntegrationTest +from test.custom.base import DBTSparkIntegrationTest, use_profile +import dbt.exceptions + class TestIncrementalStrategies(DBTSparkIntegrationTest): @property @@ -14,73 +16,80 @@ def run_and_test(self): self.run_dbt(["run"]) self.assertTablesEqual("default_append", "expected_append") + class TestDefaultAppend(TestIncrementalStrategies): @use_profile("apache_spark") def test_default_append_apache_spark(self): self.run_and_test() - + @use_profile("databricks_cluster") - def test_default_append_databricks(self): + def test_default_append_databricks_cluster(self): self.run_and_test() + class TestInsertOverwrite(TestIncrementalStrategies): @property def models(self): return "models_insert_overwrite" - + def run_and_test(self): self.run_dbt(["seed"]) self.run_dbt(["run"]) - self.assertTablesEqual("insert_overwrite_no_partitions", "expected_overwrite") - self.assertTablesEqual("insert_overwrite_partitions", "expected_upsert") - + self.assertTablesEqual( + "insert_overwrite_no_partitions", "expected_overwrite") + self.assertTablesEqual( + "insert_overwrite_partitions", "expected_upsert") + @use_profile("apache_spark") def test_insert_overwrite_apache_spark(self): self.run_and_test() - + @use_profile("databricks_cluster") - def test_insert_overwrite_databricks(self): + def test_insert_overwrite_databricks_cluster(self): self.run_and_test() + class TestDeltaStrategies(TestIncrementalStrategies): @property def models(self): return "models_delta" - + def run_and_test(self): self.run_dbt(["seed"]) self.run_dbt(["run"]) self.assertTablesEqual("append_delta", "expected_append") self.assertTablesEqual("merge_no_key", "expected_append") self.assertTablesEqual("merge_unique_key", "expected_upsert") - + @use_profile("databricks_cluster") - def test_delta_strategies_databricks(self): + def test_delta_strategies_databricks_cluster(self): self.run_and_test() + class TestBadStrategies(TestIncrementalStrategies): @property def models(self): return "models_insert_overwrite" - + def run_and_test(self): with self.assertRaises(dbt.exceptions.Exception) as exc: self.run_dbt(["compile"]) message = str(exc.exception) 
self.assertIn("Invalid file format provided", message) self.assertIn("Invalid incremental strategy provided", message) - + @use_profile("apache_spark") def test_bad_strategies_apache_spark(self): self.run_and_test() - + @use_profile("databricks_cluster") - def test_bad_strategies_databricks(self): + def test_bad_strategies_databricks_cluster(self): self.run_and_test() - + + class TestBadStrategyWithEndpoint(TestInsertOverwrite): @use_profile("databricks_sql_endpoint") - def run_and_test(self): + def test_bad_strategies_databricks_sql_endpoint(self): with self.assertRaises(dbt.exceptions.Exception) as exc: self.run_dbt(["compile"], "--target", "odbc-sql-endpoint") message = str(exc.exception) From ab57d625063a7004b10f26695cf56f5337ed63db Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Thu, 18 Feb 2021 15:29:59 -0500 Subject: [PATCH 11/15] update integration tests --- .gitignore | 1 + test/custom/base.py | 7 ++ .../models/default_append.sql | 8 +- .../models_bad/bad_file_format.sql | 8 +- .../models_bad/bad_insert_overwrite_delta.sql | 8 +- .../models_bad/bad_merge_not_delta.sql | 8 +- .../models_bad/bad_strategy.sql | 8 +- .../models_delta/append_delta.sql | 8 +- .../models_delta/merge_no_key.sql | 8 +- .../models_delta/merge_unique_key.sql | 8 +- .../insert_overwrite_no_partitions.sql | 8 +- .../insert_overwrite_partitions.sql | 8 +- .../test_incremental_strategies.py | 78 ++++++++++++++----- 13 files changed, 108 insertions(+), 58 deletions(-) diff --git a/.gitignore b/.gitignore index d6f5c9d02..9caf202a6 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ dbt-integration-tests test/integration/.user.yml .DS_Store .vscode +*.log diff --git a/test/custom/base.py b/test/custom/base.py index d0426aed1..d2dc6dd78 100644 --- a/test/custom/base.py +++ b/test/custom/base.py @@ -2,6 +2,7 @@ from functools import wraps import os from dbt_adapter_tests import DBTIntegrationTestBase +import pyodbc class DBTSparkIntegrationTest(DBTIntegrationTestBase): @@ -55,6 +56,10 @@ def run_sql(self, query, fetch='None', kwargs=None, connection_name=None): else: # we have to fetch. cursor.fetchall() + except pyodbc.ProgrammingError as e: + # hacks for dropping schema + if "No results. Previous SQL was not a query." 
not in str(e): + raise e except Exception as e: conn.handle.rollback() conn.transaction_open = False @@ -99,6 +104,7 @@ def databricks_cluster_profile(self): 'host': os.getenv('DBT_DATABRICKS_HOST_NAME'), 'cluster': os.getenv('DBT_DATABRICKS_CLUSTER_NAME'), 'token': os.getenv('DBT_DATABRICKS_TOKEN'), + 'driver': os.getenv('ODBC_DRIVER'), 'port': 443, 'schema': self.unique_schema() }, @@ -120,6 +126,7 @@ def databricks_sql_endpoint_profile(self): 'host': os.getenv('DBT_DATABRICKS_HOST_NAME'), 'endpoint': os.getenv('DBT_DATABRICKS_ENDPOINT'), 'token': os.getenv('DBT_DATABRICKS_TOKEN'), + 'driver': os.getenv('ODBC_DRIVER'), 'port': 443, 'schema': self.unique_schema() }, diff --git a/test/custom/incremental_strategies/models/default_append.sql b/test/custom/incremental_strategies/models/default_append.sql index cef0d5ace..e2a10393f 100644 --- a/test/custom/incremental_strategies/models/default_append.sql +++ b/test/custom/incremental_strategies/models/default_append.sql @@ -4,14 +4,14 @@ {% if not is_incremental() %} -select 1 as id, 'hello' as msg +select cast(1 as bigint) as id, 'hello' as msg union all -select 2 as id, 'goodbye' as msg +select cast(2 as bigint) as id, 'goodbye' as msg {% else %} -select 2 as id, 'yo' as msg +select cast(2 as bigint) as id, 'yo' as msg union all -select 3 as id, 'anyway' as msg +select cast(3 as bigint) as id, 'anyway' as msg {% endif %} diff --git a/test/custom/incremental_strategies/models_bad/bad_file_format.sql b/test/custom/incremental_strategies/models_bad/bad_file_format.sql index b0b6abd47..911ccbb88 100644 --- a/test/custom/incremental_strategies/models_bad/bad_file_format.sql +++ b/test/custom/incremental_strategies/models_bad/bad_file_format.sql @@ -5,14 +5,14 @@ {% if not is_incremental() %} -select 1 as id, 'hello' as msg +select cast(1 as bigint) as id, 'hello' as msg union all -select 2 as id, 'goodbye' as msg +select cast(2 as bigint) as id, 'goodbye' as msg {% else %} -select 2 as id, 'yo' as msg +select cast(2 as bigint) as id, 'yo' as msg union all -select 3 as id, 'anyway' as msg +select cast(3 as bigint) as id, 'anyway' as msg {% endif %} diff --git a/test/custom/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql b/test/custom/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql index f0d87b377..b7186e1b2 100644 --- a/test/custom/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql +++ b/test/custom/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql @@ -6,14 +6,14 @@ {% if not is_incremental() %} -select 1 as id, 'hello' as msg +select cast(1 as bigint) as id, 'hello' as msg union all -select 2 as id, 'goodbye' as msg +select cast(2 as bigint) as id, 'goodbye' as msg {% else %} -select 2 as id, 'yo' as msg +select cast(2 as bigint) as id, 'yo' as msg union all -select 3 as id, 'anyway' as msg +select cast(3 as bigint) as id, 'anyway' as msg {% endif %} diff --git a/test/custom/incremental_strategies/models_bad/bad_merge_not_delta.sql b/test/custom/incremental_strategies/models_bad/bad_merge_not_delta.sql index e07674c36..79a951110 100644 --- a/test/custom/incremental_strategies/models_bad/bad_merge_not_delta.sql +++ b/test/custom/incremental_strategies/models_bad/bad_merge_not_delta.sql @@ -5,14 +5,14 @@ {% if not is_incremental() %} -select 1 as id, 'hello' as msg +select cast(1 as bigint) as id, 'hello' as msg union all -select 2 as id, 'goodbye' as msg +select cast(2 as bigint) as id, 'goodbye' as msg {% else %} -select 2 as id, 'yo' as msg +select cast(2 as bigint) as id, 'yo' as 
msg union all -select 3 as id, 'anyway' as msg +select cast(3 as bigint) as id, 'anyway' as msg {% endif %} diff --git a/test/custom/incremental_strategies/models_bad/bad_strategy.sql b/test/custom/incremental_strategies/models_bad/bad_strategy.sql index 256f34846..72912f505 100644 --- a/test/custom/incremental_strategies/models_bad/bad_strategy.sql +++ b/test/custom/incremental_strategies/models_bad/bad_strategy.sql @@ -5,14 +5,14 @@ {% if not is_incremental() %} -select 1 as id, 'hello' as msg +select cast(1 as bigint) as id, 'hello' as msg union all -select 2 as id, 'goodbye' as msg +select cast(2 as bigint) as id, 'goodbye' as msg {% else %} -select 2 as id, 'yo' as msg +select cast(2 as bigint) as id, 'yo' as msg union all -select 3 as id, 'anyway' as msg +select cast(3 as bigint) as id, 'anyway' as msg {% endif %} diff --git a/test/custom/incremental_strategies/models_delta/append_delta.sql b/test/custom/incremental_strategies/models_delta/append_delta.sql index 517c2418d..bfbd2512c 100644 --- a/test/custom/incremental_strategies/models_delta/append_delta.sql +++ b/test/custom/incremental_strategies/models_delta/append_delta.sql @@ -6,14 +6,14 @@ {% if not is_incremental() %} -select 1 as id, 'hello' as msg +select cast(1 as bigint) as id, 'hello' as msg union all -select 2 as id, 'goodbye' as msg +select cast(2 as bigint) as id, 'goodbye' as msg {% else %} -select 2 as id, 'yo' as msg +select cast(2 as bigint) as id, 'yo' as msg union all -select 3 as id, 'anyway' as msg +select cast(3 as bigint) as id, 'anyway' as msg {% endif %} diff --git a/test/custom/incremental_strategies/models_delta/merge_no_key.sql b/test/custom/incremental_strategies/models_delta/merge_no_key.sql index 3039f74ec..35a71b1a0 100644 --- a/test/custom/incremental_strategies/models_delta/merge_no_key.sql +++ b/test/custom/incremental_strategies/models_delta/merge_no_key.sql @@ -6,14 +6,14 @@ {% if not is_incremental() %} -select 1 as id, 'hello' as msg +select cast(1 as bigint) as id, 'hello' as msg union all -select 2 as id, 'goodbye' as msg +select cast(2 as bigint) as id, 'goodbye' as msg {% else %} -select 2 as id, 'yo' as msg +select cast(2 as bigint) as id, 'yo' as msg union all -select 3 as id, 'anyway' as msg +select cast(3 as bigint) as id, 'anyway' as msg {% endif %} diff --git a/test/custom/incremental_strategies/models_delta/merge_unique_key.sql b/test/custom/incremental_strategies/models_delta/merge_unique_key.sql index 30f5bc062..e8dfd37b9 100644 --- a/test/custom/incremental_strategies/models_delta/merge_unique_key.sql +++ b/test/custom/incremental_strategies/models_delta/merge_unique_key.sql @@ -7,14 +7,14 @@ {% if not is_incremental() %} -select 1 as id, 'hello' as msg +select cast(1 as bigint) as id, 'hello' as msg union all -select 2 as id, 'goodbye' as msg +select cast(2 as bigint) as id, 'goodbye' as msg {% else %} -select 2 as id, 'yo' as msg +select cast(2 as bigint) as id, 'yo' as msg union all -select 3 as id, 'anyway' as msg +select cast(3 as bigint) as id, 'anyway' as msg {% endif %} diff --git a/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql b/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql index ebe6f2f00..fcc142bd0 100644 --- a/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql +++ b/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql @@ -5,14 +5,14 @@ {% if not is_incremental() %} -select 1 as id, 'hello' 
as msg +select cast(1 as bigint) as id, 'hello' as msg union all -select 2 as id, 'goodbye' as msg +select cast(2 as bigint) as id, 'goodbye' as msg {% else %} -select 2 as id, 'yo' as msg +select cast(2 as bigint) as id, 'yo' as msg union all -select 3 as id, 'anyway' as msg +select cast(3 as bigint) as id, 'anyway' as msg {% endif %} diff --git a/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_partitions.sql b/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_partitions.sql index c0663d97b..cfe235ad2 100644 --- a/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_partitions.sql +++ b/test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_partitions.sql @@ -7,14 +7,14 @@ {% if not is_incremental() %} -select 1 as id, 'hello' as msg +select cast(1 as bigint) as id, 'hello' as msg union all -select 2 as id, 'goodbye' as msg +select cast(2 as bigint) as id, 'goodbye' as msg {% else %} -select 2 as id, 'yo' as msg +select cast(2 as bigint) as id, 'yo' as msg union all -select 3 as id, 'anyway' as msg +select cast(3 as bigint) as id, 'anyway' as msg {% endif %} diff --git a/test/custom/incremental_strategies/test_incremental_strategies.py b/test/custom/incremental_strategies/test_incremental_strategies.py index 5ad7a3f79..8be088fbf 100644 --- a/test/custom/incremental_strategies/test_incremental_strategies.py +++ b/test/custom/incremental_strategies/test_incremental_strategies.py @@ -1,3 +1,4 @@ +from cProfile import run from test.custom.base import DBTSparkIntegrationTest, use_profile import dbt.exceptions @@ -11,13 +12,20 @@ def schema(self): def models(self): return "models" + @property + def project_config(self): + return { + 'seeds': { + 'quote_columns': False, + }, + } + def run_and_test(self): self.run_dbt(["seed"]) self.run_dbt(["run"]) + self.run_dbt(["run"]) self.assertTablesEqual("default_append", "expected_append") - -class TestDefaultAppend(TestIncrementalStrategies): @use_profile("apache_spark") def test_default_append_apache_spark(self): self.run_and_test() @@ -27,18 +35,31 @@ def test_default_append_databricks_cluster(self): self.run_and_test() -class TestInsertOverwrite(TestIncrementalStrategies): +class TestInsertOverwrite(DBTSparkIntegrationTest): + @property + def schema(self): + return "incremental_strategies" + @property def models(self): return "models_insert_overwrite" + @property + def project_config(self): + return { + 'seeds': { + 'quote_columns': False, + }, + } + def run_and_test(self): self.run_dbt(["seed"]) self.run_dbt(["run"]) + self.run_dbt(["run"]) self.assertTablesEqual( "insert_overwrite_no_partitions", "expected_overwrite") self.assertTablesEqual( - "insert_overwrite_partitions", "expected_upsert") + "insert_overwrite_partitions", "expected_overwrite") @use_profile("apache_spark") def test_insert_overwrite_apache_spark(self): @@ -49,14 +70,27 @@ def test_insert_overwrite_databricks_cluster(self): self.run_and_test() -class TestDeltaStrategies(TestIncrementalStrategies): +class TestDeltaStrategies(DBTSparkIntegrationTest): + @property + def schema(self): + return "incremental_strategies" + @property def models(self): return "models_delta" + @property + def project_config(self): + return { + 'seeds': { + 'quote_columns': False, + }, + } + def run_and_test(self): self.run_dbt(["seed"]) self.run_dbt(["run"]) + self.run_dbt(["run"]) self.assertTablesEqual("append_delta", "expected_append") self.assertTablesEqual("merge_no_key", "expected_append") 
self.assertTablesEqual("merge_unique_key", "expected_upsert") @@ -66,17 +100,30 @@ def test_delta_strategies_databricks_cluster(self): self.run_and_test() -class TestBadStrategies(TestIncrementalStrategies): +class TestBadStrategies(DBTSparkIntegrationTest): + @property + def schema(self): + return "incremental_strategies" + + @property + def project_config(self): + return { + 'seeds': { + 'quote_columns': False, + }, + } + @property def models(self): - return "models_insert_overwrite" + return "models_bad" def run_and_test(self): - with self.assertRaises(dbt.exceptions.Exception) as exc: - self.run_dbt(["compile"]) - message = str(exc.exception) - self.assertIn("Invalid file format provided", message) - self.assertIn("Invalid incremental strategy provided", message) + self.run_dbt(["seed"]) + results = self.run_dbt(["run"], expect_pass=False) + # assert all models fail with co + for result in results: + self.assertEqual("error", result.status) + self.assertIn("Compilation Error in model", result.message) @use_profile("apache_spark") def test_bad_strategies_apache_spark(self): @@ -86,11 +133,6 @@ def test_bad_strategies_apache_spark(self): def test_bad_strategies_databricks_cluster(self): self.run_and_test() - -class TestBadStrategyWithEndpoint(TestInsertOverwrite): @use_profile("databricks_sql_endpoint") def test_bad_strategies_databricks_sql_endpoint(self): - with self.assertRaises(dbt.exceptions.Exception) as exc: - self.run_dbt(["compile"], "--target", "odbc-sql-endpoint") - message = str(exc.exception) - self.assertIn("Invalid incremental strategy provided", message) + self.run_and_test() From a12f74d47ae0937d14873ff45d1bed29a9c03e29 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Fri, 19 Feb 2021 12:06:53 +0100 Subject: [PATCH 12/15] Fix assertTablesEqual mismatch --- .../incremental_strategies/test_incremental_strategies.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/custom/incremental_strategies/test_incremental_strategies.py b/test/custom/incremental_strategies/test_incremental_strategies.py index 8be088fbf..730320cdb 100644 --- a/test/custom/incremental_strategies/test_incremental_strategies.py +++ b/test/custom/incremental_strategies/test_incremental_strategies.py @@ -59,7 +59,7 @@ def run_and_test(self): self.assertTablesEqual( "insert_overwrite_no_partitions", "expected_overwrite") self.assertTablesEqual( - "insert_overwrite_partitions", "expected_overwrite") + "insert_overwrite_partitions", "expected_upsert") @use_profile("apache_spark") def test_insert_overwrite_apache_spark(self): @@ -120,7 +120,7 @@ def models(self): def run_and_test(self): self.run_dbt(["seed"]) results = self.run_dbt(["run"], expect_pass=False) - # assert all models fail with co + # assert all models fail with compilation errors for result in results: self.assertEqual("error", result.status) self.assertIn("Compilation Error in model", result.message) From 43ab587ba08e8f288c86196a88efd08bc454a5e2 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Fri, 19 Feb 2021 12:19:58 +0100 Subject: [PATCH 13/15] Concise test defs. 
Local tests first --- .circleci/config.yml | 3 + .../test_incremental_strategies.py | 65 ++++--------------- 2 files changed, 17 insertions(+), 51 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index a44a14abe..f00d3d7d5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -101,9 +101,12 @@ workflows: - integration-spark-databricks-http: requires: - unit + - integration-spark-thrift - integration-spark-databricks-odbc-cluster: requires: - unit + - integration-spark-thrift - integration-spark-databricks-odbc-endpoint: requires: - unit + - integration-spark-thrift diff --git a/test/custom/incremental_strategies/test_incremental_strategies.py b/test/custom/incremental_strategies/test_incremental_strategies.py index 730320cdb..0de1bd057 100644 --- a/test/custom/incremental_strategies/test_incremental_strategies.py +++ b/test/custom/incremental_strategies/test_incremental_strategies.py @@ -8,10 +8,6 @@ class TestIncrementalStrategies(DBTSparkIntegrationTest): def schema(self): return "incremental_strategies" - @property - def models(self): - return "models" - @property def project_config(self): return { @@ -20,10 +16,18 @@ def project_config(self): }, } - def run_and_test(self): + def seed_and_run_twice(self): self.run_dbt(["seed"]) self.run_dbt(["run"]) self.run_dbt(["run"]) + +class TestDefaultAppend(TestIncrementalStrategies): + @property + def models(self): + return "models" + + def run_and_test(self): + self.seed_and_run_twice() self.assertTablesEqual("default_append", "expected_append") @use_profile("apache_spark") @@ -35,27 +39,13 @@ def test_default_append_databricks_cluster(self): self.run_and_test() -class TestInsertOverwrite(DBTSparkIntegrationTest): - @property - def schema(self): - return "incremental_strategies" - +class TestInsertOverwrite(TestIncrementalStrategies): @property def models(self): return "models_insert_overwrite" - @property - def project_config(self): - return { - 'seeds': { - 'quote_columns': False, - }, - } - def run_and_test(self): - self.run_dbt(["seed"]) - self.run_dbt(["run"]) - self.run_dbt(["run"]) + self.seed_and_run_twice() self.assertTablesEqual( "insert_overwrite_no_partitions", "expected_overwrite") self.assertTablesEqual( @@ -70,27 +60,13 @@ def test_insert_overwrite_databricks_cluster(self): self.run_and_test() -class TestDeltaStrategies(DBTSparkIntegrationTest): - @property - def schema(self): - return "incremental_strategies" - +class TestDeltaStrategies(TestIncrementalStrategies): @property def models(self): return "models_delta" - @property - def project_config(self): - return { - 'seeds': { - 'quote_columns': False, - }, - } - def run_and_test(self): - self.run_dbt(["seed"]) - self.run_dbt(["run"]) - self.run_dbt(["run"]) + self.seed_and_run_twice() self.assertTablesEqual("append_delta", "expected_append") self.assertTablesEqual("merge_no_key", "expected_append") self.assertTablesEqual("merge_unique_key", "expected_upsert") @@ -100,25 +76,12 @@ def test_delta_strategies_databricks_cluster(self): self.run_and_test() -class TestBadStrategies(DBTSparkIntegrationTest): - @property - def schema(self): - return "incremental_strategies" - - @property - def project_config(self): - return { - 'seeds': { - 'quote_columns': False, - }, - } - +class TestBadStrategies(TestIncrementalStrategies): @property def models(self): return "models_bad" def run_and_test(self): - self.run_dbt(["seed"]) results = self.run_dbt(["run"], expect_pass=False) # assert all models fail with compilation errors for result in results: From 
ac35028a7a1668a958fca701ffd20bd9db24e022 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Fri, 19 Feb 2021 14:26:05 +0100 Subject: [PATCH 14/15] Fixups, document spark config setting --- .circleci/config.yml | 3 --- README.md | 2 +- .../incremental_strategies/test_incremental_strategies.py | 3 ++- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index f00d3d7d5..99154fb64 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -100,13 +100,10 @@ workflows: - unit - integration-spark-databricks-http: requires: - - unit - integration-spark-thrift - integration-spark-databricks-odbc-cluster: requires: - - unit - integration-spark-thrift - integration-spark-databricks-odbc-endpoint: requires: - - unit - integration-spark-thrift diff --git a/README.md b/README.md index 09e716b5c..8841100fd 100644 --- a/README.md +++ b/README.md @@ -169,7 +169,7 @@ The following configurations can be supplied to models run with the dbt-spark pl dbt has a number of ways to build models incrementally, called "incremental strategies." Some strategies depend on certain file formats, connection types, and other model configurations: - `append` (default): Insert new records without updating or overwriting any existing data. -- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta` or when connectinng via Databricks SQL Endpoints. For atomic replacement of Delta tables, use the `table` materialization instead.] +- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta` or when connecting via Databricks SQL Endpoints. For dynamic partition replacement with `method: odbc` + Databricks `cluster`, you **must** include `set spark.sql.sources.partitionOverwriteMode DYNAMIC` in the [cluster SparkConfig](https://docs.databricks.com/clusters/configure.html#spark-config). For atomic replacement of Delta tables, use the `table` materialization instead.] - `merge`: Match records based on a `unique_key`; update old records, insert new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.) [Requires `file_format: delta`. Available only on Databricks Runtime.]
Examples: diff --git a/test/custom/incremental_strategies/test_incremental_strategies.py b/test/custom/incremental_strategies/test_incremental_strategies.py index 0de1bd057..4d13a7708 100644 --- a/test/custom/incremental_strategies/test_incremental_strategies.py +++ b/test/custom/incremental_strategies/test_incremental_strategies.py @@ -20,7 +20,8 @@ def seed_and_run_twice(self): self.run_dbt(["seed"]) self.run_dbt(["run"]) self.run_dbt(["run"]) - + + class TestDefaultAppend(TestIncrementalStrategies): @property def models(self): From b8c7d77ab8d8f758d97d33d3c344f8fb5afb95c3 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Fri, 19 Feb 2021 14:54:09 +0100 Subject: [PATCH 15/15] Update README, Changelog --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b7e79c90..500dd4f1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## dbt-spark 0.19.0 (Release TBD) +### Breaking changes +- Incremental models have `incremental_strategy: append` by default. This strategy adds new records without updating or overwriting existing records. For that, use `merge` or `insert_overwrite` instead, depending on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error if you configure a strategy that is not supported for a given file format or connection. ([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141)) + ### Fixes - Capture hard-deleted records in snapshot merge, when `invalidate_hard_deletes` config is set ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/143), [#126](https://github.com/fishtown-analytics/dbt-spark/pull/144)) @@ -7,7 +10,6 @@ ### Breaking changes - Users of the `http` and `thrift` connection methods need to install extra requirements: `pip install dbt-spark[PyHive]` ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/109), [#126](https://github.com/fishtown-analytics/dbt-spark/pull/126)) -- Incremental models have `incremental_strategy: append` by default. This strategy adds new records without updating or overwriting existing records. For that, use `merge` or `insert_overwrite` instead, depending on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error if you configure a strategy that is not supported for a given file format or connection. ([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141)) ### Under the hood - Enable `CREATE OR REPLACE` support when using Delta. Instead of dropping and recreating the table, it will keep the existing table, and add a new version as supported by Delta. This will ensure that the table stays available when running the pipeline, and you can track the history.
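Illustrative usage note (a minimal sketch, not part of the patches above): the `merge` strategy documented in the PATCH 14/15 README changes would be configured in a model along the lines shown below. The model name `events` and the columns `user_id` / `event_time` are hypothetical; the config keys (`incremental_strategy`, `unique_key`, `file_format`) and the `is_incremental()` / `{{ this }}` constructs are standard dbt usage.

```
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='user_id',
    file_format='delta'
) }}

select
    user_id,
    max(event_time) as last_seen_at

from {{ ref('events') }}

{% if is_incremental() %}
  -- on incremental runs, only scan rows newer than what the table already holds;
  -- merge + unique_key updates existing user_id rows instead of duplicating them
  where event_time > (select max(last_seen_at) from {{ this }})
{% endif %}

group by 1
```

Because `merge` requires `file_format: delta`, this sketch applies only on Databricks runtimes; omitting `unique_key` would make it behave like `append`, as described in the changelog entry above.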