Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1979,6 +1979,14 @@ def _schedule_dag_run(
):
dag_model.calculate_dagrun_date_fields(dag, get_run_data_interval(dag.timetable, dag_run))

dag_run = session.scalar(
select(DagRun)
.where(DagRun.id == dag_run.id)
.options(
selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.asset),
selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.source_aliases),
)
)
callback_to_execute = DagCallbackRequest(
filepath=dag_model.relative_fileloc,
dag_id=dag.dag_id,
Expand All @@ -1994,9 +2002,11 @@ def _schedule_dag_run(
)

dag_run.notify_dagrun_state_changed()
duration = dag_run.end_date - dag_run.start_date
Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", duration)
Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": dag_run.dag_id})
duration: timedelta | None = None
if dag_run.end_date and dag_run.start_date:
duration = dag_run.end_date - dag_run.start_date
Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", duration)
Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": dag_run.dag_id})
span.set_attribute("error", True)
if span.is_recording():
span.add_event(
Expand Down
78 changes: 78 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from collections.abc import Generator
from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING
from unittest import mock
from unittest.mock import MagicMock, PropertyMock, patch
from uuid import uuid4
Expand Down Expand Up @@ -104,6 +105,10 @@
from unit.listeners.test_listeners import get_listener_manager
from unit.models import TEST_DAGS_FOLDER

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session

from tests_common.pytest_plugin import DagMaker
pytestmark = pytest.mark.db_test

PERF_DAGS_FOLDER = AIRFLOW_ROOT_PATH / "dev" / "airflow_perf" / "dags"
Expand Down Expand Up @@ -669,6 +674,79 @@ def test_process_executor_events_with_asset_events(self, mock_stats_incr, sessio
assert len(callback_request.context_from_server.dag_run.consumed_asset_events) == 1
assert callback_request.context_from_server.dag_run.consumed_asset_events[0].asset.uri == asset1.uri

@pytest.mark.usefixtures("testing_dag_bundle")
def test_schedule_dag_run_with_asset_event(self, session: Session, dag_maker: DagMaker):
"""
Verify that scheduler can build DagRunContext for a timed-out Dag run
with consumed asset events without raising DetachedInstanceError.

The run is created a day in the past with a 1-minute ``dagrun_timeout``,
so ``_schedule_dag_run`` must take the timeout path and build the failure
callback, including the run's consumed asset events and their related
``asset`` / ``source_aliases`` objects.
"""
# Register the asset (and its backing ORM model) the Dag will be scheduled on.
asset1 = Asset(uri="test://asset1", name="test_asset_executor", group="test_group")
asset_model = AssetModel(name=asset1.name, uri=asset1.uri, group=asset1.group)
session.add(asset_model)
session.flush()

# dagrun_timeout=1 minute combined with the day-old start_date below
# guarantees the run is already timed out when the scheduler inspects it.
with dag_maker(
dag_id="test_executor_events_with_assets",
schedule=[asset1],
fileloc="/test_path1/",
dagrun_timeout=timedelta(minutes=1),
):
EmptyOperator(task_id="dummy_task")

dag = dag_maker.dag
sync_dag_to_db(dag)
# NOTE(review): return value unused — presumably called to ensure a
# serialized DagVersion row exists before the run is scheduled; confirm.
DagVersion.get_latest_version(dag.dag_id)

# Create Dag run that is guaranteed to time out
dr = dag_maker.create_dagrun(
start_date=timezone.utcnow() - timedelta(days=1),
state=DagRunState.RUNNING,
)

# Create asset event and attach to dag run
asset_event = AssetEvent(
asset_id=asset_model.id,
source_task_id="upstream_task",
source_dag_id="upstream_dag",
source_run_id="upstream_run",
source_map_index=-1,
)
session.add(asset_event)
session.flush()
dr.consumed_asset_events.append(asset_event)
session.add(dr)
session.flush()

# do_update=False keeps the mock executor passive; the scheduler job
# runner under test drives all state changes itself.
executor = MockExecutor(do_update=False)
scheduler_job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(scheduler_job)

ti1 = dr.get_task_instance("dummy_task")
# TYPE_CHECKING-guarded assert: narrows the Optional return for the type
# checker only; it is not executed at runtime.
if TYPE_CHECKING:
assert isinstance(ti1, TaskInstance)
ti1.state = State.FAILED
session.merge(ti1)
session.commit()

executor.event_buffer[ti1.key] = State.FAILED, None

# Exercise the code under test: scheduling a timed-out run must yield a
# failure callback without raising DetachedInstanceError.
callback = self.job_runner._schedule_dag_run(dr, session)
session.flush()

assert callback is not None
assert callback.is_failure_callback
assert callback.msg == "timed_out"

context = callback.context_from_server
assert context is not None

if TYPE_CHECKING:
assert isinstance(context.dag_run, DagRun)
# The consumed event and its eagerly-loaded relationships must be
# reachable from the callback context (the regression being guarded).
events = context.dag_run.consumed_asset_events
assert len(events) == 1
assert events[0].asset is not None
assert events[0].source_aliases is not None

def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker):
dag_id = "SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute"
task_id_1 = "dummy_task"
Expand Down