From 964e710ea74dcb77c1a3f808984d1f2fbdf5049a Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Mon, 11 Aug 2025 00:52:40 +0100 Subject: [PATCH] Prevent `DetachedInstanceError` when processing executor event Similar to https://github.com/apache/airflow/pull/53838 but prevents it for all queries needing `consumed_asset_events`. Instead of adding `.selectinload(DR.consumed_asset_events))` wherever needed, I am eagerly loading them now. Changes: - Add lazy='selectin' to `DagRun.consumed_asset_events` relationship for always-eager loading - Changed `backref` to `back_populates` in `AssetEvent.created_dagruns` to enable explicit control Why This Fix Works: - Eliminates lazy loading entirely by pre-loading the relationship at the model level - Prevents dependency on consistent session state in concurrent scheduler operations Closes #54306 --- .../src/airflow/jobs/scheduler_job_runner.py | 6 +-- airflow-core/src/airflow/models/asset.py | 2 +- airflow-core/src/airflow/models/dagrun.py | 7 ++++ .../tests/unit/jobs/test_scheduler_job.py | 38 +++++++++---------- 4 files changed, 30 insertions(+), 23 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index f839bbb88a050..dd820e506cfdd 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -39,7 +39,7 @@ from airflow import settings from airflow._shared.timezones import timezone -from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIRunContext +from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun as DRDataModel, TIRunContext from airflow.callbacks.callback_requests import DagCallbackRequest, DagRunContext, TaskCallbackRequest from airflow.configuration import conf from airflow.dag_processing.bundles.base import BundleUsageTrackingManager @@ -888,7 +888,7 @@ def process_executor_events( ti=ti, msg=msg, context_from_server=TIRunContext( - dag_run=ti.dag_run, + dag_run=DRDataModel.model_validate(ti.dag_run, from_attributes=True), max_tries=ti.max_tries, variables=[], connections=[], @@ -2266,7 +2266,7 @@ def _purge_task_instances_without_heartbeats( ti=ti, msg=str(task_instance_heartbeat_timeout_message_details), context_from_server=TIRunContext( - dag_run=ti.dag_run, + dag_run=DRDataModel.model_validate(ti.dag_run, from_attributes=True), max_tries=ti.max_tries, variables=[], connections=[], diff --git a/airflow-core/src/airflow/models/asset.py b/airflow-core/src/airflow/models/asset.py index 8b28f2bea6184..108b824903c11 100644 --- a/airflow-core/src/airflow/models/asset.py +++ b/airflow-core/src/airflow/models/asset.py @@ -743,7 +743,7 @@ class AssetEvent(Base): created_dagruns = relationship( "DagRun", secondary=association_table, - backref="consumed_asset_events", + back_populates="consumed_asset_events", ) source_aliases = relationship( diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 234960a5ef8ff..1b037cb66c667 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -248,6 +248,13 @@ class DagRun(Base, LoggingMixin): back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan", ) + + consumed_asset_events = relationship( + "AssetEvent", + secondary="dagrun_asset_event", + back_populates="created_dagruns", + lazy="selectin", + ) task_instances_histories = relationship( TIH, primaryjoin="and_(DagRun.dag_id == TaskInstanceHistory.dag_id, DagRun.run_id == TaskInstanceHistory.run_id)", diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 27c89a85981d9..c3cfa79fb3e13 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -6876,33 +6876,33 @@ def test_execute_queries_count_with_harvested_dags( # One DAG with one task per DAG file. ([10, 10, 10, 10], 1, 1, "1d", "None", "no_structure"), ([10, 10, 10, 10], 1, 1, "1d", "None", "linear"), - ([24, 14, 14, 14], 1, 1, "1d", "@once", "no_structure"), - ([24, 14, 14, 14], 1, 1, "1d", "@once", "linear"), - ([24, 26, 29, 32], 1, 1, "1d", "30m", "no_structure"), - ([24, 26, 29, 32], 1, 1, "1d", "30m", "linear"), - ([24, 26, 29, 32], 1, 1, "1d", "30m", "binary_tree"), - ([24, 26, 29, 32], 1, 1, "1d", "30m", "star"), - ([24, 26, 29, 32], 1, 1, "1d", "30m", "grid"), + ([40, 14, 14, 14], 1, 1, "1d", "@once", "no_structure"), + ([40, 14, 14, 14], 1, 1, "1d", "@once", "linear"), + ([40, 26, 29, 32], 1, 1, "1d", "30m", "no_structure"), + ([40, 26, 29, 32], 1, 1, "1d", "30m", "linear"), + ([40, 26, 29, 32], 1, 1, "1d", "30m", "binary_tree"), + ([40, 26, 29, 32], 1, 1, "1d", "30m", "star"), + ([40, 26, 29, 32], 1, 1, "1d", "30m", "grid"), # One DAG with five tasks per DAG file. ([10, 10, 10, 10], 1, 5, "1d", "None", "no_structure"), ([10, 10, 10, 10], 1, 5, "1d", "None", "linear"), - ([24, 14, 14, 14], 1, 5, "1d", "@once", "no_structure"), - ([25, 15, 15, 15], 1, 5, "1d", "@once", "linear"), - ([24, 26, 29, 32], 1, 5, "1d", "30m", "no_structure"), - ([25, 28, 32, 36], 1, 5, "1d", "30m", "linear"), - ([25, 28, 32, 36], 1, 5, "1d", "30m", "binary_tree"), - ([25, 28, 32, 36], 1, 5, "1d", "30m", "star"), - ([25, 28, 32, 36], 1, 5, "1d", "30m", "grid"), + ([40, 14, 14, 14], 1, 5, "1d", "@once", "no_structure"), + ([42, 15, 15, 15], 1, 5, "1d", "@once", "linear"), + ([40, 26, 29, 32], 1, 5, "1d", "30m", "no_structure"), + ([42, 28, 32, 36], 1, 5, "1d", "30m", "linear"), + ([42, 28, 32, 36], 1, 5, "1d", "30m", "binary_tree"), + ([42, 28, 32, 36], 1, 5, "1d", "30m", "star"), + ([42, 28, 32, 36], 1, 5, "1d", "30m", "grid"), # 10 DAGs with 10 tasks per DAG file. ([10, 10, 10, 10], 10, 10, "1d", "None", "no_structure"), ([10, 10, 10, 10], 10, 10, "1d", "None", "linear"), - ([218, 69, 69, 69], 10, 10, "1d", "@once", "no_structure"), - ([228, 84, 84, 84], 10, 10, "1d", "@once", "linear"), + ([218, 87, 87, 87], 10, 10, "1d", "@once", "no_structure"), + ([249, 104, 104, 104], 10, 10, "1d", "@once", "linear"), ([217, 119, 119, 119], 10, 10, "1d", "30m", "no_structure"), ([2227, 145, 145, 145], 10, 10, "1d", "30m", "linear"), - ([227, 139, 139, 139], 10, 10, "1d", "30m", "binary_tree"), - ([227, 139, 139, 139], 10, 10, "1d", "30m", "star"), - ([227, 259, 259, 259], 10, 10, "1d", "30m", "grid"), + ([249, 139, 139, 139], 10, 10, "1d", "30m", "binary_tree"), + ([249, 139, 139, 139], 10, 10, "1d", "30m", "star"), + ([249, 259, 259, 259], 10, 10, "1d", "30m", "grid"), ], ) def test_process_dags_queries_count(