Skip to content

Commit

Permalink
Store compiled_sql even when task fails (fixes issue #369) (#671)
Browse files Browse the repository at this point in the history
Update `DbtLocalBaseOperator` code to store `compiled_sql` prior to
exception handling so that when a task fails, the `compiled_sql` can still
be reviewed.

In the process found and fixed a related bug where `compiled_sql` was
being dropped on some operations due to the way that the `full_refresh`
field was being added to the `template_fields`.

Closes #369

Fixes bug introduced in
#623 where
compiled_sql was being lost in `DbtSeedLocalOperator` and
`DbtRunLocalOperator`

Co-authored-by: Andrew Greenburg <agreenburg@vergeventures.net>
  • Loading branch information
agreenburg and Andrew Greenburg authored Nov 14, 2023
1 parent 5d23758 commit ee91ece
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
6 changes: 3 additions & 3 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ def run_command(
logger.info("Outlets: %s", outlets)
self.register_dataset(inlets, outlets)

self.exception_handling(result)
self.store_compiled_sql(tmp_project_dir, context)
self.exception_handling(result)
if self.callback:
self.callback(tmp_project_dir)

Expand Down Expand Up @@ -398,7 +398,7 @@ class DbtSeedLocalOperator(DbtLocalBaseOperator):

ui_color = "#F58D7E"

template_fields: Sequence[str] = DbtBaseOperator.template_fields + ("full_refresh",) # type: ignore[operator]
template_fields: Sequence[str] = DbtLocalBaseOperator.template_fields + ("full_refresh",) # type: ignore[operator]

def __init__(self, full_refresh: bool = False, **kwargs: Any) -> None:
self.full_refresh = full_refresh
Expand Down Expand Up @@ -437,7 +437,7 @@ class DbtRunLocalOperator(DbtLocalBaseOperator):

ui_color = "#7352BA"
ui_fgcolor = "#F4F2FC"
template_fields: Sequence[str] = DbtBaseOperator.template_fields + ("full_refresh",) # type: ignore[operator]
template_fields: Sequence[str] = DbtLocalBaseOperator.template_fields + ("full_refresh",) # type: ignore[operator]

def __init__(self, full_refresh: bool = False, **kwargs: Any) -> None:
self.full_refresh = full_refresh
Expand Down
4 changes: 2 additions & 2 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,8 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo
@pytest.mark.parametrize(
"operator_class,expected_template",
[
(DbtSeedLocalOperator, ("env", "vars", "full_refresh")),
(DbtRunLocalOperator, ("env", "vars", "full_refresh")),
(DbtSeedLocalOperator, ("env", "vars", "compiled_sql", "full_refresh")),
(DbtRunLocalOperator, ("env", "vars", "compiled_sql", "full_refresh")),
],
)
def test_dbt_base_operator_template_fields(operator_class, expected_template):
Expand Down

0 comments on commit ee91ece

Please sign in to comment.