
Scheduler bulk-fails all scheduled tasks when serialized DAG is transiently missing #62050

@vincent-heng

Description


Apache Airflow version

Other Airflow 3 version (please specify below)

If "Other Airflow 3 version" selected, which one?

3.0.6; the code path is still present on main

What happened?

My instance has been hitting intermittent task failures on MWAA (Airflow 3.0.6, ~150 DAGs, 4 schedulers). Tasks failed in bulk with no obvious cause but succeeded on manual retry. I traced it to this in scheduler_job_runner.py:

When the scheduler can't find a DAG in the serialized_dag table, it does this:

session.execute(
    update(TI)
    .where(TI.dag_id == dag_id, TI.state == TaskInstanceState.SCHEDULED)
    .values(state=TaskInstanceState.FAILED)
    .execution_options(synchronize_session="fetch")
)

It sets every SCHEDULED task instance for that DAG to FAILED.

With PRs #58259 and #56422 this probably happens less often, but the bulk-failure behavior itself has never been addressed.

What you think should happen instead?

The scheduler could skip scheduling that DAG for the current iteration and try again next time, instead of immediately failing everything.

I thought of logging a warning instead of an error, tracking a counter per DAG, and only failing tasks after several consecutive misses, to distinguish transient gaps from genuinely missing DAGs. What do you think about this solution?

PR #55126 tried something similar for stale DAGs (skipping and continuing).
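
To make the idea concrete, here is a rough sketch of what the counter variant could look like around the existing bulk-fail block in _executable_task_instances_to_queued. The _serialized_dag_miss_counts dict and MAX_CONSECUTIVE_MISSES threshold are made-up names for illustration, not existing Airflow attributes:

# Sketch only: _serialized_dag_miss_counts and MAX_CONSECUTIVE_MISSES are
# hypothetical names, and this assumes we are inside the per-task-instance
# loop of _executable_task_instances_to_queued.
MAX_CONSECUTIVE_MISSES = 3

if not dag:
    misses = self._serialized_dag_miss_counts.get(dag_id, 0) + 1
    self._serialized_dag_miss_counts[dag_id] = misses
    if misses < MAX_CONSECUTIVE_MISSES:
        # Likely a transient gap (e.g. the DAG is being reserialized or a
        # DB replica is lagging): skip this DAG for this iteration only.
        self.log.warning(
            "DAG '%s' not found in serialized_dag table (miss %d/%d), "
            "skipping its scheduled tasks for this iteration",
            dag_id,
            misses,
            MAX_CONSECUTIVE_MISSES,
        )
        continue
    # Missing for several consecutive iterations: treat it as genuinely
    # gone and fail its SCHEDULED task instances as the code does today.
    session.execute(
        update(TI)
        .where(TI.dag_id == dag_id, TI.state == TaskInstanceState.SCHEDULED)
        .values(state=TaskInstanceState.FAILED)
        .execution_options(synchronize_session="fetch")
    )
    continue

# DAG resolved normally: reset the miss counter.
self._serialized_dag_miss_counts.pop(dag_id, None)

One open question with this sketch: with multiple schedulers an in-memory counter is per-process, so the effective threshold would vary unless it were derived from something persistent (e.g. a first-miss timestamp).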

How to reproduce

def test_missing_serialized_dag_bulk_fails(self, dag_maker, session):
    dag_id = "SchedulerJobTest.test_missing_serialized_dag_bulk_fails"

    with dag_maker(dag_id=dag_id, session=session):
        EmptyOperator(task_id="task_a")
        EmptyOperator(task_id="task_b")

    scheduler_job = Job()
    self.job_runner = SchedulerJobRunner(job=scheduler_job)

    # Simulate serialized DAG being transiently missing
    self.job_runner.scheduler_dag_bag = mock.MagicMock()
    self.job_runner.scheduler_dag_bag.get_dag_for_run.return_value = None

    dr = dag_maker.create_dagrun(state=DagRunState.RUNNING, run_type=DagRunType.SCHEDULED)
    for ti in dr.task_instances:
        ti.state = State.SCHEDULED
        session.merge(ti)
    session.flush()

    res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
    session.flush()

    assert len(res) == 0
    tis = dr.get_task_instances(session=session)  # Both tasks are FAILED instead of SCHEDULED
    for ti in tis:
        print(f"{ti.task_id}: {ti.state}")

Operating System

AWS MWAA

Versions of Apache Airflow Providers

n/a

Deployment

Amazon (AWS) MWAA

Deployment details

  • MWAA environment running 3.0.6 (latest available on MWAA as of Feb 2026)
  • mw1.2xlarge instance class
  • 4 schedulers
  • 5-20 workers (auto-scaling)
  • ~150 DAGs
  • min_file_process_interval=300

Anything else?

I would appreciate guidance on the preferred approach (a retry counter, or simply skipping the DAG for that iteration).

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
