Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed Dec 19, 2024
1 parent 6a54e31 commit 8ba587d
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 6 deletions.
12 changes: 6 additions & 6 deletions cosmos/dbt/parser/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ def parse_number_of_warnings_dbt_runner(result: dbtRunnerResult) -> int:
def extract_freshness_warn_msg(result: FullOutputSubprocessResult) -> Tuple[List[str], List[str]]:
log_list = result.full_output

test_names = []
test_results = []
node_names = []
node_results = []

for line in log_list:

if DBT_FRESHNESS_WARN_MSG in line:
test_name = line.split(DBT_FRESHNESS_WARN_MSG)[1].split(" ")[1]
test_names.append(test_name)
test_results.append(line)
node_name = line.split(DBT_FRESHNESS_WARN_MSG)[1].split(" ")[1]
node_names.append(node_name)
node_results.append(line)

return test_names, test_results
return node_names, node_results


def extract_log_issues(log_list: List[str]) -> Tuple[List[str], List[str]]:
Expand Down
1 change: 1 addition & 0 deletions dev/dags/example_source_rendering.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@
catchup=False,
dag_id="source_rendering_dag",
default_args={"retries": 2},
on_warning_callback=lambda context: print(context),
)
22 changes: 22 additions & 0 deletions tests/dbt/parser/test_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

from cosmos.dbt.parser.output import (
extract_dbt_runner_issues,
extract_freshness_warn_msg,
extract_log_issues,
parse_number_of_warnings_dbt_runner,
parse_number_of_warnings_subprocess,
)
from cosmos.hooks.subprocess import FullOutputSubprocessResult


@pytest.mark.parametrize(
Expand Down Expand Up @@ -112,3 +114,23 @@ def test_extract_dbt_runner_issues_with_status_levels():

assert node_names == ["node1", "node2"]
assert node_results == ["An error message", "A failure message"]


def test_extract_freshness_warn_msg():
result = FullOutputSubprocessResult(
full_output=[
"Info: some other log message",
"INFO - 11:50:42 1 of 1 WARN freshness of postgres_db.raw_orders ................................ [WARN in 0.01s]",
"INFO - 11:50:42",
"INFO - 11:50:42 Finished running 1 source in 0 hours 0 minutes and 0.04 seconds (0.04s).",
"INFO - 11:50:42 Done.",
],
output="INFO - 11:50:42 Done.",
exit_code=0,
)
node_names, node_results = extract_freshness_warn_msg(result)

assert node_names == ["postgres_db.raw_orders"]
assert node_results == [
"INFO - 11:50:42 1 of 1 WARN freshness of postgres_db.raw_orders ................................ [WARN in 0.01s]"
]
32 changes: 32 additions & 0 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,38 @@ def test_store_freshness_not_store_compiled_sql(mock_context, mock_session):
assert instance.freshness == ""


@pytest.mark.parametrize(
"invocation_mode, expected_extract_function",
[
(InvocationMode.SUBPROCESS, "extract_freshness_warn_msg"),
(InvocationMode.DBT_RUNNER, "extract_dbt_runner_issues"),
],
)
def test_handle_warnings(invocation_mode, expected_extract_function, mock_context):
result = MagicMock()

instance = DbtSourceLocalOperator(
task_id="test",
profile_config=None,
project_dir="my/dir",
on_warning_callback=lambda context: print(context),
invocation_mode=invocation_mode,
)

with patch(f"cosmos.operators.local.{expected_extract_function}") as mock_extract_issues, patch.object(
instance, "on_warning_callback"
) as mock_on_warning_callback:
mock_extract_issues.return_value = (["test_name1", "test_name2"], ["test_name1", "test_name2"])

instance._handle_warnings(result, mock_context)

mock_extract_issues.assert_called_once_with(result)

mock_on_warning_callback.assert_called_once_with(
{**mock_context, "test_names": ["test_name1", "test_name2"], "test_results": ["test_name1", "test_name2"]}
)


def test_dbt_compile_local_operator_initialisation():
operator = DbtCompileLocalOperator(
task_id="fake-task",
Expand Down

0 comments on commit 8ba587d

Please sign in to comment.