@ephraimbuddy
Contributor

@ephraimbuddy ephraimbuddy commented Apr 8, 2025

Ensure DAG runs in a versioned bundle use the correct serialized DAG (serdag) version.

Previously, when a DAG had multiple versions due to changes (e.g., a task removed), running DAG runs would sometimes incorrectly use the latest serialized DAG, even if they were started with an older version. This caused tasks that should still run to be incorrectly marked as removed.

The issue was caused by the scheduler not considering bundle_version when retrieving the DAG from the dagbag, defaulting to the latest available version.

This fix updates dagbag.dags to store DAGs keyed by both dag_id and bundle_version. All DAG retrievals in scheduling are now made with the correct version context, ensuring task consistency for running DAG runs.
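
A minimal sketch of the keying change described above, using plain Python rather than Airflow itself. `FakeDag` and `VersionedDagCache` are hypothetical stand-ins invented for illustration; they are not the classes touched by this PR.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FakeDag:
    """Hypothetical stand-in for a serialized DAG; not an Airflow class."""

    dag_id: str
    bundle_version: str | None
    task_ids: list[str] = field(default_factory=list)


class VersionedDagCache:
    """Toy cache keyed by (dag_id, bundle_version) instead of dag_id alone."""

    def __init__(self) -> None:
        self.dags: dict[tuple[str, str | None], FakeDag] = {}

    def add(self, dag: FakeDag) -> None:
        # Two versions of the same dag_id no longer overwrite each other.
        self.dags[(dag.dag_id, dag.bundle_version)] = dag

    def get(self, dag_id: str, bundle_version: str | None = None) -> FakeDag | None:
        return self.dags.get((dag_id, bundle_version))


cache = VersionedDagCache()
cache.add(FakeDag("demo", "v1", ["sleep", "hello", "astronomer"]))
cache.add(FakeDag("demo", "v2", ["sleep", "hello"]))

# A run started on "v1" still sees all three tasks.
old_run_dag = cache.get("demo", "v1")
```

With a plain `dag_id` key, the second `add` call would have replaced the first entry, which is the situation the description attributes the bug to.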

Also introduces a GetDag Protocol for precise typing of the cached DAG getter, now supporting both dag_id and bundle_version parameters.
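
As a rough illustration of what a callable Protocol for such a getter could look like, here is a sketch. Only the name `GetDag` and the two parameter names come from the description; the return type, `_STORE`, and `make_cached_getter` are assumptions made up for this example.

```python
from __future__ import annotations

from functools import lru_cache
from typing import Protocol


class GetDag(Protocol):
    """Callable shape for a cached DAG getter (illustrative signature only)."""

    def __call__(self, dag_id: str, bundle_version: str | None = None) -> object | None: ...


# Hypothetical in-memory store standing in for the dagbag.
_STORE: dict[tuple[str, str | None], object] = {
    ("demo", "v1"): "dag-v1",
    ("demo", "v2"): "dag-v2",
}


def make_cached_getter(store: dict[tuple[str, str | None], object]) -> GetDag:
    """Wrap a lookup in lru_cache so repeated requests skip the store."""

    @lru_cache(maxsize=None)
    def _get(dag_id: str, bundle_version: str | None = None) -> object | None:
        return store.get((dag_id, bundle_version))

    return _get


get_dag = make_cached_getter(_STORE)
pinned = get_dag("demo", "v1")
```

A Protocol with `__call__` can describe the optional `bundle_version` keyword, which a bare `Callable[[str], ...]` annotation cannot express.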

How to reproduce:
Using the DAG below in a git bundle, create a run. While that run is still in the sleep task, quickly comment out the first section and uncomment the second. Run the task and you will see the astronomer task marked as removed.

from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator

with DAG(dag_id="demo"):
    # First run
    sleep = BashOperator(task_id="sleep", bash_command="sleep 300")
    hello = BashOperator(task_id="hello", bash_command="echo 'Hello'")
    astronomer = BashOperator(task_id="astronomer", bash_command="echo 'Astronomer'")

    sleep >> hello >> astronomer

    # Second run
    #sleep = BashOperator(task_id="sleep", bash_command="sleep 45")
    #hello = BashOperator(task_id="hello", bash_command="echo 'Hello Astronomer!!'")

    #sleep >> hello

Closes: #49007

@ephraimbuddy
Contributor Author

Works correctly now:

versioning-2.mov

@ephraimbuddy ephraimbuddy force-pushed the update-dagbag-to-tuple-keys branch from 0a4227a to c412057 Compare April 9, 2025 13:43
@kaxil kaxil requested a review from dstandish April 9, 2025 18:18
id=id,
path=dag_file.absolute_path,
bundle_path=cast("Path", dag_file.bundle_path),
bundle_version=dag_file.bundle_version,
Contributor

What's the purpose of this? I thought that with DAG file processing we always track the latest version, and that we intentionally don't specify it here. I'm also not sure where this argument goes in the interface.

Contributor Author

@ephraimbuddy ephraimbuddy Apr 9, 2025

This is needed so that we specifically store dag_id and the version in dagbag.dags.

If you have two DAG runs running with different numbers of task instances, you will notice that when the scheduler calls dag_run.update_state, some tasks are marked removed because the DAG retrieved from the dagbag has fewer tasks.
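
The failure mode can be sketched in plain Python, using the task names from the reproduction DAG. `find_removed` is a hypothetical helper written for this illustration and is not how `dag_run.update_state` is implemented.

```python
def find_removed(ti_task_ids, dag_task_ids):
    """Return task ids that exist on the run but not in the given DAG definition."""
    dag_tasks = set(dag_task_ids)
    return [task_id for task_id in ti_task_ids if task_id not in dag_tasks]


# Two versions of the same dag_id, keyed by (dag_id, bundle_version).
dags_by_version = {
    ("demo", "v1"): ["sleep", "hello", "astronomer"],
    ("demo", "v2"): ["sleep", "hello"],
}

# Task instances of a run that was started on version "v1".
run_task_ids = ["sleep", "hello", "astronomer"]

# Comparing against the latest version flags a task that should still run.
removed_with_latest = find_removed(run_task_ids, dags_by_version[("demo", "v2")])

# Comparing against the version the run started with flags nothing.
removed_with_pinned = find_removed(run_task_ids, dags_by_version[("demo", "v1")])

print(removed_with_latest)  # ['astronomer']
print(removed_with_pinned)  # []
```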

@dstandish
Contributor

This is a rather large PR @ephraimbuddy that, by the looks of it, has implications beyond the specific case that you're solving, i.e. running the right dag version. Can you possibly split this up so that we can just see the fix that is specific to this problem?

@ephraimbuddy
Contributor Author

ephraimbuddy commented Apr 9, 2025

> This is a rather large PR @ephraimbuddy that, by the looks of it, has implications beyond the specific case that you're solving, i.e. running the right dag version. Can you possibly split this up so that we can just see the fix that is specific to this problem?

I don't think the PR can be split up. The major fix here is keying DagBag.dags by (dag_id, bundle_version) instead of dag_id alone. DagBag is used in all the places I updated, and we can't use the tuple key without updating those places too. I'm going to reply to the other comments, but this is the fix. You can also try the reproduction and see if there's another way to fix this issue.

@dstandish
Contributor

> I don't think the PR can be split up. The major fix here is having DagBag.dags keys to be (dag_id, bundle_version) instead of dag_id. DagBag is used in all the places I updated. We can't use the tuple key without updating all the other places. I'm going to reply to the other comments but this is the fix. You can also try the reproduction and see if there's another way to fix this issue

I see, you changed the type of DagBag.dags; that's why the change above looked wrong.

@dstandish
Contributor

closing in favor of #49097

@dstandish dstandish closed this Apr 14, 2025
Development

Successfully merging this pull request may close these issues.

Versioned dag run uses the latest version of serialized dag in the Scheduler instead of sticking to their version

3 participants