Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Tried to set relationships between tasks in more than one DAG #1218

Closed
1 task
davidmorosini opened this issue Sep 23, 2024 · 8 comments · Fixed by #1360
Closed
1 task

[Bug] Tried to set relationships between tasks in more than one DAG #1218

davidmorosini opened this issue Sep 23, 2024 · 8 comments · Fixed by #1360
Assignees
Labels
area:config Related to configuration, like YAML files, environment variables, or executer configuration area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc bug Something isn't working do-not-stale Related to stale job and dosubot execution:virtualenv Related to Virtualenv execution environment triage-needed Items need to be reviewed / assigned to milestone
Milestone

Comments

@davidmorosini
Copy link

davidmorosini commented Sep 23, 2024

Astronomer Cosmos Version

Other Astronomer Cosmos version (please specify below)

If "Other Astronomer Cosmos version" selected, which one?

1.6.0

dbt-core version

1.7.16

Versions of dbt adapters

dbt-databricks==1.7.16

LoadMode

DBT_LS_MANIFEST

ExecutionMode

VIRTUALENV

InvocationMode

SUBPROCESS

airflow version

2.8.2

Operating System

Debian GNU/Linux 12 (bookworm) (Local using docker)

If a you think it's an UI issue, what browsers are you seeing the problem on?

No response

Deployment

Other

Deployment details

No response

What happened?

Hello everyone,

We are trying to update the Cosmos version to 1.6.0 and have encountered a problem. We would like to request your help in understanding what might be happening.

Before describing the issue, please consider the code snippet below. The object partial_dbt_env is an instance of <class 'airflow.models.xcom_arg.PlainXComArg'>, similar to the example: {{ task_instance.xcom_pull(task_ids='TASK_ID', dag_id='DAG_NAME', key='return_value') }}.

from cosmos import DbtTaskGroup, ProjectConfig

project_config = ProjectConfig(
    dbt_project_path=<DBT PROJECT PATH>,
    manifest_path=<MANIFEST PATH>,
    env_vars=partial_dbt_env
)

transform_task_group = DbtTaskGroup(
    group_id=<GROUP ID>,
    project_config=project_config,
    operator_args=<OPERATOR ARGS>,
    profile_config=<PROFILE CONFIG>,
    execution_config=<EXECUTION CONFIG>,
    render_config=<RENDER CONFIG>
)

When attempting to update from version 1.5.1 to version 1.6.0 using the code above, we encountered a significant issue. The exception raised is:

airflow.exceptions.AirflowException: Tried to set relationships between tasks in more than one DAG: {<DAG: DAG_NAME>, <DAG: DAG_NAME>}.

After investigating, we noticed that the way some assignments are made in the cosmos/converter.py code has changed. For example:

  • In version 1.5.1 (see here), the objects env_vars and dbt_vars were assigned by reference.
  • In version 1.6.0 (see here), these same objects are assigned by copy (using deepcopy).

This change means that Cosmos uses both the original objects and the copies created internally. Since these objects carry a reference to the DAG they belong to, the duplication results in two distinct objects, each referencing a DAG, which is identified by the Airflow code when performing a set of all objects with the DAG identification. Previously, with reference passing, the result was the same object. Now, with the copy, the set operation performed by Airflow considers them as distinct objects, leading to the exception.

Note: If the env_vars attribute of the ProjectConfig object is removed, the issue is immediately resolved. However, this leads to other problems, are need the value from partial_dbt_env because it is the result of a task, and thus it is a templated value.
Note: Up to version 1.5.1, the code works without issues; this problem occurs only in version 1.6.0.

We would like to understand the reason for this change in how these variable assignments were made (so that we can adopt the best practices established by you) and whether there is a way to work around this issue in version 1.6.0.

Thank you in advance for your attention and assistance.

Relevant log output

