diff --git a/airflow-core/src/airflow/models/asset.py b/airflow-core/src/airflow/models/asset.py index 8b28f2bea6184..650cb8589b424 100644 --- a/airflow-core/src/airflow/models/asset.py +++ b/airflow-core/src/airflow/models/asset.py @@ -108,12 +108,17 @@ def remove_references_to_deleted_dags(session: Session): DagScheduleAssetAliasReference, TaskOutletAssetReference, ] - for model in models_to_check: - session.execute( - delete(model) - .where(model.dag_id.in_(select(DagModel.dag_id).where(DagModel.is_stale))) - .execution_options(synchronize_session="fetch") - ) + + # The queries need to be done in separate steps, because in the case of multiple + # dag processors on MySQL, there could be a deadlock caused by acquiring both an + # exclusive lock for deletion and shared lock for query in reverse sequence + if stale_dag_ids := session.scalars(select(DagModel.dag_id).where(DagModel.is_stale)).all(): + for model in models_to_check: + session.execute( + delete(model) + .where(model.dag_id.in_(stale_dag_ids)) + .execution_options(synchronize_session="fetch") + ) alias_association_table = Table(