Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail a task if an inlet or and outlet asset is inactive #44831

Open
wants to merge 10 commits into
base: main
Choose a base branch
from

Conversation

Lee-W
Copy link
Member

@Lee-W Lee-W commented Dec 11, 2024

Why this change

In AIP-74, we introduce a new "name" column to the Asset. We do not expect to have assets with the same name or URI. For example, Asset(name="test-name", uri="test://asset") and Asset("test-name") would be considered assets that violate this rule.

Similar to how Dataset works, an asset can be declared by instantiating an Asset object. One difference from Dataset is that Asset will have an additional name argument. If given, this must be unique in the Airflow deployment, and is used to represent the asset in user-facing interfaces,

Currently, we do nothing when a task has an unexpected asset as an inlet or outlet. The following example works fine now, even though it should not.

from __future__ import annotations


from airflow.decorators import dag, task
from airflow.sdk.definitions.asset import Asset


@dag(
    start_date=None,
    schedule=None,
    catchup=False,
)
def my_producer_dag():
    @task(
    	inlets=[
    		# name="test", uri=""s3://my-bucket/my-key/"
    		Asset("test", "s3://my-bucket/my-key/")
    	],
    	outlets=[
    		# name="s3://my-bucket/my-key/", uri=""s3://my-bucket/my-key/"
    		Asset("s3://my-bucket/my-key/") 
    	]
    )
    def my_producer_task():
        pass

    my_producer_task()


my_producer_dag()

Back to #42612, we introduced the concept of AssetActive. These assets that are classified as inactive. While the first asset can still be activated, subsequent assets will not be activated. For example, if Asset("test", "s3://my-bucket/my-key/") is activated first, then Asset("s3://my-bucket/my-key/") will not be activated. This rule applies even if the assets are defined in different DAGs or in separate files.

What's the change

The activation status of an asset has already been addressed in previous pull requests. In this pull request, we have implemented a task that checks for any inactive assets in the outlets or inlets. If any inactive assets are found, it raises an AirflowExecuteWithInactiveAssetException, which inherits from AirflowFailException, causing the task to fail.

Closes: #44600

Test

Tested with the following test cases. All of them worked fine before this fix and received a similar error message to the screenshot below after this fix.

Inactive assets in inlets
from __future__ import annotations

from airflow.decorators import dag, task
from airflow.sdk.definitions.asset import Asset


@dag(start_date=None, schedule=None, catchup=False)
def inactive_assets_in_inlets_dag():
    @task(
        inlets=[
            Asset("inlet-test", "s3://inlet/my-key/"),
            Asset("s3://inlet/my-key/"),
            Asset("inlet-test2"),
        ],
    )
    def first_asset_task():
        pass

    @task(inlets=[Asset(uri="inlet-test2")])
    def second_asset_task():
        pass

    first_asset_task() >> second_asset_task()


inactive_assets_in_inlets_dag()
Inactive assets in outlets
from __future__ import annotations

from airflow.decorators import dag, task
from airflow.sdk.definitions.asset import Asset


@dag(start_date=None, schedule=None, catchup=False)
def inactive_assets_in_outlets_dag():
    @task(
        outlets=[
            Asset("outlet-test", "s3://outlet/my-key/"),
            Asset("s3://outlet/my-key/"),
            Asset("outlet-test-2"),
        ],
    )
    def first_asset_task():
        pass

    @task(outlets=[Asset(uri="outlet-test-2")])
    def second_asset_task():
        pass

    first_asset_task() >> second_asset_task()


inactive_assets_in_outlets_dag()
Inactive assets mixed in inlets and outlets
from __future__ import annotations

from airflow.decorators import dag, task
from airflow.sdk.definitions.asset import Asset


@dag(start_date=None, schedule=None, catchup=False)
def inactive_assets_mixed_in_inlets_outlet_dag():
    @task(
        inlets=[
            Asset("test", "s3://my-bucket/my-key/"),
        ],
        outlets=[Asset("test2")],
    )
    def first_asset_task():
        pass

    @task(
        inlets=[
            Asset(uri="test2"),
        ],
        outlets=[
            Asset("s3://my-bucket/my-key/"),
        ],
    )
    def second_asset_task():
        pass

    first_asset_task() >> second_asset_task()


