Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support unit testing incremental models #8891

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231101-101845.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Support unit testing incremental models
time: 2023-11-01T10:18:45.341781-04:00
custom:
Author: michelleark
Issue: "8422"
8 changes: 8 additions & 0 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,14 @@ def env_var(self, var: str, default: Optional[str] = None) -> str:
else:
return super().env_var(var, default)

@contextproperty()
def this(self) -> Optional[str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Elsewhere "this" is a Relation. Here it's just a model name plus the ephemeral_prefix. Might be confusing. Maybe call it something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can call it anything else in the provider specifically, because we need the {{ this }} call in the Jinja context to resolve to the CTE name of the fixture we create from the `this` input provided in the unit test YAML.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Okay :)

if self.model.this_input_node_unique_id:
this_node = self.manifest.expect(self.model.this_input_node_unique_id)
self.model.set_cte(this_node.unique_id, None) # type: ignore
return self.adapter.Relation.add_ephemeral_prefix(this_node.name)
return None


# This is called by '_context_for', used in 'render_with_context'
def generate_parser_model_context(
Expand Down
3 changes: 2 additions & 1 deletion core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,8 @@ def test_node_type(self):
@dataclass
class UnitTestNode(CompiledNode):
resource_type: NodeType = field(metadata={"restrict": [NodeType.Unit]})
attached_node: Optional[str] = None
tested_node_unique_id: Optional[str] = None
this_input_node_unique_id: Optional[str] = None
overrides: Optional[UnitTestOverrides] = None
config: UnitTestNodeConfig = field(default_factory=UnitTestNodeConfig)

Expand Down
6 changes: 6 additions & 0 deletions core/dbt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,12 @@
super().__init__(msg=msg)


class InvalidUnitTestGivenInput(ParsingError):
    """Parsing error for a unit test `given` input that is not a supported call.

    The message lists the accepted forms ('ref', 'source' or 'this') and echoes
    the offending input string back to the user.
    """

    def __init__(self, input: str) -> None:
        # `input` shadows the builtin, but callers pass it by keyword
        # (input=...), so the parameter name has to stay as it is.
        super().__init__(
            msg=f"Unit test given inputs must be either a 'ref', 'source' or 'this' call. Got: '{input}'."
        )

Check warning on line 1226 in core/dbt/exceptions.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/exceptions.py#L1225-L1226

Added lines #L1225 - L1226 were not covered by tests


class SameKeyNestedError(CompilationError):
def __init__(self) -> None:
msg = "Test cannot have the same key at the top-level and in config"
Expand Down
92 changes: 56 additions & 36 deletions core/dbt/parser/unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@
from dbt.contracts.graph.nodes import (
ModelNode,
UnitTestNode,
RefArgs,
UnitTestDefinition,
DependsOn,
UnitTestConfig,
)
from dbt.contracts.graph.unparsed import UnparsedUnitTestSuite
from dbt.exceptions import ParsingError
from dbt.exceptions import ParsingError, InvalidUnitTestGivenInput
from dbt.graph import UniqueId
from dbt.node_types import NodeType
from dbt.parser.schemas import (
Expand All @@ -28,7 +27,7 @@
ParseResult,
)
from dbt.utils import get_pseudo_test_path
from dbt_extractor import py_extract_from_source # type: ignore
from dbt_extractor import py_extract_from_source, ExtractionError # type: ignore


class UnitTestManifestLoader:
Expand All @@ -49,10 +48,11 @@
def parse_unit_test_case(self, test_case: UnitTestDefinition):
package_name = self.root_project.project_name

# Create unit test node based on the "actual" tested node
actual_node = self.manifest.ref_lookup.perform_lookup(
# Create unit test node based on the node being tested
tested_node = self.manifest.ref_lookup.perform_lookup(
f"model.{package_name}.{test_case.model}", self.manifest
)
assert isinstance(tested_node, ModelNode)

# Create UnitTestNode based on model being tested. Since selection has
# already been done, we don't have to care about fields that are necessary
Expand All @@ -69,13 +69,13 @@
config=UnitTestNodeConfig(
materialized="unit", expected_rows=test_case.expect.get_rows()
),
raw_code=actual_node.raw_code,
database=actual_node.database,
schema=actual_node.schema,
raw_code=tested_node.raw_code,
database=tested_node.database,
schema=tested_node.schema,
alias=name,
fqn=test_case.unique_id.split("."),
checksum=FileHash.empty(),
attached_node=actual_node.unique_id,
tested_node_unique_id=tested_node.unique_id,
overrides=test_case.overrides,
)

Expand Down Expand Up @@ -106,7 +106,7 @@
# input models substituting for the same input ref'd model.
for given in test_case.given:
# extract the original_input_node from the ref in the "input" key of the given list
original_input_node = self._get_original_input_node(given.input)
original_input_node = self._get_original_input_node(given.input, tested_node)

original_input_node_columns = None
if (
Expand All @@ -117,11 +117,13 @@
column.name: column.data_type for column in original_input_node.columns
}

# TODO: package_name?
input_name = f"{test_case.model}__{test_case.name}__{original_input_node.name}"
# TODO: include package_name?
input_name = f"{unit_test_node.name}__{original_input_node.name}"
input_unique_id = f"model.{package_name}.{input_name}"
input_node = ModelNode(
raw_code=self._build_raw_code(given.get_rows(), original_input_node_columns),
raw_code=self._build_fixture_raw_code(
given.get_rows(), original_input_node_columns
),
resource_type=NodeType.Model,
package_name=package_name,
path=original_input_node.path,
Expand All @@ -136,37 +138,55 @@
checksum=FileHash.empty(),
)
self.unit_test_manifest.nodes[input_node.unique_id] = input_node

# Populate this_input_node_unique_id if input fixture represents node being tested
if original_input_node == tested_node:
unit_test_node.this_input_node_unique_id = input_node.unique_id

# Add unique ids of input_nodes to depends_on
unit_test_node.depends_on.nodes.append(input_node.unique_id)

def _build_raw_code(self, rows, column_name_to_data_types) -> str:
def _build_fixture_raw_code(self, rows, column_name_to_data_types) -> str:
return ("{{{{ get_fixture_sql({rows}, {column_name_to_data_types}) }}}}").format(
rows=rows, column_name_to_data_types=column_name_to_data_types
)

def _get_original_input_node(self, input: str):
"""input: ref('my_model_a')"""
# Exract the ref or sources
statically_parsed = py_extract_from_source(f"{{{{ {input} }}}}")
if statically_parsed["refs"]:
# set refs and sources on the node object
refs: List[RefArgs] = []
for ref in statically_parsed["refs"]:
name = ref.get("name")
package = ref.get("package")
version = ref.get("version")
refs.append(RefArgs(name, package, version))
# TODO: disabled lookup, versioned lookup, public models
original_input_node = self.manifest.ref_lookup.find(
name, package, version, self.manifest
)
elif statically_parsed["sources"]:
input_package_name, input_source_name = statically_parsed["sources"][0]
original_input_node = self.manifest.source_lookup.find(
input_source_name, input_package_name, self.manifest
)
def _get_original_input_node(self, input: str, tested_node: ModelNode):
"""
Returns the original input node as defined in the project given an input reference
and the node being tested.

input: str representing how input node is referenced in tested model sql
* examples:
- "ref('my_model_a')"
- "source('my_source_schema', 'my_source_name')"
- "this"
tested_node: ModelNode of representing node being tested
"""
if input.strip() == "this":
original_input_node = tested_node
else:
raise ParsingError("given input must be ref or source")
try:
statically_parsed = py_extract_from_source(f"{{{{ {input} }}}}")
except ExtractionError:
raise InvalidUnitTestGivenInput(input=input)

Check warning on line 172 in core/dbt/parser/unit_tests.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/parser/unit_tests.py#L171-L172

Added lines #L171 - L172 were not covered by tests

if statically_parsed["refs"]:
for ref in statically_parsed["refs"]:
name = ref.get("name")
package = ref.get("package")
version = ref.get("version")
# TODO: disabled lookup, versioned lookup, public models
original_input_node = self.manifest.ref_lookup.find(
name, package, version, self.manifest
)
elif statically_parsed["sources"]:
input_package_name, input_source_name = statically_parsed["sources"][0]
original_input_node = self.manifest.source_lookup.find(

Check warning on line 185 in core/dbt/parser/unit_tests.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/parser/unit_tests.py#L183-L185

Added lines #L183 - L185 were not covered by tests
input_source_name, input_package_name, self.manifest
)
else:
raise InvalidUnitTestGivenInput(input=input)

Check warning on line 189 in core/dbt/parser/unit_tests.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/parser/unit_tests.py#L189

Added line #L189 was not covered by tests

return original_input_node

Expand Down
74 changes: 74 additions & 0 deletions tests/functional/unit_testing/test_unit_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,77 @@ def test_basic(self, project):
)
with pytest.raises(ParsingError):
results = run_dbt(["unit-test", "--select", "my_model"], expect_pass=False)


event_sql = """
select DATE '2020-01-01' as event_time, 1 as event
union all
select DATE '2020-01-02' as event_time, 2 as event
union all
select DATE '2020-01-03' as event_time, 3 as event
"""

my_incremental_model_sql = """
{{
config(
materialized='incremental'
)
}}

select * from {{ ref('events') }}
{% if is_incremental() %}
where event_time > (select max(event_time) from {{ this }})
{% endif %}
"""

test_my_model_incremental_yml = """
unit:
- model: my_incremental_model
tests:
- name: incremental_false
overrides:
macros:
is_incremental: false
given:
- input: ref('events')
rows:
- {event_time: "2020-01-01", event: 1}
expect:
rows:
- {event_time: "2020-01-01", event: 1}
- name: incremental_true
overrides:
macros:
is_incremental: true
given:
- input: ref('events')
rows:
- {event_time: "2020-01-01", event: 1}
- {event_time: "2020-01-02", event: 2}
- {event_time: "2020-01-03", event: 3}
- input: this
rows:
- {event_time: "2020-01-01", event: 1}
expect:
rows:
- {event_time: "2020-01-02", event: 2}
- {event_time: "2020-01-03", event: 3}
"""


class TestUnitTestIncrementalModel:
    """Functional check that unit tests can target an incremental model."""

    @pytest.fixture(scope="class")
    def models(self):
        """Map project file names to their contents for this test class."""
        project_files = {}
        project_files["my_incremental_model.sql"] = my_incremental_model_sql
        project_files["events.sql"] = event_sql
        project_files["test_my_incremental_model.yml"] = test_my_model_incremental_yml
        return project_files

    def test_basic(self, project):
        # Build the models first; the run is expected to report two results.
        run_results = run_dbt(["run"])
        assert len(run_results) == 2

        # Select by model name
        unit_test_args = ["unit-test", "--select", "my_incremental_model"]
        unit_test_results = run_dbt(unit_test_args, expect_pass=True)
        assert len(unit_test_results) == 2