From 0cae91950879b93099275e6beffbfab12af1d59f Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 23 Dec 2025 09:50:17 +0800 Subject: [PATCH 1/2] [v3-1-test] fix(scheduler): Eager-load DagRun asset relationships before creating DagRunContext (#59714) (cherry picked from commit 94517f8a62ba86ce0433a1a8d40f62aadb90c2b0) Co-authored-by: Wei Lee --- .../src/airflow/jobs/scheduler_job_runner.py | 8 ++ .../tests/unit/jobs/test_scheduler_job.py | 78 +++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 10c511e8bb4bf..227327aec879a 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1979,6 +1979,14 @@ def _schedule_dag_run( ): dag_model.calculate_dagrun_date_fields(dag, get_run_data_interval(dag.timetable, dag_run)) + dag_run = session.scalar( + select(DagRun) + .where(DagRun.id == dag_run.id) + .options( + selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.asset), + selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.source_aliases), + ) + ) callback_to_execute = DagCallbackRequest( filepath=dag_model.relative_fileloc, dag_id=dag.dag_id, diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 7f815ccea7353..9ea5b7fde0551 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -25,6 +25,7 @@ from collections.abc import Generator from datetime import timedelta from pathlib import Path +from typing import TYPE_CHECKING from unittest import mock from unittest.mock import MagicMock, PropertyMock, patch from uuid import uuid4 @@ -104,6 +105,10 @@ from unit.listeners.test_listeners import get_listener_manager from unit.models import TEST_DAGS_FOLDER +if TYPE_CHECKING: + from sqlalchemy.orm.session import Session + + from tests_common.pytest_plugin import DagMaker pytestmark = pytest.mark.db_test PERF_DAGS_FOLDER = AIRFLOW_ROOT_PATH / "dev" / "airflow_perf" / "dags" @@ -669,6 +674,79 @@ def test_process_executor_events_with_asset_events(self, mock_stats_incr, sessio assert len(callback_request.context_from_server.dag_run.consumed_asset_events) == 1 assert callback_request.context_from_server.dag_run.consumed_asset_events[0].asset.uri == asset1.uri + @pytest.mark.usefixtures("testing_dag_bundle") + def test_schedule_dag_run_with_asset_event(self, session: Session, dag_maker: DagMaker): + """ + Verify that scheduler can build DagRunContext for a timed-out Dag run + with consumed asset events without raising DetachedInstanceError. + """ + asset1 = Asset(uri="test://asset1", name="test_asset_executor", group="test_group") + asset_model = AssetModel(name=asset1.name, uri=asset1.uri, group=asset1.group) + session.add(asset_model) + session.flush() + + with dag_maker( + dag_id="test_executor_events_with_assets", + schedule=[asset1], + fileloc="/test_path1/", + dagrun_timeout=timedelta(minutes=1), + ): + EmptyOperator(task_id="dummy_task") + + dag = dag_maker.dag + sync_dag_to_db(dag) + DagVersion.get_latest_version(dag.dag_id) + + # Create Dag run that is guaranteed to time out + dr = dag_maker.create_dagrun( + start_date=timezone.utcnow() - timedelta(days=1), + state=DagRunState.RUNNING, + ) + + # Create asset event and attach to dag run + asset_event = AssetEvent( + asset_id=asset_model.id, + source_task_id="upstream_task", + source_dag_id="upstream_dag", + source_run_id="upstream_run", + source_map_index=-1, + ) + session.add(asset_event) + session.flush() + dr.consumed_asset_events.append(asset_event) + session.add(dr) + session.flush() + + executor = MockExecutor(do_update=False) + scheduler_job = Job(executor=executor) + self.job_runner = SchedulerJobRunner(scheduler_job) + + ti1 = dr.get_task_instance("dummy_task") + if TYPE_CHECKING: + assert isinstance(ti1, TaskInstance) + ti1.state = State.FAILED + session.merge(ti1) + session.commit() + + executor.event_buffer[ti1.key] = State.FAILED, None + + callback = self.job_runner._schedule_dag_run(dr, session) + session.flush() + + assert callback is not None + assert callback.is_failure_callback + assert callback.msg == "timed_out" + + context = callback.context_from_server + assert context is not None + + if TYPE_CHECKING: + assert isinstance(context.dag_run, DagRun) + events = context.dag_run.consumed_asset_events + assert len(events) == 1 + assert events[0].asset is not None + assert events[0].source_aliases is not None + def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker): dag_id = "SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute" task_id_1 = "dummy_task" From f62d28ef7aa8e1860e628926f757f4d84f3df312 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 23 Dec 2025 17:05:20 +0800 Subject: [PATCH 2/2] fixup! [v3-1-test] fix(scheduler): Eager-load DagRun asset relationships before creating DagRunContext (#59714) (cherry picked from commit 94517f8a62ba86ce0433a1a8d40f62aadb90c2b0) --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 227327aec879a..e30167b583b43 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2002,9 +2002,11 @@ def _schedule_dag_run( ) dag_run.notify_dagrun_state_changed() - duration = dag_run.end_date - dag_run.start_date - Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", duration) - Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": dag_run.dag_id}) + duration: timedelta | None = None + if dag_run.end_date and dag_run.start_date: + duration = dag_run.end_date - dag_run.start_date + Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", duration) + Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": dag_run.dag_id}) span.set_attribute("error", True) if span.is_recording(): span.add_event(