diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index cc2968c8d8213..10c511e8bb4bf 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -61,6 +61,7 @@
     AssetModel,
     DagScheduleAssetAliasReference,
     DagScheduleAssetReference,
+    TaskInletAssetReference,
     TaskOutletAssetReference,
     asset_trigger_association_table,
 )
@@ -2554,20 +2555,26 @@ def _update_asset_orphanage(self, session: Session = NEW_SESSION) -> None:
         """
         Check assets orphanization and update their active entry.
 
-        An orphaned asset is no longer referenced in any DAG schedule parameters
-        or task outlets. Active assets (non-orphaned) have entries in AssetActive
-        and must have unique names and URIs.
+        An orphaned asset is no longer referenced in any DAG schedule parameters,
+        task outlets, or task inlets. Active assets (non-orphaned) have entries in
+        AssetActive and must have unique names and URIs.
 
         :seealso: :meth:`AssetModelOperation.activate_assets_if_possible`.
         """
         # Group assets into orphaned=True and orphaned=False groups.
         orphaned = (
-            (func.count(DagScheduleAssetReference.dag_id) + func.count(TaskOutletAssetReference.dag_id)) == 0
+            (
+                func.count(DagScheduleAssetReference.dag_id)
+                + func.count(TaskOutletAssetReference.dag_id)
+                + func.count(TaskInletAssetReference.dag_id)
+            )
+            == 0
         ).label("orphaned")
         asset_reference_query = session.execute(
             select(orphaned, AssetModel)
             .outerjoin(DagScheduleAssetReference)
             .outerjoin(TaskOutletAssetReference)
+            .outerjoin(TaskInletAssetReference)
             .group_by(AssetModel.id)
             .order_by(orphaned)
         )
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 1ad6ffb9984c8..7f815ccea7353 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -6726,18 +6726,22 @@ def test_asset_orphaning(self, dag_maker, session):
         asset3 = Asset(uri="test://asset_3", name="test_asset_3", group="test_group")
         asset4 = Asset(uri="test://asset_4", name="test_asset_4", group="test_group")
         asset5 = Asset(uri="test://asset_5", name="test_asset_5", group="test_group")
+        asset6 = Asset(uri="test://asset_6", name="test_asset_6", group="test_group")
 
         with dag_maker(dag_id="assets-1", schedule=[asset1, asset2], session=session):
-            BashOperator(task_id="task", bash_command="echo 1", outlets=[asset3, asset4])
+            BashOperator(task_id="task", bash_command="echo 1", outlets=[asset3, asset4], inlets=[asset6])
 
         # asset5 is not registered (since it's not used anywhere).
         orphaned, active = self._find_assets_activation(session)
-        assert active == [asset1, asset2, asset3, asset4]
+        assert active == [asset1, asset2, asset3, asset4, asset6]
         assert orphaned == []
 
         self.job_runner._update_asset_orphanage(session=session)
         session.flush()
 
+        assert active == [asset1, asset2, asset3, asset4, asset6]
+        assert orphaned == []
+
         # Now remove 2 asset references and add asset5.
with dag_maker(dag_id="assets-1", schedule=[asset1], session=session): BashOperator(task_id="task", bash_command="echo 1", outlets=[asset3, asset5]) diff --git a/devel-common/src/tests_common/test_utils/db.py b/devel-common/src/tests_common/test_utils/db.py index 4c40cc74d0037..63b9d93d3b02f 100644 --- a/devel-common/src/tests_common/test_utils/db.py +++ b/devel-common/src/tests_common/test_utils/db.py @@ -215,6 +215,10 @@ def clear_db_assets(): session.query(AssetDagRunQueue).delete() session.query(DagScheduleAssetReference).delete() session.query(TaskOutletAssetReference).delete() + if AIRFLOW_V_3_1_PLUS: + from airflow.models.asset import TaskInletAssetReference + + session.query(TaskInletAssetReference).delete() from tests_common.test_utils.compat import AssetAliasModel, DagScheduleAssetAliasReference session.query(AssetAliasModel).delete()