
Initial attempt to get spark working with new testing framework #299

Merged 4 commits on Mar 29, 2022
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,11 @@
## dbt-spark 1.1.0 (TBD)

### Features
- Add session connection method ([#272](https://github.com/dbt-labs/dbt-spark/issues/272), [#279](https://github.com/dbt-labs/dbt-spark/pull/279))

### Under the hood
- Use dbt.tests.adapter.basic in test suite ([#298](https://github.com/dbt-labs/dbt-spark/issues/298), [#299](https://github.com/dbt-labs/dbt-spark/pull/299))

### Contributors
- [@JCZuurmond](https://github.com/dbt-labs/dbt-spark/pull/279) ([#279](https://github.com/dbt-labs/dbt-spark/pull/279))

3 changes: 2 additions & 1 deletion dev_requirements.txt
@@ -1,6 +1,7 @@
# install latest changes in dbt-core
# TODO: how to automate switching from develop to version branches?
git+https://github.com/dbt-labs/dbt.git#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter

freezegun==0.3.9
pytest==6.0.2
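With these lines, `pip install -r dev_requirements.txt` installs both `dbt-core` and the new `dbt-tests-adapter` package, each from a subdirectory of the (renamed) dbt-core repository.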
10 changes: 10 additions & 0 deletions pytest.ini
@@ -0,0 +1,10 @@
[pytest]
filterwarnings =
ignore:.*'soft_unicode' has been renamed to 'soft_str'*:DeprecationWarning
ignore:unclosed file .*:ResourceWarning
env_files =
test.env
testpaths =
tests/unit
tests/integration
tests/functional
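The `env_files` option appears to be read by a dotenv-style pytest plugin (pytest-dotenv provides an `env_files` ini option) and loads environment variables from `test.env` before the session starts. A sketch of what that file might contain, using the variable names read in `tests/conftest.py` below; every value here is a placeholder, not a real credential:

```
# test.env (sketch; placeholder values only)
DBT_DATABRICKS_HOST_NAME=mycompany.cloud.databricks.com
DBT_DATABRICKS_CLUSTER_NAME=0123-456789-abcdefgh
DBT_DATABRICKS_ENDPOINT=0123456789abcdef
DBT_DATABRICKS_TOKEN=dapi0123456789abcdef
# Typical Linux install path for the Simba Spark ODBC driver
ODBC_DRIVER=/opt/simba/spark/lib/64/libsparkodbc_sb64.so
```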
90 changes: 90 additions & 0 deletions tests/conftest.py
@@ -0,0 +1,90 @@
import pytest
import os

pytest_plugins = ["dbt.tests.fixtures.project"]


def pytest_addoption(parser):
parser.addoption("--profile", action="store", default="apache_spark", type=str)


# Using @pytest.mark.skip_profile('apache_spark') invokes the 'skip_by_profile_type'
# autouse fixture below
def pytest_configure(config):
config.addinivalue_line(
"markers",
"skip_profile(profile): skip test for the given profile",
)


@pytest.fixture(scope="session")
def dbt_profile_target(request):
profile_type = request.config.getoption("--profile")
if profile_type == "databricks_cluster":
target = databricks_cluster_target()
elif profile_type == "databricks_sql_endpoint":
target = databricks_sql_endpoint_target()
elif profile_type == "apache_spark":
target = apache_spark_target()
elif profile_type == "databricks_http_cluster":
target = databricks_http_cluster_target()
else:
raise ValueError(f"Invalid profile type '{profile_type}'")
return target


def apache_spark_target():
return {
"type": "spark",
"host": "localhost",
"user": "dbt",
"method": "thrift",
"port": 10000,
"connect_retries": 5,
"connect_timeout": 60,
"retry_all": True,
}


def databricks_cluster_target():
return {
"type": "spark",
"method": "odbc",
"host": os.getenv("DBT_DATABRICKS_HOST_NAME"),
"cluster": os.getenv("DBT_DATABRICKS_CLUSTER_NAME"),
"token": os.getenv("DBT_DATABRICKS_TOKEN"),
"driver": os.getenv("ODBC_DRIVER"),
"port": 443,
}


def databricks_sql_endpoint_target():
return {
"type": "spark",
"method": "odbc",
"host": os.getenv("DBT_DATABRICKS_HOST_NAME"),
"endpoint": os.getenv("DBT_DATABRICKS_ENDPOINT"),
"token": os.getenv("DBT_DATABRICKS_TOKEN"),
"driver": os.getenv("ODBC_DRIVER"),
"port": 443,
}


def databricks_http_cluster_target():
return {
"type": "spark",
"host": os.getenv('DBT_DATABRICKS_HOST_NAME'),
"cluster": os.getenv('DBT_DATABRICKS_CLUSTER_NAME'),
"token": os.getenv('DBT_DATABRICKS_TOKEN'),
"method": "http",
"port": 443,
"connect_retries": 5,
"connect_timeout": 60,
}

@pytest.fixture(autouse=True)
def skip_by_profile_type(request):
profile_type = request.config.getoption("--profile")
if request.node.get_closest_marker("skip_profile"):
if request.node.get_closest_marker("skip_profile").args[0] == profile_type:
            pytest.skip(f"skipped on '{profile_type}' profile")
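A minimal sketch of how these pieces fit together, as a hypothetical test module plus invocation (the `project` fixture is provided by `dbt.tests.fixtures.project`, registered via `pytest_plugins` above; the class and test names are invented):

```python
import pytest


# Collected for every profile except apache_spark, thanks to the
# skip_by_profile_type autouse fixture in conftest.py.
@pytest.mark.skip_profile("apache_spark")
class TestMyDeltaFeature:
    def test_runs(self, project):
        # 'project' builds a throwaway dbt project wired to dbt_profile_target
        ...
```

Run against a specific target with, for example, `pytest tests/functional --profile databricks_cluster`; an unrecognized name raises the `ValueError` in `dbt_profile_target`.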
79 changes: 79 additions & 0 deletions tests/functional/adapter/test_basic.py
@@ -0,0 +1,79 @@
import pytest

from dbt.tests.adapter.basic.test_base import BaseSimpleMaterializations
from dbt.tests.adapter.basic.test_singular_tests import BaseSingularTests
from dbt.tests.adapter.basic.test_singular_tests_ephemeral import (
BaseSingularTestsEphemeral,
)
from dbt.tests.adapter.basic.test_empty import BaseEmpty
from dbt.tests.adapter.basic.test_ephemeral import BaseEphemeral
from dbt.tests.adapter.basic.test_incremental import BaseIncremental
from dbt.tests.adapter.basic.test_generic_tests import BaseGenericTests
from dbt.tests.adapter.basic.test_snapshot_check_cols import BaseSnapshotCheckCols
from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp


@pytest.mark.skip_profile('databricks_sql_endpoint')
class TestSimpleMaterializationsSpark(BaseSimpleMaterializations):
pass


class TestSingularTestsSpark(BaseSingularTests):
pass


# The local cluster currently tests on Spark 2.x, which does not support this.
# If we upgrade it to 3.x, we can enable this test.
@pytest.mark.skip_profile('apache_spark')
class TestSingularTestsEphemeralSpark(BaseSingularTestsEphemeral):
pass


class TestEmptySpark(BaseEmpty):
pass


@pytest.mark.skip_profile('databricks_sql_endpoint')
class TestEphemeralSpark(BaseEphemeral):
pass


@pytest.mark.skip_profile('databricks_sql_endpoint')
class TestIncrementalSpark(BaseIncremental):
pass


class TestGenericTestsSpark(BaseGenericTests):
pass


# These tests were not enabled in the dbtspec files, so skipping here.
# Error encountered was: Error running query: java.lang.ClassNotFoundException: delta.DefaultSource
@pytest.mark.skip_profile('apache_spark')
class TestSnapshotCheckColsSpark(BaseSnapshotCheckCols):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"seeds": {
"+file_format": "delta",
},
"snapshots": {
"+file_format": "delta",
}
}


# These tests were not enabled in the dbtspec files, so skipping here.
# Error encountered was: Error running query: java.lang.ClassNotFoundException: delta.DefaultSource
@pytest.mark.skip_profile('apache_spark')
class TestSnapshotTimestampSpark(BaseSnapshotTimestamp):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"seeds": {
"+file_format": "delta",
},
"snapshots": {
"+file_format": "delta",
}
}
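For reference, the `project_config_update` fixture is merged into the test project's generated `dbt_project.yml`, so the override above corresponds to a stanza like this (sketch):

```yaml
seeds:
  +file_format: delta
snapshots:
  +file_format: delta
```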
2 changes: 2 additions & 0 deletions tests/integration/base.py
@@ -77,6 +77,8 @@ def __init__(self):


class TestArgs:
__test__ = False
Review comment (Contributor): Why does this line fix the tests?

Reply (Author): pytest's test collection gathers up all classes whose names start with "Test", so `__test__ = False` marks this class as "not a test".
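A standalone sketch of that collection rule (not from this PR; it assumes pytest's default `python_classes = Test*` setting, and the class names are invented):

```python
# pytest collects any class named Test* found in a test module.
class TestHelper:
    def __init__(self):    # collection warns: "cannot collect test class
        self.which = "run" # 'TestHelper' because it has a __init__ constructor"


class TestHelperOptedOut:
    __test__ = False  # opts the class out of collection; no warning, no test run

    def __init__(self):
        self.which = "run"
```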


def __init__(self, kwargs):
self.which = 'run'
self.single_threaded = False
@@ -4,8 +4,8 @@
from pathlib import Path


TestResults = namedtuple(
'TestResults',
ResultHolder = namedtuple(
Review comment (Contributor): Haha, I actually suggested that Matt convert this to the same name.

Reply (Author): I saw that and used the same name :). Pytest was also trying to collect `TestResults` as a test class.
'ResultHolder',
['seed_count', 'model_count', 'seed_rows', 'inc_test_model_count',
'opt_model_count', 'relation'],
)
@@ -95,7 +95,7 @@ def test_scenario_correctness(self, expected_fields, test_case_fields):
def stub_expected_fields(
self, relation, seed_rows, opt_model_count=None
):
return TestResults(
return ResultHolder(
seed_count=1, model_count=1, seed_rows=seed_rows,
inc_test_model_count=1, opt_model_count=opt_model_count,
relation=relation
@@ -134,7 +134,7 @@ def test__databricks_sql_endpoint_no_unique_keys(self):
expected_fields = self.stub_expected_fields(
relation=seed, seed_rows=seed_rows
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=None, relation=incremental_model
)
@@ -152,7 +152,7 @@ def test__databricks_cluster_no_unique_keys(self):
expected_fields = self.stub_expected_fields(
relation=seed, seed_rows=seed_rows
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=None, relation=incremental_model
)
@@ -173,7 +173,7 @@ def test__databricks_sql_endpoint_empty_str_unique_key(self):
expected_fields = self.stub_expected_fields(
relation=seed, seed_rows=seed_rows
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=None, relation=incremental_model
)
@@ -191,7 +191,7 @@ def test__databricks_cluster_empty_str_unique_key(self):
expected_fields = self.stub_expected_fields(
relation=seed, seed_rows=seed_rows
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=None, relation=incremental_model
)
@@ -210,7 +210,7 @@ def test__databricks_sql_endpoint_one_unique_key(self):
expected_fields = self.stub_expected_fields(
relation=expected_model, seed_rows=seed_rows, opt_model_count=1
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=self.update_incremental_model(expected_model),
relation=incremental_model
@@ -230,7 +230,7 @@ def test__databricks_cluster_one_unique_key(self):
expected_fields = self.stub_expected_fields(
relation=expected_model, seed_rows=seed_rows, opt_model_count=1
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=self.update_incremental_model(expected_model),
relation=incremental_model
@@ -274,7 +274,7 @@ def test__databricks_sql_endpoint_empty_unique_key_list(self):
expected_fields = self.stub_expected_fields(
relation=seed, seed_rows=seed_rows
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=None, relation=incremental_model
)
@@ -292,7 +292,7 @@ def test__databricks_cluster_empty_unique_key_list(self):
expected_fields = self.stub_expected_fields(
relation=seed, seed_rows=seed_rows
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=None, relation=incremental_model
)
@@ -311,7 +311,7 @@ def test__databricks_sql_endpoint_unary_unique_key_list(self):
expected_fields = self.stub_expected_fields(
relation=expected_model, seed_rows=seed_rows, opt_model_count=1
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=self.update_incremental_model(expected_model),
relation=incremental_model
@@ -331,7 +331,7 @@ def test__databricks_cluster_unary_unique_key_list(self):
expected_fields = self.stub_expected_fields(
relation=expected_model, seed_rows=seed_rows, opt_model_count=1
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=self.update_incremental_model(expected_model),
relation=incremental_model
@@ -351,7 +351,7 @@ def test__databricks_sql_endpoint_duplicated_unary_unique_key_list(self):
expected_fields = self.stub_expected_fields(
relation=expected_model, seed_rows=seed_rows, opt_model_count=1
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=self.update_incremental_model(expected_model),
relation=incremental_model
@@ -371,7 +371,7 @@ def test__databricks_cluster_duplicated_unary_unique_key_list(self):
expected_fields = self.stub_expected_fields(
relation=expected_model, seed_rows=seed_rows, opt_model_count=1
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=self.update_incremental_model(expected_model),
relation=incremental_model
@@ -391,7 +391,7 @@ def test__databricks_sql_endpoint_trinary_unique_key_list(self):
expected_fields = self.stub_expected_fields(
relation=expected_model, seed_rows=seed_rows, opt_model_count=1
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=self.update_incremental_model(expected_model),
relation=incremental_model
@@ -411,7 +411,7 @@ def test__databricks_cluster_trinary_unique_key_list(self):
expected_fields = self.stub_expected_fields(
relation=expected_model, seed_rows=seed_rows, opt_model_count=1
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=self.update_incremental_model(expected_model),
relation=incremental_model
@@ -431,7 +431,7 @@ def test__databricks_sql_endpoint_trinary_unique_key_list_no_update(self):
expected_fields = self.stub_expected_fields(
relation=seed, seed_rows=seed_rows
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=None, relation=incremental_model
)
@@ -450,7 +450,7 @@ def test__databricks_cluster_trinary_unique_key_list_no_update(self):
expected_fields = self.stub_expected_fields(
relation=seed, seed_rows=seed_rows
)
test_case_fields = TestResults(
test_case_fields = ResultHolder(
*self.setup_test(seed, incremental_model, update_sql_file),
opt_model_count=None, relation=incremental_model
)
@@ -478,4 +478,4 @@ def test__databricks_cluster_bad_unique_key_list(self):

self.assertEqual(status, RunStatus.Error)
self.assertTrue("thisisnotacolumn" in exc)

