Add integration testing
jtcohen6 committed Oct 14, 2021
1 parent df2d83d commit 8a25095
Showing 12 changed files with 321 additions and 17 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -13,3 +13,4 @@ test/integration/.user.yml
.DS_Store
.vscode
*.log
logs/
34 changes: 19 additions & 15 deletions dbt/include/spark/macros/adapters.sql
@@ -198,36 +198,40 @@

{% macro spark__make_temp_relation(base_relation, suffix) %}
{% set tmp_identifier = base_relation.identifier ~ suffix %}
{% set tmp_relation = base_relation.incorporate(
path={
"identifier": tmp_identifier,
"schema": None
}) -%}
{% set tmp_relation = base_relation.incorporate(path = {
"identifier": tmp_identifier,
"schema": None
}) -%}

{% do return(tmp_relation) %}
{% endmacro %}


{% macro spark__alter_column_type(relation, column_name, new_column_type) -%}
{% call statement('alter_column_type') %}
alter table {{ relation }} alter column {{ column_name }} type {{ new_column_type }};
{% endcall %}
{% endmacro %}


{% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}

{% if remove_columns %}
{% set engine_name = 'Delta Lake' if relation.is_delta else 'Apache Spark' %}
{{ exceptions.raise_compiler_error(engine_name + ' does not support dropping columns from tables') }}
{% endif %}

{% if add_columns is none %}
{% set add_columns = [] %}
{% endif %}
{% if remove_columns is not none %}
{{ exceptions.raise_not_implemented(
'Delta does not support removing columns'
) }}
{% endif %}

{% set sql -%}

alter {{ relation.type }} {{ relation }}

{% if add_columns %} add columns {% endif %}
{% for column in add_columns %}
add column {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}{{ ',' if remove_columns | length > 0 }}

{% for column in remove_columns %}
drop column {{ column.name }}{{ ',' if not loop.last }}
{{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}

{%- endset -%}
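Note: with hypothetical identifiers (not taken from this commit), the reworked macro renders roughly the statement below when two columns are appended; the remove path now raises a compiler error up front, since neither Apache Spark nor Delta Lake supports dropping columns from tables here.

    alter table my_schema.my_incremental_model
      add columns
        field3 string,
        field4 string
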
3 changes: 1 addition & 2 deletions test/custom/base.py
@@ -100,11 +100,10 @@ def databricks_cluster_profile(self):
'outputs': {
'cluster': {
'type': 'spark',
'method': 'odbc',
'method': 'http',
'host': os.getenv('DBT_DATABRICKS_HOST_NAME'),
'cluster': os.getenv('DBT_DATABRICKS_CLUSTER_NAME'),
'token': os.getenv('DBT_DATABRICKS_TOKEN'),
'driver': os.getenv('ODBC_DRIVER'),
'port': 443,
'schema': self.unique_schema()
},
28 changes: 28 additions & 0 deletions test/custom/incremental_on_schema_change/models/incremental_append_new_columns.sql
@@ -0,0 +1,28 @@
{{
config(
materialized='incremental',
on_schema_change='append_new_columns'
)
}}

{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% if is_incremental() %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2,
cast(field3 as {{string_type}}) as field3,
cast(field4 as {{string_type}}) as field4
FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )

{% else %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2
FROM source_data where id <= 3

{% endif %}
19 changes: 19 additions & 0 deletions test/custom/incremental_on_schema_change/models/incremental_append_new_columns_target.sql
@@ -0,0 +1,19 @@
{{
config(materialized='table')
}}

{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}

with source_data as (

select * from {{ ref('model_a') }}

)

select id
,cast(field1 as {{string_type}}) as field1
,cast(field2 as {{string_type}}) as field2
,cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as {{string_type}}) AS field3
,cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as {{string_type}}) AS field4

from source_data
18 changes: 18 additions & 0 deletions test/custom/incremental_on_schema_change/models/incremental_fail.sql
@@ -0,0 +1,18 @@
{{
config(
materialized='incremental',
on_schema_change='fail'
)
}}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% if is_incremental() %}

SELECT id, field1, field2 FROM source_data

{% else %}

SELECT id, field1, field3 FROM source_data

{% endif %}
18 changes: 18 additions & 0 deletions test/custom/incremental_on_schema_change/models/incremental_ignore.sql
@@ -0,0 +1,18 @@
{{
config(
materialized='incremental',
on_schema_change='ignore'
)
}}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% if is_incremental() %}

SELECT id, field1, field2, field3, field4 FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )

{% else %}

SELECT id, field1, field2 FROM source_data LIMIT 3

{% endif %}
15 changes: 15 additions & 0 deletions test/custom/incremental_on_schema_change/models/incremental_ignore_target.sql
@@ -0,0 +1,15 @@
{{
config(materialized='table')
}}

with source_data as (

select * from {{ ref('model_a') }}

)

select id
,field1
,field2

from source_data
30 changes: 30 additions & 0 deletions test/custom/incremental_on_schema_change/models/incremental_sync_all_columns.sql
@@ -0,0 +1,30 @@
{{
config(
materialized='incremental',
on_schema_change='sync_all_columns'

)
}}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}

{% if is_incremental() %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field3 as {{string_type}}) as field3, -- to validate new fields
cast(field4 as {{string_type}}) AS field4 -- to validate new fields

FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )

{% else %}

select id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2

from source_data where id <= 3

{% endif %}
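Note: a sketch (identifiers illustrative) of what syncing this model's second-run schema would require. Adding the new columns is supported, but dropping field2 is not, so spark__alter_relation_add_remove_columns raises a compiler error first; that is why the test below only asserts a Compilation Error for sync_all_columns.

    alter table my_schema.incremental_sync_all_columns
      add columns field3 varchar(10), field4 varchar(10)
    -- also required, but unsupported on Apache Spark and Delta Lake:
    alter table my_schema.incremental_sync_all_columns
      drop column field2
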
20 changes: 20 additions & 0 deletions test/custom/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql
@@ -0,0 +1,20 @@
{{
config(materialized='table')
}}

with source_data as (

select * from {{ ref('model_a') }}

)

{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}

select id
,cast(field1 as {{string_type}}) as field1
--,field2
,cast(case when id <= 3 then null else field3 end as {{string_type}}) as field3
,cast(case when id <= 3 then null else field4 end as {{string_type}}) as field4

from source_data
order by id
22 changes: 22 additions & 0 deletions test/custom/incremental_on_schema_change/models/model_a.sql
@@ -0,0 +1,22 @@
{{
config(materialized='table')
}}

with source_data as (

select 1 as id, 'aaa' as field1, 'bbb' as field2, 111 as field3, 'TTT' as field4
union all select 2 as id, 'ccc' as field1, 'ddd' as field2, 222 as field3, 'UUU' as field4
union all select 3 as id, 'eee' as field1, 'fff' as field2, 333 as field3, 'VVV' as field4
union all select 4 as id, 'ggg' as field1, 'hhh' as field2, 444 as field3, 'WWW' as field4
union all select 5 as id, 'iii' as field1, 'jjj' as field2, 555 as field3, 'XXX' as field4
union all select 6 as id, 'kkk' as field1, 'lll' as field2, 666 as field3, 'YYY' as field4

)

select id
,field1
,field2
,field3
,field4

from source_data
130 changes: 130 additions & 0 deletions test/custom/incremental_on_schema_change/test_incremental_schema.py
@@ -0,0 +1,130 @@
from test.custom.base import DBTSparkIntegrationTest, use_profile
import dbt.exceptions


class TestIncrementalOnSchemaChange(DBTSparkIntegrationTest):
@property
def schema(self):
return "incremental_on_schema_change"

@property
def models(self):
return "models"

@property
def project_config(self):
return {
"config-version": 2,
"test-paths": ["tests"]
}

def run_twice_and_assert(
self, include, compare_source, compare_target
):

# dbt run (twice)
run_args = ['run']
if include:
run_args.extend(('--models', include))
results_one = self.run_dbt(run_args)
results_two = self.run_dbt(run_args)

self.assertEqual(len(results_one), 3)
self.assertEqual(len(results_two), 3)

self.assertTablesEqual(compare_source, compare_target)

def run_incremental_ignore(self):
select = 'model_a incremental_ignore incremental_ignore_target'
compare_source = 'incremental_ignore'
compare_target = 'incremental_ignore_target'
self.run_twice_and_assert(select, compare_source, compare_target)

def run_incremental_append_new_columns(self):
select = 'model_a incremental_append_new_columns incremental_append_new_columns_target'
compare_source = 'incremental_append_new_columns'
compare_target = 'incremental_append_new_columns_target'
self.run_twice_and_assert(select, compare_source, compare_target)

def run_incremental_fail_on_schema_change(self):
select = 'model_a incremental_fail'
results_one = self.run_dbt(['run', '--models', select, '--full-refresh'])
results_two = self.run_dbt(['run', '--models', select], expect_pass = False)
self.assertIn('Compilation Error', results_two[1].message)

def run_incremental_sync_all_columns(self):
# this doesn't work on Delta today
select = 'model_a incremental_sync_all_columns incremental_sync_all_columns_target'
compare_source = 'incremental_sync_all_columns'
compare_target = 'incremental_sync_all_columns_target'
results_one = self.run_dbt(['run', '--models', select, '--full-refresh'])
results_two = self.run_dbt(['run', '--models', select], expect_pass = False)
self.assertIn('Compilation Error', results_two[1].message)

class TestApacheSparkOnSchemaChange(TestIncrementalOnSchemaChange):

@property
def project_config(self):
return {
"config-version": 2,
"test-paths": ["tests"],
"models": {
"+partition_by": "id",
}
}

# only 'ignore' and 'fail' are supported

@use_profile('apache_spark')
def test__apache_spark__run_incremental_ignore(self):
self.run_incremental_ignore()

@use_profile('apache_spark')
def test__apache_spark__run_incremental_fail_on_schema_change(self):
self.run_incremental_fail_on_schema_change()

class TestDeltaOnSchemaChange(TestIncrementalOnSchemaChange):
@property
def project_config(self):
return {
"config-version": 2,
"test-paths": ["tests"],
"models": {
"+file_format": "delta",
"+incremental_strategy": "merge",
"+unique_key": "id",
}
}

@use_profile('databricks_cluster')
def test__databricks_cluster__run_incremental_ignore(self):
self.run_incremental_ignore()

@use_profile('databricks_cluster')
def test__databricks_cluster__run_incremental_fail_on_schema_change(self):
self.run_incremental_fail_on_schema_change()

@use_profile('databricks_cluster')
def test__databricks_cluster__run_incremental_append_new_columns(self):
self.run_incremental_append_new_columns()

@use_profile('databricks_cluster')
def test__databricks_cluster__run_incremental_sync_all_columns(self):
self.run_incremental_sync_all_columns()

@use_profile('databricks_sql_endpoint')
def test__databricks_sql_endpoint__run_incremental_ignore(self):
self.run_incremental_ignore()

@use_profile('databricks_sql_endpoint')
def test__databricks_sql_endpoint__run_incremental_fail_on_schema_change(self):
self.run_incremental_fail_on_schema_change()

@use_profile('databricks_sql_endpoint')
def test__databricks_sql_endpoint__run_incremental_append_new_columns(self):
self.run_incremental_append_new_columns()

@use_profile('databricks_sql_endpoint')
def test__databricks_sql_endpoint__run_incremental_sync_all_columns(self):
self.run_incremental_sync_all_columns()
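Note: under the Delta profile above (file_format 'delta', incremental_strategy 'merge', unique_key 'id'), the second run of each passing scenario lands through a merge of roughly this shape; identifiers are illustrative, not literal dbt-spark output.

    merge into my_schema.incremental_append_new_columns as dest
    using incremental_append_new_columns__dbt_tmp as src
      on src.id = dest.id
    when matched then update set *
    when not matched then insert *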
