diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py
index b1dcb8747..caf992b78 100644
--- a/cosmos/airflow/graph.py
+++ b/cosmos/airflow/graph.py
@@ -172,7 +172,6 @@ def build_airflow_graph(
and test_behavior == TestBehavior.AFTER_EACH
and node.has_test is True
)
-
task_meta = create_task_metadata(
node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group
)
diff --git a/cosmos/converter.py b/cosmos/converter.py
index 05b523a1d..c97d9274e 100644
--- a/cosmos/converter.py
+++ b/cosmos/converter.py
@@ -154,7 +154,6 @@ def __init__(
"profile_config": profile_config,
"emit_datasets": emit_datasets,
}
-
if dbt_executable_path:
task_args["dbt_executable_path"] = dbt_executable_path
diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py
index b61dbd5fb..d43a2d241 100644
--- a/cosmos/operators/base.py
+++ b/cosmos/operators/base.py
@@ -33,6 +33,7 @@ class DbtBaseOperator(BaseOperator):
:param cache_selected_only:
:param no_version_check: dbt optional argument - If set, skip ensuring dbt's version matches the one specified in
the dbt_project.yml file ('require-dbt-version')
+ :param emit_datasets: Enable emitting inlets and outlets during task execution
:param fail_fast: dbt optional argument to make dbt exit immediately if a single resource fails to build.
:param quiet: dbt optional argument to show only error logs in stdout
:param warn_error: dbt optional argument to convert dbt warnings into errors
@@ -87,6 +88,7 @@ def __init__(
selector: str | None = None,
vars: dict[str, str] | None = None,
models: str | None = None,
+ emit_datasets: bool = True,
cache_selected_only: bool = False,
no_version_check: bool = False,
fail_fast: bool = False,
@@ -112,6 +114,7 @@ def __init__(
self.selector = selector
self.vars = vars
self.models = models
+ self.emit_datasets = emit_datasets
self.cache_selected_only = cache_selected_only
self.no_version_check = no_version_check
self.fail_fast = fail_fast
diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py
index 38ca47452..fd4a6fe7b 100644
--- a/cosmos/operators/kubernetes.py
+++ b/cosmos/operators/kubernetes.py
@@ -70,6 +70,9 @@ def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None)
if self.profile_config.target_name:
dbt_cmd.extend(["--target", self.profile_config.target_name])
+ if self.project_dir:
+ dbt_cmd.extend(["--project-dir", str(self.project_dir)])
+
# set env vars
self.build_env_args(env_vars)
self.arguments = dbt_cmd
diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py
index 4888583bb..aaad4e259 100644
--- a/cosmos/operators/local.py
+++ b/cosmos/operators/local.py
@@ -82,6 +82,7 @@ class DbtLocalBaseOperator(DbtBaseOperator):
:param profile_name: A name to use for the dbt profile. If not provided, and no profile target is found
in your project's dbt_project.yml, "cosmos_profile" is used.
:param install_deps: If true, install dependencies before running the command
+    :param emit_datasets: If true, the operator will set inlets and outlets
:param callback: A callback function called on after a dbt run with a path to the dbt project directory.
:param target_name: A name to use for the dbt target. If not provided, and no target is found
in your project's dbt_project.yml, "cosmos_target" is used.
@@ -99,7 +100,6 @@ def __init__(
install_deps: bool = False,
callback: Callable[[str], None] | None = None,
should_store_compiled_sql: bool = True,
- emit_datasets: bool = True,
**kwargs: Any,
) -> None:
self.profile_config = profile_config
@@ -107,7 +107,6 @@ def __init__(
self.callback = callback
self.compiled_sql = ""
self.should_store_compiled_sql = should_store_compiled_sql
- self.emit_datasets = emit_datasets
self.openlineage_events_completes: list[RunEvent] = []
super().__init__(**kwargs)
diff --git a/docs/_static/jaffle_shop_k8s_dag_run.png b/docs/_static/jaffle_shop_k8s_dag_run.png
index f2ee1a5e2..e41db8b81 100644
Binary files a/docs/_static/jaffle_shop_k8s_dag_run.png and b/docs/_static/jaffle_shop_k8s_dag_run.png differ
diff --git a/docs/getting_started/kubernetes.rst b/docs/getting_started/kubernetes.rst
index 9a500a047..6d2368997 100644
--- a/docs/getting_started/kubernetes.rst
+++ b/docs/getting_started/kubernetes.rst
@@ -30,20 +30,27 @@ For instance,
.. code-block:: text
- DbtTaskGroup(
- ...
+ run_models = DbtTaskGroup(
+ profile_config=ProfileConfig(
+ profile_name="postgres_profile",
+ target_name="dev",
+ profile_mapping=PostgresUserPasswordProfileMapping(
+ conn_id="postgres_default",
+ profile_args={
+ "schema": "public",
+ },
+ ),
+ ),
+ project_config=ProjectConfig(PROJECT_DIR),
+ execution_config=ExecutionConfig(
+ execution_mode=ExecutionMode.KUBERNETES,
+ ),
operator_args={
- "queue": "kubernetes",
- "image": "dbt-jaffle-shop:1.0.0",
- "image_pull_policy": "Always",
+ "image": DBT_IMAGE,
"get_logs": True,
"is_delete_operator_pod": False,
- "namespace": "default",
- "env_vars": {
- ...
- },
+ "secrets": [postgres_password_secret, postgres_host_secret],
},
- execution_mode="kubernetes",
)
Step-by-step instructions
@@ -53,7 +60,7 @@ Using installed `Kind `_, you can setup a local kuber
.. code-block:: bash
- kind cluster create
+ kind create cluster
Deploy a Postgres pod to Kind using `Helm `_
diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py
index 67d3ff137..839a6db54 100644
--- a/tests/operators/test_kubernetes.py
+++ b/tests/operators/test_kubernetes.py
@@ -98,6 +98,8 @@ def test_dbt_kubernetes_build_command():
"end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n"
"start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
"--no-version-check",
+ "--project-dir",
+ "my/dir",
]
@@ -150,6 +152,8 @@ def test_created_pod(test_hook):
"data_interval_start.strftime(''%Y%m%d%H%M%S'') "
"}}'\n",
"--no-version-check",
+ "--project-dir",
+ "my/dir",
],
"command": [],
"env": [{"name": "FOO", "value": "BAR", "value_from": None}],
diff --git a/tests/sample/profiles.yml b/tests/sample/profiles.yml
new file mode 100644
index 000000000..359c1e6eb
--- /dev/null
+++ b/tests/sample/profiles.yml
@@ -0,0 +1,12 @@
+default:
+ target: dev
+ outputs:
+ dev:
+ type: postgres
+ host: "localhost"
+ user: "postgres"
+ password: "postgres"
+ port: 5432
+ dbname: "postgres"
+ schema: "public"
+ threads: 4
diff --git a/tests/test_converter.py b/tests/test_converter.py
index 99d8c32ac..0d321730a 100644
--- a/tests/test_converter.py
+++ b/tests/test_converter.py
@@ -1,8 +1,17 @@
+from pathlib import Path
+
+from unittest.mock import patch
import pytest
+
+from cosmos.converter import DbtToAirflowConverter, validate_arguments
+from cosmos.constants import DbtResourceType, ExecutionMode
+from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
+from cosmos.dbt.graph import DbtNode
from cosmos.exceptions import CosmosValueError
-from cosmos.converter import validate_arguments
+SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml"
+SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/"
@pytest.mark.parametrize("argument_key", ["tags", "paths"])
@@ -16,3 +25,45 @@ def test_validate_arguments_tags(argument_key):
validate_arguments(select, exclude, profile_args, task_args)
expected = f"Can't specify the same {selector_name} in `select` and `exclude`: {{'b'}}"
assert err.value.args[0] == expected
+
+
+parent_seed = DbtNode(
+ name="seed_parent",
+ unique_id="seed_parent",
+ resource_type=DbtResourceType.SEED,
+ depends_on=[],
+ file_path="",
+)
+nodes = {"seed_parent": parent_seed}
+
+
+@pytest.mark.parametrize(
+ "execution_mode,operator_args",
+ [
+ (ExecutionMode.KUBERNETES, {}),
+ # (ExecutionMode.DOCKER, {"image": "sample-image"}),
+ ],
+)
+@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes)
+@patch("cosmos.converter.DbtGraph.load")
+def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, operator_args):
+ """
+ This test will raise exceptions if we are trying to pass incorrect arguments to operator constructors.
+ """
+ project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT)
+ execution_config = ExecutionConfig(execution_mode=execution_mode)
+ render_config = RenderConfig(emit_datasets=True)
+ profile_config = ProfileConfig(
+ profile_name="my_profile_name",
+ target_name="my_target_name",
+ profiles_yml_filepath=SAMPLE_PROFILE_YML,
+ )
+ converter = DbtToAirflowConverter(
+ nodes=nodes,
+ project_config=project_config,
+ profile_config=profile_config,
+ execution_config=execution_config,
+ render_config=render_config,
+ operator_args=operator_args,
+ )
+ assert converter