diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index c0dce3861eca2..17d03cb2758c1 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -164,9 +164,7 @@ def register_asset_change( session.add(asset_event) session.flush() # Ensure the event is written earlier than DDRQ entries below. - dags_to_queue_from_asset = { - ref.dag for ref in asset_model.scheduled_dags if not ref.dag.is_stale and not ref.dag.is_paused - } + dags_to_queue_from_asset = {ref.dag for ref in asset_model.scheduled_dags if not ref.dag.is_paused} dags_to_queue_from_asset_alias = set() if source_alias_names: @@ -185,7 +183,7 @@ def register_asset_change( dags_to_queue_from_asset_alias |= { alias_ref.dag for alias_ref in asset_alias_model.scheduled_dags - if not alias_ref.dag.is_stale and not alias_ref.dag.is_paused + if not alias_ref.dag.is_paused } dags_to_queue_from_asset_ref = set( @@ -197,7 +195,8 @@ def register_asset_change( or_( DagScheduleAssetNameReference.name == asset.name, DagScheduleAssetUriReference.uri == asset.uri, - ) + ), + DagModel.is_paused.is_(False), ) ) ) diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py index 94b63e72501fd..330522cbda12d 100644 --- a/airflow-core/tests/unit/assets/test_manager.py +++ b/airflow-core/tests/unit/assets/test_manager.py @@ -21,7 +21,7 @@ from unittest import mock import pytest -from sqlalchemy import delete +from sqlalchemy import delete, func, select from sqlalchemy.orm import Session from airflow.assets.manager import AssetManager @@ -204,3 +204,41 @@ def test_create_assets_notifies_asset_listener(self, session): assert len(asset_listener.created) == 1 assert len(asms) == 1 assert asset_listener.created[0].uri == asset.uri == asms[0].uri + + @pytest.mark.usefixtures("testing_dag_bundle") + def test_register_asset_change_queues_stale_dag(self, session, mock_task_instance): + asset_manager = AssetManager() + asset_listener.clear() + bundle_name = "testing" + + # Setup an Asset + asset_uri = "test://stale_asset/" + asset_name = "test_stale_asset" + asset_definition = Asset(uri=asset_uri, name=asset_name) + + asm = AssetModel(uri=asset_uri, name=asset_name, group="asset") + session.add(asm) + + # Setup a Dag that is STALE but NOT PAUSED + # We want stale Dags to still receive asset updates + stale_dag = DagModel(dag_id="stale_dag", is_stale=True, is_paused=False, bundle_name=bundle_name) + session.add(stale_dag) + + # Link the Stale Dag to the Asset + asm.scheduled_dags = [DagScheduleAssetReference(dag_id=stale_dag.dag_id)] + + session.execute(delete(AssetDagRunQueue)) + session.flush() + + # Register the asset change + asset_manager.register_asset_change( + task_instance=mock_task_instance, asset=asset_definition, session=session + ) + session.flush() + + # Verify the stale Dag was NOT ignored + assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 1 + + queued_id = session.scalar(select(AssetDagRunQueue.target_dag_id)) + assert queued_id == "stale_dag" + asset_listener.clear() diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 10f58785e14c7..aba109f9d410a 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -4626,7 +4626,10 @@ def test_no_create_dag_runs_when_dag_disabled(self, session, dag_maker, disable, ) session.flush() assert session.scalars(ase_q).one().source_run_id == dr1.run_id - assert session.scalars(adrq_q).one_or_none() is None + if "is_stale" in disable: + assert session.scalars(adrq_q).one_or_none() is not None + else: + assert session.scalars(adrq_q).one_or_none() is None # Simulate the consumer DAG being enabled. session.execute(update(DagModel).where(DagModel.dag_id == "consumer").values(**enable)) @@ -4640,6 +4643,7 @@ def test_no_create_dag_runs_when_dag_disabled(self, session, dag_maker, disable, ) session.flush() assert [e.source_run_id for e in session.scalars(ase_q)] == [dr1.run_id, dr2.run_id] + assert len(session.scalars(adrq_q).all()) == 1 assert session.scalars(adrq_q).one().target_dag_id == "consumer" @time_machine.travel(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9), tick=False)