Skip to content

Commit

Permalink
test(taskinstance): add test case test_outlet_asset_alias_asset_inactive
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Dec 16, 2024
1 parent b86dc2d commit d686a1d
Showing 1 changed file with 59 additions and 6 deletions.
65 changes: 59 additions & 6 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from airflow.exceptions import (
AirflowException,
AirflowFailException,
AirflowInactiveAssetAddedToAssetAliasException,
AirflowInactiveAssetInInletOrOutletException,
AirflowRescheduleException,
AirflowSensorTimeout,
Expand Down Expand Up @@ -2728,6 +2729,58 @@ def producer(*, outlet_events):
assert len(asset_alias_obj.assets) == 1
assert asset_alias_obj.assets[0].uri == asset_uri

def test_outlet_asset_alias_asset_inactive(self, dag_maker, session):
from airflow.sdk.definitions.asset import Asset, AssetAlias

asset_name = "did_not_exists"
asset = Asset(asset_name)
asset2 = Asset(asset_name, uri="test://asset")
asm = AssetModel.from_public(asset)
asm2 = AssetModel.from_public(asset2)
session.add_all([asm, asm2, AssetActive.for_asset(asm)])

asset_alias_name = "alias_with_inactive_asset"

with dag_maker(dag_id="producer_dag", schedule=None, session=session):

@task(outlets=AssetAlias(asset_alias_name))
def producer_without_inactive(*, outlet_events):
outlet_events[AssetAlias(asset_alias_name)].add(asset, extra={"key": "value"})

@task(outlets=AssetAlias(asset_alias_name))
def producer_with_inactive(*, outlet_events):
outlet_events[AssetAlias(asset_alias_name)].add(asset2, extra={"key": "value"})

producer_without_inactive() >> producer_with_inactive()

tis = {ti.task_id: ti for ti in dag_maker.create_dagrun().task_instances}
tis["producer_without_inactive"].run(session=session)
with pytest.raises(AirflowInactiveAssetAddedToAssetAliasException) as exc:
tis["producer_with_inactive"].run(session=session)

assert 'Asset(name="did_not_exists", uri="test://asset/")' in str(exc.value)

producer_event = session.scalar(
select(AssetEvent).where(AssetEvent.source_task_id == "producer_without_inactive")
)

assert producer_event.source_task_id == "producer_without_inactive"
assert producer_event.source_dag_id == "producer_dag"
assert producer_event.source_run_id == "test"
assert producer_event.source_map_index == -1
assert producer_event.asset.uri == asset_name
assert producer_event.extra == {"key": "value"}
assert len(producer_event.source_aliases) == 1
assert producer_event.source_aliases[0].name == asset_alias_name

asset_obj = session.scalar(select(AssetModel).where(AssetModel.uri == asset_name))
assert len(asset_obj.aliases) == 1
assert asset_obj.aliases[0].name == asset_alias_name

asset_alias_obj = session.scalar(select(AssetAliasModel))
assert len(asset_alias_obj.assets) == 1
assert asset_alias_obj.assets[0].uri == asset_name

@pytest.mark.want_activate_assets(True)
def test_inlet_asset_extra(self, dag_maker, session):
from airflow.sdk.definitions.asset import Asset
Expand Down Expand Up @@ -4081,8 +4134,8 @@ def duplicate_asset_task_in_outlet(*, outlet_events):
with pytest.raises(AirflowInactiveAssetInInletOrOutletException) as exc:
tis["duplicate_asset_task_in_outlet"].run(session=session)

assert 'Asset(name="asset_second", uri="asset_second")' in exc.value.args[0]
assert 'Asset(name="asset_first", uri="test://asset/")' in exc.value.args[0]
assert 'Asset(name="asset_second", uri="asset_second")' in str(exc.value)
assert 'Asset(name="asset_first", uri="test://asset/")' in str(exc.value)

@pytest.mark.want_activate_assets(True)
def test_run_with_inactive_assets_in_outlets_within_the_same_dag(self, dag_maker, session):
Expand All @@ -4105,7 +4158,7 @@ def duplicate_asset_task(*, outlet_events):
with pytest.raises(AirflowInactiveAssetInInletOrOutletException) as exc:
tis["duplicate_asset_task"].run(session=session)

assert exc.value.args[0] == (
assert str(exc.value) == (
"Task has the following inactive assets in its inlets or outlets: "
'Asset(name="asset_first", uri="test://asset/")'
)
Expand Down Expand Up @@ -4134,7 +4187,7 @@ def duplicate_asset_task(*, outlet_events):
with pytest.raises(AirflowInactiveAssetInInletOrOutletException) as exc:
tis["duplicate_asset_task"].run(session=session)

assert exc.value.args[0] == (
assert str(exc.value) == (
"Task has the following inactive assets in its inlets or outlets: "
'Asset(name="asset_first", uri="test://asset/")'
)
Expand All @@ -4159,7 +4212,7 @@ def duplicate_asset_task():
with pytest.raises(AirflowInactiveAssetInInletOrOutletException) as exc:
tis["first_asset_task"].run(session=session)

assert exc.value.args[0] == (
assert str(exc.value) == (
"Task has the following inactive assets in its inlets or outlets: "
'Asset(name="asset_first", uri="asset_first")'
)
Expand Down Expand Up @@ -4188,7 +4241,7 @@ def duplicate_asset_task(*, outlet_events):
with pytest.raises(AirflowInactiveAssetInInletOrOutletException) as exc:
tis["duplicate_asset_task"].run(session=session)

assert exc.value.args[0] == (
assert str(exc.value) == (
"Task has the following inactive assets in its inlets or outlets: "
'Asset(name="asset_first", uri="test://asset/")'
)
Expand Down

0 comments on commit d686a1d

Please sign in to comment.