From 782dba1c84d3c720f130d332c02fb75a6d38c25d Mon Sep 17 00:00:00 2001 From: David Spulak Date: Sat, 2 Sep 2023 22:18:59 +0200 Subject: [PATCH 1/2] Add profile_config arg to DbtKubernetesBaseOperator --- cosmos/operators/kubernetes.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index 75ea001c4..0d71252fa 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -7,6 +7,7 @@ from airflow.utils.context import Context from cosmos.log import get_logger +from cosmos.config import ProfileConfig from cosmos.operators.base import DbtBaseOperator @@ -35,7 +36,8 @@ class DbtKubernetesBaseOperator(KubernetesPodOperator, DbtBaseOperator): # type intercept_flag = False - def __init__(self, **kwargs: Any) -> None: + def __init__(self, profile_config: ProfileConfig, **kwargs: Any) -> None: + self.profile_config = profile_config super().__init__(**kwargs) def build_env_args(self, env: dict[str, str | bytes | PathLike[Any]]) -> None: @@ -57,6 +59,16 @@ def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None) # to add that in the future self.dbt_executable_path = "dbt" dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags) + + # Parse ProfileConfig and add additional arguments to the dbt_cmd + if self.profile_config.profiles_yml_filepath: + with self.profile_config.ensure_profile() as (profile_path, _): + dbt_cmd.extend(["--profiles-dir", str(profile_path.parent)]) + if self.profile_config.profile_name: + dbt_cmd.extend(["--profile", self.profile_config.profile_name]) + if self.profile_config.target_name: + dbt_cmd.extend(["--target", self.profile_config.target_name]) + # set env vars self.build_env_args(env_vars) self.arguments = dbt_cmd From 1ca3fd09375f936263d4f4283e8e82c6745e56cd Mon Sep 17 00:00:00 2001 From: David Spulak Date: Sat, 2 Sep 2023 23:19:46 +0200 Subject: [PATCH 2/2] Remove --profile_dir & updates to pass pre_commit --- cosmos/operators/kubernetes.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index 0d71252fa..8b38d4f16 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -36,7 +36,7 @@ class DbtKubernetesBaseOperator(KubernetesPodOperator, DbtBaseOperator): # type intercept_flag = False - def __init__(self, profile_config: ProfileConfig, **kwargs: Any) -> None: + def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) -> None: self.profile_config = profile_config super().__init__(**kwargs) @@ -59,16 +59,14 @@ def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None) # to add that in the future self.dbt_executable_path = "dbt" dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags) - + # Parse ProfileConfig and add additional arguments to the dbt_cmd - if self.profile_config.profiles_yml_filepath: - with self.profile_config.ensure_profile() as (profile_path, _): - dbt_cmd.extend(["--profiles-dir", str(profile_path.parent)]) - if self.profile_config.profile_name: - dbt_cmd.extend(["--profile", self.profile_config.profile_name]) - if self.profile_config.target_name: - dbt_cmd.extend(["--target", self.profile_config.target_name]) - + if self.profile_config: + if self.profile_config.profile_name: + dbt_cmd.extend(["--profile", self.profile_config.profile_name]) + if self.profile_config.target_name: + dbt_cmd.extend(["--target", self.profile_config.target_name]) + # set env vars self.build_env_args(env_vars) self.arguments = dbt_cmd @@ -81,7 +79,7 @@ class DbtLSKubernetesOperator(DbtKubernetesBaseOperator): ui_color = "#DBCDF6" - def __init__(self, **kwargs: str) -> None: + def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.base_cmd = ["ls"] @@ -98,7 +96,7 @@ class DbtSeedKubernetesOperator(DbtKubernetesBaseOperator): ui_color = "#F58D7E" - def __init__(self, full_refresh: bool = False, **kwargs: str) -> None: + def __init__(self, full_refresh: bool = False, **kwargs: Any) -> None: self.full_refresh = full_refresh super().__init__(**kwargs) self.base_cmd = ["seed"] @@ -123,7 +121,7 @@ class DbtSnapshotKubernetesOperator(DbtKubernetesBaseOperator): ui_color = "#964B00" - def __init__(self, **kwargs: str) -> None: + def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.base_cmd = ["snapshot"] @@ -139,7 +137,7 @@ class DbtRunKubernetesOperator(DbtKubernetesBaseOperator): ui_color = "#7352BA" ui_fgcolor = "#F4F2FC" - def __init__(self, **kwargs: str) -> None: + def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.base_cmd = ["run"] @@ -154,7 +152,7 @@ class DbtTestKubernetesOperator(DbtKubernetesBaseOperator): ui_color = "#8194E0" - def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: str) -> None: + def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None: super().__init__(**kwargs) self.base_cmd = ["test"] # as of now, on_warning_callback in kubernetes executor does nothing @@ -176,7 +174,7 @@ class DbtRunOperationKubernetesOperator(DbtKubernetesBaseOperator): ui_color = "#8194E0" template_fields: Sequence[str] = ("args",) - def __init__(self, macro_name: str, args: dict[str, Any] | None = None, **kwargs: str) -> None: + def __init__(self, macro_name: str, args: dict[str, Any] | None = None, **kwargs: Any) -> None: self.macro_name = macro_name self.args = args super().__init__(**kwargs)