Skip to content

Commit

Permalink
Fix invalid argument (full_refresh) passed to DbtTestAwsEksOperator…
Browse files Browse the repository at this point in the history
… (and others) (#1175)

#590 added a fix to
consume the kwargs `full_refresh_ignore` if it wasn't consumed by a
higher class as it was preventing the use of test in the DbtTaskGroup if
`full_refresh_ignore` was set. The previous patch fixed this by
consuming the variable for the `DbtLocalBaseOperator`, leaving a bug in
kubernetes and docker operator. Since `AbstractDbtBaseOperator` has been
added as a base of `DbtDockerBaseOperator`, `DbtKubernetesBaseOperator`
and `DbtLocalBaseOperator`, moving the code there will fix all three.

Fixes #1062

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
  • Loading branch information
3 people authored Sep 21, 2024
1 parent 73c088a commit 11de5ba
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 1 deletion.
1 change: 1 addition & 0 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def __init__(
self.dbt_cmd_global_flags = dbt_cmd_global_flags or []
self.cache_dir = cache_dir
self.extra_context = extra_context or {}
kwargs.pop("full_refresh", None) # usage of this param should be implemented in child classes
super().__init__(**kwargs)

def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]:
Expand Down
1 change: 0 additions & 1 deletion cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ def __init__(
self._dbt_runner: dbtRunner | None = None
if self.invocation_mode:
self._set_invocation_methods()
kwargs.pop("full_refresh", None) # usage of this param should be implemented in child classes
super().__init__(**kwargs)

# For local execution mode, we're consistent with the LoadMode.DBT_LS command in forwarding the environment
Expand Down
65 changes: 65 additions & 0 deletions tests/operators/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,68 @@ def test_created_pod():
]
assert container.args == expected_container_args
assert container.command == []


@pytest.mark.parametrize(
"operator_class,kwargs,expected_cmd",
[
(
DbtSeedKubernetesOperator,
{"full_refresh": True},
["dbt", "seed", "--full-refresh", "--project-dir", "my/dir"],
),
(
DbtBuildKubernetesOperator,
{"full_refresh": True},
["dbt", "build", "--full-refresh", "--project-dir", "my/dir"],
),
(
DbtRunKubernetesOperator,
{"full_refresh": True},
["dbt", "run", "--full-refresh", "--project-dir", "my/dir"],
),
(
DbtTestKubernetesOperator,
{},
["dbt", "test", "--project-dir", "my/dir"],
),
(
DbtTestKubernetesOperator,
{"select": []},
["dbt", "test", "--project-dir", "my/dir"],
),
(
DbtTestKubernetesOperator,
{"full_refresh": True, "select": ["tag:daily"], "exclude": ["tag:disabled"]},
["dbt", "test", "--select", "tag:daily", "--exclude", "tag:disabled", "--project-dir", "my/dir"],
),
(
DbtTestKubernetesOperator,
{"full_refresh": True, "selector": "nightly_snowplow"},
["dbt", "test", "--selector", "nightly_snowplow", "--project-dir", "my/dir"],
),
],
)
def test_operator_execute_with_flags(operator_class, kwargs, expected_cmd):
task = operator_class(
task_id="my-task",
project_dir="my/dir",
**kwargs,
)

with patch(
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.hook",
is_in_cluster=False,
), patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup"), patch(
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.get_or_create_pod",
side_effect=ValueError("Mock"),
) as get_or_create_pod:
try:
task.execute(context={})
except ValueError as e:
if e != get_or_create_pod.side_effect:
raise

pod_args = get_or_create_pod.call_args.kwargs["pod_request_obj"].to_dict()["spec"]["containers"][0]["args"]

assert expected_cmd == pod_args

0 comments on commit 11de5ba

Please sign in to comment.