Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
AssetModel,
DagScheduleAssetAliasReference,
DagScheduleAssetReference,
TaskInletAssetReference,
TaskOutletAssetReference,
asset_trigger_association_table,
)
Expand Down Expand Up @@ -2554,20 +2555,26 @@ def _update_asset_orphanage(self, session: Session = NEW_SESSION) -> None:
"""
Check assets orphanization and update their active entry.

An orphaned asset is no longer referenced in any DAG schedule parameters
or task outlets. Active assets (non-orphaned) have entries in AssetActive
and must have unique names and URIs.
An orphaned asset is no longer referenced in any DAG schedule parameters,
task outlets, or task inlets. Active assets (non-orphaned) have entries in
AssetActive and must have unique names and URIs.

:seealso: :meth:`AssetModelOperation.activate_assets_if_possible`.
"""
# Group assets into orphaned=True and orphaned=False groups.
orphaned = (
(func.count(DagScheduleAssetReference.dag_id) + func.count(TaskOutletAssetReference.dag_id)) == 0
(
func.count(DagScheduleAssetReference.dag_id)
+ func.count(TaskOutletAssetReference.dag_id)
+ func.count(TaskInletAssetReference.dag_id)
)
== 0
).label("orphaned")
asset_reference_query = session.execute(
select(orphaned, AssetModel)
.outerjoin(DagScheduleAssetReference)
.outerjoin(TaskOutletAssetReference)
.outerjoin(TaskInletAssetReference)
.group_by(AssetModel.id)
.order_by(orphaned)
)
Expand Down
8 changes: 6 additions & 2 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6726,18 +6726,22 @@ def test_asset_orphaning(self, dag_maker, session):
asset3 = Asset(uri="test://asset_3", name="test_asset_3", group="test_group")
asset4 = Asset(uri="test://asset_4", name="test_asset_4", group="test_group")
asset5 = Asset(uri="test://asset_5", name="test_asset_5", group="test_group")
asset6 = Asset(uri="test://asset_5", name="test_asset_5", group="test_group")

with dag_maker(dag_id="assets-1", schedule=[asset1, asset2], session=session):
BashOperator(task_id="task", bash_command="echo 1", outlets=[asset3, asset4])
BashOperator(task_id="task", bash_command="echo 1", outlets=[asset3, asset4], inlets=[asset6])

# asset5 is not registered (since it's not used anywhere).
orphaned, active = self._find_assets_activation(session)
assert active == [asset1, asset2, asset3, asset4]
assert active == [asset1, asset2, asset3, asset4, asset6]
assert orphaned == []

self.job_runner._update_asset_orphanage(session=session)
session.flush()

assert active == [asset1, asset2, asset3, asset4, asset6]
assert orphaned == []

# Now remove 2 asset references and add asset5.
with dag_maker(dag_id="assets-1", schedule=[asset1], session=session):
BashOperator(task_id="task", bash_command="echo 1", outlets=[asset3, asset5])
Expand Down
4 changes: 4 additions & 0 deletions devel-common/src/tests_common/test_utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ def clear_db_assets():
session.query(AssetDagRunQueue).delete()
session.query(DagScheduleAssetReference).delete()
session.query(TaskOutletAssetReference).delete()
if AIRFLOW_V_3_1_PLUS:
from airflow.models.asset import TaskInletAssetReference

session.query(TaskInletAssetReference).delete()
from tests_common.test_utils.compat import AssetAliasModel, DagScheduleAssetAliasReference

session.query(AssetAliasModel).delete()
Expand Down