inactive_assets_mixed_in_inlets_outlet_dag()
Inactive assets cross files
from __future__ import annotations

from airflow.decorators import dag, task
from airflow.sdk.definitions.asset import Asset


@dag(start_date=None, schedule=None, catchup=False)
def inactive_active_cross_file_dag_1():
    @task(inlets=[Asset("cross", "s3://cross/my-key/")])
    def first_asset_task():
        pass

    @task(outlets=[Asset("cross2", "s3://cross2/my-key/")])
    def second_asset_task():
        pass

    first_asset_task() >> second_asset_task()


inactive_active_cross_file_dag_1()
from __future__ import annotations

from airflow.decorators import dag, task
from airflow.sdk.definitions.asset import Asset


@dag(start_date=None, schedule=None, catchup=False)
def inactive_active_cross_file_dag_2():
    @task(inlets=[Asset("s3://cross2/my-key/")])
    def first_asset_task():
        pass

    @task(outlets=[Asset("cross")])
    def second_asset_task():
        pass

    first_asset_task() >> second_asset_task()


inactive_active_cross_file_dag_2()

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@Lee-W Lee-W force-pushed the fail-a-task-if-inlet-outlet-asset-is-inactive branch from fc63134 to 5064159 Compare December 11, 2024 03:59
@Lee-W
Copy link
Member Author

Lee-W commented Dec 11, 2024

2 test cases still need to be fixed but the overall implementation is wrapped up

@Lee-W Lee-W force-pushed the fail-a-task-if-inlet-outlet-asset-is-inactive branch 8 times, most recently from b93851a to e75afd4 Compare December 12, 2024 08:06
@Lee-W Lee-W marked this pull request as ready for review December 12, 2024 08:07
@Lee-W Lee-W requested review from XD-DENG and ashb as code owners December 12, 2024 08:07
@Lee-W Lee-W changed the title Fail a task if inlet outlet asset is inactive Fail a task if an inlet or and outlet asset is inactive Dec 12, 2024
@Lee-W Lee-W force-pushed the fail-a-task-if-inlet-outlet-asset-is-inactive branch from e75afd4 to 71eb6f4 Compare December 12, 2024 09:03
@Lee-W Lee-W requested a review from uranusjr December 12, 2024 09:07
@Lee-W Lee-W force-pushed the fail-a-task-if-inlet-outlet-asset-is-inactive branch from 71eb6f4 to 78fc85c Compare December 12, 2024 23:53
@uranusjr
Copy link
Member

Should we also fail if an inactive asset is added by the user to an asset alias? I feel we should for consistency. If that’s the case, this should better be done after the task is run (during the event-pushing phase) instead.

@Lee-W
Copy link
Member Author

Lee-W commented Dec 13, 2024

Should we also fail if an inactive asset is added by the user to an asset alias? I feel we should for consistency.

Yes, this is also missed.

If that’s the case, this should better be done after the task is run (during the event-pushing phase) instead.

I'm not sure 🤔 If we already know the task should fail, I think we should fail it at earlier stage (i.e., inlets and outlets cases). If this is a super resource consumsing task, this helps saving resource. But for the asset alias case, this is not something we can handle before task execution, we should do it after task execution.

@Lee-W Lee-W force-pushed the fail-a-task-if-inlet-outlet-asset-is-inactive branch 4 times, most recently from e0e92bf to d686a1d Compare December 16, 2024 08:33
@Lee-W
Copy link
Member Author

Lee-W commented Dec 16, 2024

Should we also fail if an inactive asset is added by the user to an asset alias? I feel we should for consistency. If that’s the case, this should better be done after the task is run (during the event-pushing phase) instead.

just handled AssetAlias.add

@Lee-W Lee-W force-pushed the fail-a-task-if-inlet-outlet-asset-is-inactive branch from d686a1d to 4082c6a Compare December 16, 2024 10:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Fail a task if the outlet is set to an inactive asset
2 participants