From d686a1db58b6604a25dfff489e4b52b1591d52ff Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 16 Dec 2024 17:33:38 +0900 Subject: [PATCH] test(taskinstance): add test case test_outlet_asset_alias_asset_inactive --- tests/models/test_taskinstance.py | 65 ++++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 1aa1b282c7744..1c64a66e7a813 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -43,6 +43,7 @@ from airflow.exceptions import ( AirflowException, AirflowFailException, + AirflowInactiveAssetAddedToAssetAliasException, AirflowInactiveAssetInInletOrOutletException, AirflowRescheduleException, AirflowSensorTimeout, @@ -2728,6 +2729,58 @@ def producer(*, outlet_events): assert len(asset_alias_obj.assets) == 1 assert asset_alias_obj.assets[0].uri == asset_uri + def test_outlet_asset_alias_asset_inactive(self, dag_maker, session): + from airflow.sdk.definitions.asset import Asset, AssetAlias + + asset_name = "did_not_exists" + asset = Asset(asset_name) + asset2 = Asset(asset_name, uri="test://asset") + asm = AssetModel.from_public(asset) + asm2 = AssetModel.from_public(asset2) + session.add_all([asm, asm2, AssetActive.for_asset(asm)]) + + asset_alias_name = "alias_with_inactive_asset" + + with dag_maker(dag_id="producer_dag", schedule=None, session=session): + + @task(outlets=AssetAlias(asset_alias_name)) + def producer_without_inactive(*, outlet_events): + outlet_events[AssetAlias(asset_alias_name)].add(asset, extra={"key": "value"}) + + @task(outlets=AssetAlias(asset_alias_name)) + def producer_with_inactive(*, outlet_events): + outlet_events[AssetAlias(asset_alias_name)].add(asset2, extra={"key": "value"}) + + producer_without_inactive() >> producer_with_inactive() + + tis = {ti.task_id: ti for ti in dag_maker.create_dagrun().task_instances} + tis["producer_without_inactive"].run(session=session) + with pytest.raises(AirflowInactiveAssetAddedToAssetAliasException) as exc: + tis["producer_with_inactive"].run(session=session) + + assert 'Asset(name="did_not_exists", uri="test://asset/")' in str(exc.value) + + producer_event = session.scalar( + select(AssetEvent).where(AssetEvent.source_task_id == "producer_without_inactive") + ) + + assert producer_event.source_task_id == "producer_without_inactive" + assert producer_event.source_dag_id == "producer_dag" + assert producer_event.source_run_id == "test" + assert producer_event.source_map_index == -1 + assert producer_event.asset.uri == asset_name + assert producer_event.extra == {"key": "value"} + assert len(producer_event.source_aliases) == 1 + assert producer_event.source_aliases[0].name == asset_alias_name + + asset_obj = session.scalar(select(AssetModel).where(AssetModel.uri == asset_name)) + assert len(asset_obj.aliases) == 1 + assert asset_obj.aliases[0].name == asset_alias_name + + asset_alias_obj = session.scalar(select(AssetAliasModel)) + assert len(asset_alias_obj.assets) == 1 + assert asset_alias_obj.assets[0].uri == asset_name + @pytest.mark.want_activate_assets(True) def test_inlet_asset_extra(self, dag_maker, session): from airflow.sdk.definitions.asset import Asset @@ -4081,8 +4134,8 @@ def duplicate_asset_task_in_outlet(*, outlet_events): with pytest.raises(AirflowInactiveAssetInInletOrOutletException) as exc: tis["duplicate_asset_task_in_outlet"].run(session=session) - assert 'Asset(name="asset_second", uri="asset_second")' in exc.value.args[0] - assert 'Asset(name="asset_first", uri="test://asset/")' in exc.value.args[0] + assert 'Asset(name="asset_second", uri="asset_second")' in str(exc.value) + assert 'Asset(name="asset_first", uri="test://asset/")' in str(exc.value) @pytest.mark.want_activate_assets(True) def test_run_with_inactive_assets_in_outlets_within_the_same_dag(self, dag_maker, session): @@ -4105,7 +4158,7 @@ def duplicate_asset_task(*, outlet_events): with pytest.raises(AirflowInactiveAssetInInletOrOutletException) as exc: tis["duplicate_asset_task"].run(session=session) - assert exc.value.args[0] == ( + assert str(exc.value) == ( "Task has the following inactive assets in its inlets or outlets: " 'Asset(name="asset_first", uri="test://asset/")' ) @@ -4134,7 +4187,7 @@ def duplicate_asset_task(*, outlet_events): with pytest.raises(AirflowInactiveAssetInInletOrOutletException) as exc: tis["duplicate_asset_task"].run(session=session) - assert exc.value.args[0] == ( + assert str(exc.value) == ( "Task has the following inactive assets in its inlets or outlets: " 'Asset(name="asset_first", uri="test://asset/")' ) @@ -4159,7 +4212,7 @@ def duplicate_asset_task(): with pytest.raises(AirflowInactiveAssetInInletOrOutletException) as exc: tis["first_asset_task"].run(session=session) - assert exc.value.args[0] == ( + assert str(exc.value) == ( "Task has the following inactive assets in its inlets or outlets: " 'Asset(name="asset_first", uri="asset_first")' ) @@ -4188,7 +4241,7 @@ def duplicate_asset_task(*, outlet_events): with pytest.raises(AirflowInactiveAssetInInletOrOutletException) as exc: tis["duplicate_asset_task"].run(session=session) - assert exc.value.args[0] == ( + assert str(exc.value) == ( "Task has the following inactive assets in its inlets or outlets: " 'Asset(name="asset_first", uri="test://asset/")' )