Traceback (most recent call last):
  File "/opt/airflow/dags/core_operators/dbt_run.py", line 109, in dbt_run
    transform_task_group = DbtTaskGroup(
  File "/home/airflow/.local/lib/python3.10/site-packages/cosmos/airflow/task_group.py", line 28, in __init__
    DbtToAirflowConverter.__init__(self, *args, **specific_kwargs(**kwargs))
  File "/home/airflow/.local/lib/python3.10/site-packages/cosmos/converter.py", line 295, in __init__
    build_airflow_graph(
  File "/home/airflow/.local/lib/python3.10/site-packages/cosmos/airflow/graph.py", line 303, in build_airflow_graph
    task_or_group = conversion_function(  # type: ignore
  File "/home/airflow/.local/lib/python3.10/site-packages/cosmos/airflow/graph.py", line 251, in generate_task_or_group
    task_or_group = create_airflow_task(task_meta, dag, task_group=task_group)
  File "/home/airflow/.local/lib/python3.10/site-packages/cosmos/core/airflow.py", line 33, in get_airflow_task
    airflow_task = Operator(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 445, in apply_defaults
    self.set_xcomargs_dependencies()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1159, in set_xcomargs_dependencies
    XComArg.apply_upstream_relationship(self, arg)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom_arg.py", line 129, in apply_upstream_relationship
    op.set_upstream(operator)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 270, in set_upstream
    self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 228, in _set_relatives
    raise AirflowException(f"Tried to set relationships between tasks in more than one DAG: {dags}")
airflow.exceptions.AirflowException: Tried to set relationships between tasks in more than one DAG: {<DAG: DAG_NAME>, <DAG: DAG_NAME>}

How to reproduce

Assemble the provided code by filling in the values according to the runtime we use. The key point to reproducing this error is using the XCom return from a Python operator as the dictionary for partial_dbt_env.

from airflow.decorators import task
from cosmos import DbtTaskGroup, ProjectConfig


@task(task_id="build_partial_dbt_env_vars_operator")
def build_partial_dbt_env() -> Dict[str, str | bool]:
    # some code
    # This return is for demonstration purposes only
    return {
        "ENV_VAR_NAME": "value",
        "ENV_VAR_NAME_2": False
    }
  

partial_dbt_env = build_partial_dbt_env()

project_config = ProjectConfig(
    dbt_project_path=<DBT PROJECT PATH>,
    manifest_path=<MANIFEST PATH>,
    env_vars=partial_dbt_env
)

transform_task_group = DbtTaskGroup(
    group_id=<GROUP ID>,
    project_config=project_config,
    operator_args=<OPERATOR ARGS>,
    profile_config=<PROFILE CONFIG>,
    execution_config=<EXECUTION CONFIG>,
    render_config=<RENDER CONFIG>
)

partial_dbt_env >> transform_task_group

Anything else :)?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
@davidmorosini davidmorosini added bug Something isn't working triage-needed Items need to be reviewed / assigned to milestone labels Sep 23, 2024
Copy link

dosubot bot commented Sep 23, 2024

Hey @davidmorosini! 👋 I'm here to help you with your issue.

Updating Astronomer Cosmos from version 1.5.1 to 1.6.0 introduces stricter validation checks to ensure that tasks are not shared between multiple DAGs or TaskGroups. This is likely causing the AirflowException you are encountering. The deepcopy function is used in the cosmos/converter.py file to create independent copies of configuration objects, which can lead to the issue of tasks being associated with more than one DAG [1].

To resolve this issue, ensure that each task is only associated with a single DAG or TaskGroup. Carefully review your DAG and TaskGroup configurations to ensure that tasks are not reused across different DAGs or TaskGroups. Here is a snippet from the build_airflow_graph function that highlights the validation logic:

def build_airflow_graph(
    nodes: dict[str, DbtNode],
    dag: DAG,  # Airflow-specific - parent DAG where to associate tasks and (optional) task groups
    execution_mode: ExecutionMode,  # Cosmos-specific - decide what which class to use
    task_args: dict[str, Any],  # Cosmos/DBT - used to instantiate tasks
    test_indirect_selection: TestIndirectSelection,  # Cosmos/DBT - used to set test indirect selection mode
    dbt_project_name: str,  # DBT / Cosmos - used to name test task if mode is after_all,
    render_config: RenderConfig,
    task_group: TaskGroup | None = None,
    on_warning_callback: Callable[..., Any] | None = None,  # argument specific to the DBT test command
) -> None:
    # ...
    for node_id, node in nodes.items():
        conversion_function = node_converters.get(node.resource_type, generate_task_or_group)
        task_or_group = conversion_function(
            dag=dag,
            task_group=task_group,
            dbt_project_name=dbt_project_name,
            execution_mode=execution_mode,
            task_args=task_args,
            test_behavior=test_behavior,
            source_rendering_behavior=source_rendering_behavior,
            test_indirect_selection=test_indirect_selection,
            on_warning_callback=on_warning_callback,
            node=node,
        )
        if task_or_group is not None:
            tasks_map[node_id] = task_or_group

    create_airflow_task_dependencies(nodes, tasks_map)

Ensure that the configurations for project_config, render_config, and execution_config are set correctly and that tasks are not being reused across multiple DAGs or TaskGroups [2].

To continue talking to Dosu, mention @dosu.

@dosubot dosubot bot added area:config Related to configuration, like YAML files, environment variables, or executer configuration area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc execution:virtualenv Related to Virtualenv execution environment labels Sep 23, 2024
Copy link

This issue is stale because it has been open for 30 days with no activity.

@github-actions github-actions bot added the stale Issue has not had recent activity or appears to be solved. Stale issues will be automatically closed label Oct 30, 2024
@dosubot dosubot bot closed this as not planned Won't fix, can't repro, duplicate, stale Nov 6, 2024
@dosubot dosubot bot removed the stale Issue has not had recent activity or appears to be solved. Stale issues will be automatically closed label Nov 6, 2024
@davidmorosini
Copy link
Author

Hello @tatiana I hope you're doing well. I noticed that the issue has been closed. Would you have any guidance on this matter? Thank you in advance!

@tatiana tatiana reopened this Nov 13, 2024
@tatiana
Copy link
Collaborator

tatiana commented Nov 13, 2024

@davidmorosini, I'm sorry that this issue was closed. I contacted the DosuBot team and requested that they disable this feature of closing issues.

I'm sorry we didn't reply to you beforehand. I'm adding this issue temptingly to our next sprint so that we can give you the deserved attention.

The behaviour of Cosmos changing values in place in Cosmos 1.5 was unintentional and led to other bugs. This is why it was removed. We'll take time to understand your use case and make sure we can properly support you in the newer versions of Cosmos.

@tatiana tatiana added this to the Cosmos 1.8.0 milestone Nov 13, 2024
@davidmorosini
Copy link
Author

Hello, thank you very much for your response, don't worry about the bot. I asked more to get some guidance on the scenario. We are trying to work around the issue here so we can migrate to the latest versions of Cosmos, however, this feature of retrieving values at runtime is quite important for our case. Thank you again for your reply, and I’m available if needed.

@davidmorosini
Copy link
Author

Thank you very much, @tatiana and @pankajastro, for your support! As soon as the release is available, we will begin the migration testing. Thank you again! =)

@tatiana
Copy link
Collaborator

tatiana commented Dec 20, 2024

@davidmorosini Cosmos 1.8.0 was released!

@davidmorosini
Copy link
Author

Great news, @tatiana! I will schedule the tests here and post in this thread confirming that everything worked out, so that if others encounter the same issue and happen to see this thread, they will have the feedback as well. =)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:config Related to configuration, like YAML files, environment variables, or executer configuration area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc bug Something isn't working do-not-stale Related to stale job and dosubot execution:virtualenv Related to Virtualenv execution environment triage-needed Items need to be reviewed / assigned to milestone
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants