Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions airflow-core/src/airflow/assets/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,7 @@ def register_asset_change(
session.add(asset_event)
session.flush() # Ensure the event is written earlier than DDRQ entries below.

dags_to_queue_from_asset = {
ref.dag for ref in asset_model.scheduled_dags if not ref.dag.is_stale and not ref.dag.is_paused
}
dags_to_queue_from_asset = {ref.dag for ref in asset_model.scheduled_dags if not ref.dag.is_paused}

dags_to_queue_from_asset_alias = set()
if source_alias_names:
Expand All @@ -185,7 +183,7 @@ def register_asset_change(
dags_to_queue_from_asset_alias |= {
alias_ref.dag
for alias_ref in asset_alias_model.scheduled_dags
if not alias_ref.dag.is_stale and not alias_ref.dag.is_paused
if not alias_ref.dag.is_paused
}

dags_to_queue_from_asset_ref = set(
Expand All @@ -197,7 +195,8 @@ def register_asset_change(
or_(
DagScheduleAssetNameReference.name == asset.name,
DagScheduleAssetUriReference.uri == asset.uri,
)
),
DagModel.is_paused.is_(False),
)
)
)
Expand Down
40 changes: 39 additions & 1 deletion airflow-core/tests/unit/assets/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from unittest import mock

import pytest
from sqlalchemy import delete
from sqlalchemy import delete, func, select
from sqlalchemy.orm import Session

from airflow.assets.manager import AssetManager
Expand Down Expand Up @@ -204,3 +204,41 @@ def test_create_assets_notifies_asset_listener(self, session):
assert len(asset_listener.created) == 1
assert len(asms) == 1
assert asset_listener.created[0].uri == asset.uri == asms[0].uri

@pytest.mark.usefixtures("testing_dag_bundle")
def test_register_asset_change_queues_stale_dag(self, session, mock_task_instance):
asset_manager = AssetManager()
asset_listener.clear()
bundle_name = "testing"

# Setup an Asset
asset_uri = "test://stale_asset/"
asset_name = "test_stale_asset"
asset_definition = Asset(uri=asset_uri, name=asset_name)

asm = AssetModel(uri=asset_uri, name=asset_name, group="asset")
session.add(asm)

# Setup a Dag that is STALE but NOT PAUSED
# We want stale Dags to still receive asset updates
stale_dag = DagModel(dag_id="stale_dag", is_stale=True, is_paused=False, bundle_name=bundle_name)
session.add(stale_dag)

# Link the Stale Dag to the Asset
asm.scheduled_dags = [DagScheduleAssetReference(dag_id=stale_dag.dag_id)]

session.execute(delete(AssetDagRunQueue))
session.flush()

# Register the asset change
asset_manager.register_asset_change(
task_instance=mock_task_instance, asset=asset_definition, session=session
)
session.flush()

# Verify the stale Dag was NOT ignored
assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 1

queued_id = session.scalar(select(AssetDagRunQueue.target_dag_id))
assert queued_id == "stale_dag"
asset_listener.clear()
6 changes: 5 additions & 1 deletion airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4626,7 +4626,10 @@ def test_no_create_dag_runs_when_dag_disabled(self, session, dag_maker, disable,
)
session.flush()
assert session.scalars(ase_q).one().source_run_id == dr1.run_id
assert session.scalars(adrq_q).one_or_none() is None
if "is_stale" in disable:
assert session.scalars(adrq_q).one_or_none() is not None
else:
assert session.scalars(adrq_q).one_or_none() is None

# Simulate the consumer DAG being enabled.
session.execute(update(DagModel).where(DagModel.dag_id == "consumer").values(**enable))
Expand All @@ -4640,6 +4643,7 @@ def test_no_create_dag_runs_when_dag_disabled(self, session, dag_maker, disable,
)
session.flush()
assert [e.source_run_id for e in session.scalars(ase_q)] == [dr1.run_id, dr2.run_id]
assert len(session.scalars(adrq_q).all()) == 1
assert session.scalars(adrq_q).one().target_dag_id == "consumer"

@time_machine.travel(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9), tick=False)
Expand Down