Try adding custom integration tests
jtcohen6 committed Feb 3, 2021
1 parent bc18f80 commit af77fd8
Showing 17 changed files with 353 additions and 1 deletion.
2 changes: 1 addition & 1 deletion dev_requirements.txt
@@ -10,6 +10,6 @@ pytest-xdist>=2.1.0,<3
flaky>=3.5.3,<4

# Test requirements
-pytest-dbt-adapter==0.4.0
+git+https://github.com/fishtown-analytics/dbt-adapter-tests.git@feature/add-integration-test-tools
sasl==0.2.1
thrift_sasl==0.4.1
Empty file added test/custom/__init__.py
Empty file.
67 changes: 67 additions & 0 deletions test/custom/base.py
@@ -0,0 +1,67 @@
import os

from dbt_adapter_tests import DBTIntegrationTestBase, use_profile


class DBTSparkIntegrationTest(DBTIntegrationTestBase):

    def apache_spark_profile(self):
        return {
            'config': {
                'send_anonymous_usage_stats': False
            },
            'test': {
                'outputs': {
                    'default2': {
                        'type': 'spark',
                        'host': 'localhost',
                        'user': 'dbt',
                        'method': 'thrift',
                        'port': 10000,
                        'connect_retries': 5,
                        'connect_timeout': 60,
                        'schema': self.unique_schema()
                    }
                },
                'target': 'default2'
            }
        }

    def databricks_cluster_profile(self):
        return {
            'config': {
                'send_anonymous_usage_stats': False
            },
            'test': {
                'outputs': {
                    'odbc': {
                        'type': 'spark',
                        'method': 'odbc',
                        'host': os.getenv('DBT_DATABRICKS_HOST_NAME'),
                        'cluster': os.getenv('DBT_DATABRICKS_CLUSTER_NAME'),
                        'token': os.getenv('DBT_DATABRICKS_TOKEN'),
                        'port': 443,
                        'schema': self.unique_schema()
                    }
                },
                'target': 'odbc'
            }
        }

    def databricks_sql_endpoint_profile(self):
        return {
            'config': {
                'send_anonymous_usage_stats': False
            },
            'test': {
                'outputs': {
                    'default2': {
                        'type': 'spark',
                        'method': 'odbc',
                        'host': os.getenv('DBT_DATABRICKS_HOST_NAME'),
                        'endpoint': os.getenv('DBT_DATABRICKS_ENDPOINT'),
                        'token': os.getenv('DBT_DATABRICKS_TOKEN'),
                        'port': 443,
                        'schema': self.unique_schema()
                    }
                },
                'target': 'default2'
            }
        }
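For context, a minimal sketch (not part of this commit, names hypothetical) of how a test case is expected to opt into one of these profiles: per the dbt-adapter-tests framework, use_profile("apache_spark") is assumed to look up the apache_spark_profile method above and install it as the active profile before the test body runs.

from test.custom.base import DBTSparkIntegrationTest, use_profile


class ExampleTest(DBTSparkIntegrationTest):
    # hypothetical test case, for illustration only

    @property
    def schema(self):
        return "example"

    @property
    def models(self):
        return "models"

    @use_profile("apache_spark")
    def test_example_apache_spark(self):
        # executes against the local thrift profile defined above
        self.run_dbt(["run"])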
5 changes: 5 additions & 0 deletions test/custom/incremental_strategies/data/expected_append.csv
@@ -0,0 +1,5 @@
id,msg
1,hello
2,goodbye
2,yo
3,anyway
3 changes: 3 additions & 0 deletions test/custom/incremental_strategies/data/expected_overwrite.csv
@@ -0,0 +1,3 @@
id,msg
2,yo
3,anyway
4 changes: 4 additions & 0 deletions test/custom/incremental_strategies/data/expected_upsert.csv
@@ -0,0 +1,4 @@
id,msg
1,hello
2,yo
3,anyway
17 changes: 17 additions & 0 deletions test/custom/incremental_strategies/models/default_append.sql
@@ -0,0 +1,17 @@
{{ config(
    materialized = 'incremental',
) }}

{% if not is_incremental() %}

select 1 as id, 'hello' as msg
union all
select 2 as id, 'goodbye' as msg

{% else %}

select 2 as id, 'yo' as msg
union all
select 3 as id, 'anyway' as msg

{% endif %}
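To make the fixtures above concrete, here is a toy simulation (illustration only, not part of the commit) of what two consecutive dbt runs do to this model: the first run takes the not is_incremental() branch and creates the table; the second takes the else branch, and the default strategy appends its rows.

first_run = [(1, "hello"), (2, "goodbye")]  # is_incremental() is False
second_run = [(2, "yo"), (3, "anyway")]     # is_incremental() is True

# the default (append) strategy simply inserts the new rows, keeping
# the duplicate id 2 -- exactly the expected_append.csv fixture
table = first_run + second_run
assert table == [(1, "hello"), (2, "goodbye"), (2, "yo"), (3, "anyway")]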
18 changes: 18 additions & 0 deletions test/custom/incremental_strategies/models_bad/bad_file_format.sql
@@ -0,0 +1,18 @@
{{ config(
    materialized = 'incremental',
    file_format = 'something_else',
) }}

{% if not is_incremental() %}

select 1 as id, 'hello' as msg
union all
select 2 as id, 'goodbye' as msg

{% else %}

select 2 as id, 'yo' as msg
union all
select 3 as id, 'anyway' as msg

{% endif %}
19 changes: 19 additions & 0 deletions test/custom/incremental_strategies/models_bad/bad_insert_overwrite_delta.sql
@@ -0,0 +1,19 @@
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    file_format = 'delta',
) }}

{% if not is_incremental() %}

select 1 as id, 'hello' as msg
union all
select 2 as id, 'goodbye' as msg

{% else %}

select 2 as id, 'yo' as msg
union all
select 3 as id, 'anyway' as msg

{% endif %}
18 changes: 18 additions & 0 deletions test/custom/incremental_strategies/models_bad/bad_merge_not_delta.sql
@@ -0,0 +1,18 @@
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
) }}

{% if not is_incremental() %}

select 1 as id, 'hello' as msg
union all
select 2 as id, 'goodbye' as msg

{% else %}

select 2 as id, 'yo' as msg
union all
select 3 as id, 'anyway' as msg

{% endif %}
18 changes: 18 additions & 0 deletions test/custom/incremental_strategies/models_bad/bad_strategy.sql
@@ -0,0 +1,18 @@
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'something_else',
) }}

{% if not is_incremental() %}

select 1 as id, 'hello' as msg
union all
select 2 as id, 'goodbye' as msg

{% else %}

select 2 as id, 'yo' as msg
union all
select 3 as id, 'anyway' as msg

{% endif %}
19 changes: 19 additions & 0 deletions test/custom/incremental_strategies/models_delta/append_delta.sql
@@ -0,0 +1,19 @@
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'append',
    file_format = 'delta',
) }}

{% if not is_incremental() %}

select 1 as id, 'hello' as msg
union all
select 2 as id, 'goodbye' as msg

{% else %}

select 2 as id, 'yo' as msg
union all
select 3 as id, 'anyway' as msg

{% endif %}
19 changes: 19 additions & 0 deletions test/custom/incremental_strategies/models_delta/merge_no_key.sql
@@ -0,0 +1,19 @@
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    file_format = 'delta',
) }}

{% if not is_incremental() %}

select 1 as id, 'hello' as msg
union all
select 2 as id, 'goodbye' as msg

{% else %}

select 2 as id, 'yo' as msg
union all
select 3 as id, 'anyway' as msg

{% endif %}
20 changes: 20 additions & 0 deletions test/custom/incremental_strategies/models_delta/merge_unique_key.sql
@@ -0,0 +1,20 @@
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    file_format = 'delta',
    unique_key = 'id',
) }}

{% if not is_incremental() %}

select 1 as id, 'hello' as msg
union all
select 2 as id, 'goodbye' as msg

{% else %}

select 2 as id, 'yo' as msg
union all
select 3 as id, 'anyway' as msg

{% endif %}
18 changes: 18 additions & 0 deletions test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql
@@ -0,0 +1,18 @@
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
) }}

{% if not is_incremental() %}

select 1 as id, 'hello' as msg
union all
select 2 as id, 'goodbye' as msg

{% else %}

select 2 as id, 'yo' as msg
union all
select 3 as id, 'anyway' as msg

{% endif %}
20 changes: 20 additions & 0 deletions test/custom/incremental_strategies/models_insert_overwrite/insert_overwrite_partitions.sql
@@ -0,0 +1,20 @@
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = 'id',
    file_format = 'parquet',
) }}

{% if not is_incremental() %}

select 1 as id, 'hello' as msg
union all
select 2 as id, 'goodbye' as msg

{% else %}

select 2 as id, 'yo' as msg
union all
select 3 as id, 'anyway' as msg

{% endif %}
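Likewise, a toy simulation (illustration only) of the two other outcomes these models encode: insert_overwrite without partitions replaces the whole table on the second run, while merge with unique_key = 'id' and insert_overwrite with partition_by = 'id' both behave as an upsert on id.

first_run = [(1, "hello"), (2, "goodbye")]
second_run = [(2, "yo"), (3, "anyway")]

def overwrite(existing, new):
    # no partitions: the second run replaces the entire table
    return new

def upsert(existing, new):
    # merge on a unique key (or per-partition overwrite on id):
    # incoming rows win on matching ids, other rows are preserved
    rows = {row[0]: row for row in existing}
    rows.update({row[0]: row for row in new})
    return sorted(rows.values())

assert overwrite(first_run, second_run) == [(2, "yo"), (3, "anyway")]
assert upsert(first_run, second_run) == [(1, "hello"), (2, "yo"), (3, "anyway")]
# matching expected_overwrite.csv and expected_upsert.csv respectively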
87 changes: 87 additions & 0 deletions test/custom/incremental_strategies/test_incremental_strategies.py
@@ -0,0 +1,87 @@
import dbt.exceptions

from test.custom.base import DBTSparkIntegrationTest, use_profile


class TestIncrementalStrategies(DBTSparkIntegrationTest):
    @property
    def schema(self):
        return "incremental_strategies"

    @property
    def models(self):
        return "models"

    def run_and_test(self):
        self.run_dbt(["seed"])
        # run twice: the first run creates the table, the second
        # exercises the is_incremental() branch of each model
        self.run_dbt(["run"])
        self.run_dbt(["run"])
        self.assertTablesEqual("default_append", "expected_append")


class TestDefaultAppend(TestIncrementalStrategies):
    @use_profile("apache_spark")
    def test_default_append_apache_spark(self):
        self.run_and_test()

    @use_profile("databricks_cluster")
    def test_default_append_databricks(self):
        self.run_and_test()


class TestInsertOverwrite(TestIncrementalStrategies):
    @property
    def models(self):
        return "models_insert_overwrite"

    def run_and_test(self):
        self.run_dbt(["seed"])
        self.run_dbt(["run"])
        self.run_dbt(["run"])
        self.assertTablesEqual(
            "insert_overwrite_no_partitions", "expected_overwrite")
        self.assertTablesEqual(
            "insert_overwrite_partitions", "expected_upsert")

    @use_profile("apache_spark")
    def test_insert_overwrite_apache_spark(self):
        self.run_and_test()

    @use_profile("databricks_cluster")
    def test_insert_overwrite_databricks(self):
        self.run_and_test()


class TestDeltaStrategies(TestIncrementalStrategies):
    @property
    def models(self):
        return "models_delta"

    def run_and_test(self):
        self.run_dbt(["seed"])
        self.run_dbt(["run"])
        self.run_dbt(["run"])
        self.assertTablesEqual("append_delta", "expected_append")
        self.assertTablesEqual("merge_no_key", "expected_append")
        self.assertTablesEqual("merge_unique_key", "expected_upsert")

    @use_profile("databricks_cluster")
    def test_delta_strategies_databricks(self):
        self.run_and_test()


class TestBadStrategies(TestIncrementalStrategies):
    @property
    def models(self):
        # point at the invalid configs, so compilation must fail
        return "models_bad"

    def run_and_test(self):
        with self.assertRaises(dbt.exceptions.CompilationException) as exc:
            self.run_dbt(["compile"])
        message = str(exc.exception)
        self.assertIn("Invalid file format provided", message)
        self.assertIn("Invalid incremental strategy provided", message)

    @use_profile("apache_spark")
    def test_bad_strategies_apache_spark(self):
        self.run_and_test()

    @use_profile("databricks_cluster")
    def test_bad_strategies_databricks(self):
        self.run_and_test()


class TestBadStrategyWithEndpoint(TestInsertOverwrite):
    def run_and_test(self):
        # insert_overwrite is not supported on SQL endpoints, so these
        # otherwise-valid models should fail to compile there
        with self.assertRaises(dbt.exceptions.CompilationException) as exc:
            self.run_dbt(["compile"])
        message = str(exc.exception)
        self.assertIn("Invalid incremental strategy provided", message)

    @use_profile("databricks_sql_endpoint")
    def test_bad_strategy_databricks_sql_endpoint(self):
        self.run_and_test()
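One practical note for the Databricks targets (a hedged preflight sketch, not part of the commit): the profiles in test/custom/base.py read connection details from environment variables at runtime, so a check like this can turn a confusing connection failure into a clear error.

import os

# required by the databricks_cluster and databricks_sql_endpoint
# profiles defined in test/custom/base.py
required = [
    "DBT_DATABRICKS_HOST_NAME",
    "DBT_DATABRICKS_CLUSTER_NAME",
    "DBT_DATABRICKS_ENDPOINT",
    "DBT_DATABRICKS_TOKEN",
]
missing = [name for name in required if not os.getenv(name)]
if missing:
    raise SystemExit("set these environment variables first: %s" % missing)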