From c21f8c999004f2f850af8015ed837451f1d7eeb8 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:57:47 +0530 Subject: [PATCH] Fix dag rendering for taskflow + DbtTaskGroup combo (#1360) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Description I believe the deepcopy results in two separate DAG objects—the original object and its deep copy—and the task ends up referencing both of them, which causes the following failure: ``` airflow.exceptions.AirflowException: Tried to set relationships between tasks in more than one DAG: {, } ``` Screenshot 2024-12-03 at 2 30 23 AM DAG Code ```python from datetime import datetime from airflow.decorators import task, dag from cosmos import DbtTaskGroup, ProjectConfig from include.constants import jaffle_shop_path, venv_execution_config, manifest_path from include.profiles import airflow_db @task(task_id="build_partial_dbt_env_vars_operator") def build_partial_dbt_env(): # some code # This return is for demonstration purposes only return { "ENV_VAR_NAME": "value", "ENV_VAR_NAME_2": False } # partial_dbt_env = build_partial_dbt_env() @dag( schedule_interval="@daily", start_date=datetime(2023, 1, 1), catchup=False, tags=["simple"], ) def simple_task_group1() -> None: DbtTaskGroup( group_id="transform_task_group", project_config=ProjectConfig( dbt_project_path=jaffle_shop_path, manifest_path=manifest_path, env_vars=build_partial_dbt_env() ), profile_config=airflow_db, execution_config=venv_execution_config, ) simple_task_group1() #partial_dbt_env >> transform_task_group ``` ## Related Issue(s) closes: https://github.com/astronomer/astronomer-cosmos/issues/1218 ## Breaking Change? 
## Checklist - [ ] I have made corresponding changes to the documentation (if required) - [ ] I have added tests that prove my fix is effective or that my feature works --- cosmos/converter.py | 4 +-- dev/dags/example_taskflow.py | 48 ++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) create mode 100644 dev/dags/example_taskflow.py diff --git a/cosmos/converter.py b/cosmos/converter.py index 5bf99cac8..524864aa9 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -229,8 +229,8 @@ def __init__( validate_changed_config_paths(execution_config, project_config, render_config) - env_vars = copy.deepcopy(project_config.env_vars or operator_args.get("env")) - dbt_vars = copy.deepcopy(project_config.dbt_vars or operator_args.get("vars")) + env_vars = project_config.env_vars or operator_args.get("env") + dbt_vars = project_config.dbt_vars or operator_args.get("vars") if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: logger.warning( diff --git a/dev/dags/example_taskflow.py b/dev/dags/example_taskflow.py new file mode 100644 index 000000000..e75be2adc --- /dev/null +++ b/dev/dags/example_taskflow.py @@ -0,0 +1,48 @@ +import os +from datetime import datetime +from pathlib import Path + +from airflow.decorators import dag, task + +from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" +DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) + + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + + +@task(task_id="build_partial_dbt_env_vars_operator") +def build_partial_dbt_env(): + return {"ENV_VAR_NAME": "value", "ENV_VAR_NAME_2": False} + + 
+@dag( + schedule_interval="@daily", + start_date=datetime(2024, 1, 1), + catchup=False, +) +def example_taskflow() -> None: + DbtTaskGroup( + group_id="transform_task_group", + project_config=ProjectConfig( + dbt_project_path=DBT_ROOT_PATH / "jaffle_shop", + manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json", + env_vars=build_partial_dbt_env(), + ), + profile_config=profile_config, + operator_args={"install_deps": True}, + ) + + +example_taskflow()