Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add profile_config arg to DbtKubernetesBaseOperator #505

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions cosmos/operators/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from airflow.utils.context import Context

from cosmos.log import get_logger
from cosmos.config import ProfileConfig
from cosmos.operators.base import DbtBaseOperator


Expand Down Expand Up @@ -35,7 +36,8 @@ class DbtKubernetesBaseOperator(KubernetesPodOperator, DbtBaseOperator): # type

intercept_flag = False

def __init__(self, **kwargs: Any) -> None:
def __init__(self, profile_config: ProfileConfig | None = None, **kwargs: Any) -> None:
self.profile_config = profile_config
super().__init__(**kwargs)

def build_env_args(self, env: dict[str, str | bytes | PathLike[Any]]) -> None:
Expand All @@ -57,6 +59,14 @@ def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None)
# to add that in the future
self.dbt_executable_path = "dbt"
dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags)

# Parse ProfileConfig and add additional arguments to the dbt_cmd
if self.profile_config:
if self.profile_config.profile_name:
dbt_cmd.extend(["--profile", self.profile_config.profile_name])
if self.profile_config.target_name:
dbt_cmd.extend(["--target", self.profile_config.target_name])

# set env vars
self.build_env_args(env_vars)
self.arguments = dbt_cmd
Expand All @@ -69,7 +79,7 @@ class DbtLSKubernetesOperator(DbtKubernetesBaseOperator):

ui_color = "#DBCDF6"

def __init__(self, **kwargs: str) -> None:
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.base_cmd = ["ls"]

Expand All @@ -86,7 +96,7 @@ class DbtSeedKubernetesOperator(DbtKubernetesBaseOperator):

ui_color = "#F58D7E"

def __init__(self, full_refresh: bool = False, **kwargs: str) -> None:
def __init__(self, full_refresh: bool = False, **kwargs: Any) -> None:
self.full_refresh = full_refresh
super().__init__(**kwargs)
self.base_cmd = ["seed"]
Expand All @@ -111,7 +121,7 @@ class DbtSnapshotKubernetesOperator(DbtKubernetesBaseOperator):

ui_color = "#964B00"

def __init__(self, **kwargs: str) -> None:
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.base_cmd = ["snapshot"]

Expand All @@ -127,7 +137,7 @@ class DbtRunKubernetesOperator(DbtKubernetesBaseOperator):
ui_color = "#7352BA"
ui_fgcolor = "#F4F2FC"

def __init__(self, **kwargs: str) -> None:
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.base_cmd = ["run"]

Expand All @@ -142,7 +152,7 @@ class DbtTestKubernetesOperator(DbtKubernetesBaseOperator):

ui_color = "#8194E0"

def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: str) -> None:
def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.base_cmd = ["test"]
# as of now, on_warning_callback in kubernetes executor does nothing
Expand All @@ -164,7 +174,7 @@ class DbtRunOperationKubernetesOperator(DbtKubernetesBaseOperator):
ui_color = "#8194E0"
template_fields: Sequence[str] = ("args",)

def __init__(self, macro_name: str, args: dict[str, Any] | None = None, **kwargs: str) -> None:
def __init__(self, macro_name: str, args: dict[str, Any] | None = None, **kwargs: Any) -> None:
self.macro_name = macro_name
self.args = args
super().__init__(**kwargs)
Expand Down