

Add run_if/skip_if functionality to non-TaskFlow operators #44205

Open
2 tasks done
matthewblock opened this issue Nov 20, 2024 · 4 comments
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow kind:feature Feature Requests needs-triage label for new issues that we didn't triage yet

Comments

@matthewblock

Description

Currently, the useful run_if and skip_if decorators can only be added to tasks that are decorated with the @task decorator, i.e. TaskFlow-compatible operators. However, many common operators, e.g. SqlExecuteQueryOperator, are not compatible with the TaskFlow API. If you try, you will be met with:

skip_if can only be used with task. decorate with @task before @skip_if.

This request is for run_if and skip_if to be added to all operators, not just TaskFlow compatible ones.

Use case/motivation

I want to use run_if and skip_if for an operator that isn't TaskFlow compatible.

Related issues

There is a similar issue (which I didn't bookmark) about making the @task decorator apply to classes (i.e. operators), not just functions. Depending on how that's implemented, it could solve this issue.

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

matthewblock added the kind:feature (Feature Requests) and needs-triage (label for new issues that we didn't triage yet) labels on Nov 20, 2024
dosubot added the area:core-operators (Operators, Sensors and hooks within Core Airflow) label on Nov 20, 2024
@matthewblock
Author

I have never submitted a PR, but my team wrote some working code to add this to our custom operator, so I want to try to contribute this functionality.

My initial thought was this could be added as arguments to BaseOperator:

run_if_condition: None | AnyConditionFunc = None,
run_if_message: None | str = None,
skip_if_condition: None | AnyConditionFunc = None,
skip_if_message: None | str = None,

Then a lot of the code in airflow.decorators.condition could move to airflow.models.baseoperator, and the existing run_if and skip_if decorators could be modified to utilize these new arguments.
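To make the proposal concrete, here is a minimal sketch of how such arguments could be stored and evaluated. It is not Airflow code: ConditionalOperator, EchoOperator, the run() driver, the simplified AnyConditionFunc alias, and the default skip messages are all hypothetical, and AirflowSkipException is a local stand-in for airflow.exceptions.AirflowSkipException so the sketch runs without Airflow installed. The conditions are only called inside pre_execute, i.e. when the task runs, never when the operator object is constructed.

```python
from typing import Any, Callable, Dict, Optional


class AirflowSkipException(Exception):
    """Local stand-in for airflow.exceptions.AirflowSkipException."""


Context = Dict[str, Any]
# Simplified stand-in for Airflow's AnyConditionFunc: takes the context, returns a bool.
AnyConditionFunc = Callable[[Context], bool]


class ConditionalOperator:
    """Hypothetical base class sketching the proposed arguments (not BaseOperator)."""

    def __init__(
        self,
        *,
        task_id: str,
        run_if_condition: Optional[AnyConditionFunc] = None,
        run_if_message: Optional[str] = None,
        skip_if_condition: Optional[AnyConditionFunc] = None,
        skip_if_message: Optional[str] = None,
    ) -> None:
        # Only store the callables; nothing is evaluated at construction time.
        self.task_id = task_id
        self.run_if_condition = run_if_condition
        self.run_if_message = run_if_message
        self.skip_if_condition = skip_if_condition
        self.skip_if_message = skip_if_message

    def pre_execute(self, context: Context) -> None:
        """Evaluate the conditions just before execute(), raising a skip if needed."""
        if self.run_if_condition is not None and not self.run_if_condition(context):
            raise AirflowSkipException(self.run_if_message or "run_if condition was False")
        if self.skip_if_condition is not None and self.skip_if_condition(context):
            raise AirflowSkipException(self.skip_if_message or "skip_if condition was True")

    def execute(self, context: Context) -> Any:
        raise NotImplementedError

    def run(self, context: Context) -> Any:
        """Toy driver mimicking the order a worker uses: pre_execute, then execute."""
        self.pre_execute(context)
        return self.execute(context)


class EchoOperator(ConditionalOperator):
    """Hypothetical classic operator used only to exercise the sketch."""

    def execute(self, context: Context) -> str:
        return f"ran {self.task_id}"


op = EchoOperator(
    task_id="load_sql",
    skip_if_condition=lambda ctx: ctx["params"].get("dry_run", False),
    skip_if_message="Dry run: skipping load_sql",
)
print(op.run({"params": {}}))  # prints "ran load_sql"
```

Because the callables are only stored as attributes and invoked in pre_execute, constructing the operator (as happens when a DAG file is parsed) never runs them.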

I also see that in the discussion that led to the run_if and skip_if PR, the recommendation was to just use pre_execute for this purpose. A somewhat compelling reason not to is that you can pass a message to run_if and skip_if but not pre_execute. 🤷‍♂️

Any thoughts?

@potiuk
Member

potiuk commented Nov 20, 2024

> Any thoughts?

Yes. The only way we can get this approved is if you figure out how to run that code ONLY in the task, and NOT in the scheduler. Running any DAG Author-provided code in the scheduler violates the basic Airflow security model (https://airflow.apache.org/docs/apache-airflow/stable/security/security_model.html), and the Airflow Scheduler should never run DAG Author-modifiable code.

This is, for example, why custom scheduler code is done via Plugins: you cannot create custom scheduler code via a DAG; you have to have a plugin installed.

I think it's possible to do it in a safe way, by plugging it into the "pre_execute" framework, and you are welcome to try it. But it will likely be quite a bit more complex than what your team came up with. Also, we are currently working only on Airflow 3, where task execution is heavily changed anyway. But after we release Airflow 3, I think this might be a good addition.

@matthewblock
Author

matthewblock commented Nov 21, 2024

> I think it's possible to do it in a safe way - plugging it in in the "pre_execute" framework and you are welcome to try it.

Yes, we did exactly this, but in a custom operator instead of BaseOperator: we took the inner workings of the run_if- and skip_if-related functions, minus the logic related to the Python decorator itself, and used them to add the specified callable to pre_execute. I'm seeing already how it is more difficult to modify BaseOperator!

> A somewhat compelling reason not to is that you can pass a message to run_if and skip_if but not pre_execute. 🤷‍♂️

Another compelling reason is if you have a run_if or skip_if function you want to re-use in two separate tasks, where one supports TaskFlow and the other doesn't. You can use pre_execute in both cases, but you need to raise AirflowSkipException in your function rather than having it return True or False and relying on run_if/skip_if to raise the exception.

@potiuk
Member

potiuk commented Nov 21, 2024

> I'm seeing already how it is more difficult to modify BaseOperator!

Yes, it is, because it is the underpinning for a LOT of things, say, mapped operators.

> Another compelling reason is if you have a run_if or skip_if function you want to re-use in two separate tasks, where one supports TaskFlow and the other doesn't. You can use pre_execute in both cases, but you need to raise AirflowSkipException in your function rather than having it return True or False and relying on run_if/skip_if to raise the exception.

This could also be done by extracting the common code and having taskflow_run_if and classic_run_if wrappers. But yes, it would have been easier if there could be only one.
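The suggestion of extracting common code can be sketched as one shared core plus two thin wrappers, so the same bool-returning condition serves both kinds of task. The wrapper names taskflow_run_if and classic_run_if come from the comment; their bodies, the _skip_unless helper, is_weekday, build_report, and the messages are hypothetical. AirflowSkipException is a local stand-in for airflow.exceptions.AirflowSkipException, and the TaskFlow-side wrapper is simplified to decorate a plain function that receives the context as its first argument, which is not how real @task functions receive it.

```python
import functools
from typing import Any, Callable, Dict, Optional


class AirflowSkipException(Exception):
    """Local stand-in for airflow.exceptions.AirflowSkipException."""


Context = Dict[str, Any]
Condition = Callable[[Context], bool]


def _skip_unless(condition: Condition, context: Context, message: Optional[str]) -> None:
    """Shared core: raise a skip when a run_if-style condition returns False."""
    if not condition(context):
        raise AirflowSkipException(message or "Condition was False, skipping task")


def classic_run_if(condition: Condition, message: Optional[str] = None) -> Callable[[Context], None]:
    """Hypothetical wrapper for classic operators: a hook that takes only the context."""

    def hook(context: Context) -> None:
        _skip_unless(condition, context, message)

    return hook


def taskflow_run_if(condition: Condition, message: Optional[str] = None):
    """Hypothetical wrapper for decorated functions (simplified: context passed first)."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(context: Context, *args: Any, **kwargs: Any) -> Any:
            _skip_unless(condition, context, message)
            return func(context, *args, **kwargs)

        return wrapper

    return decorator


# One bool-returning condition, reused by both wrappers.
def is_weekday(context: Context) -> bool:
    return context["weekday"] < 5


weekday_hook = classic_run_if(is_weekday, "Weekend: skipping")


@taskflow_run_if(is_weekday, "Weekend: skipping")
def build_report(context: Context) -> str:
    return "report built"
```

In a DAG, a hook like weekday_hook would be handed to a classic operator through BaseOperator's pre_execute argument (a callable that receives the context), assuming your Airflow version still exposes that argument. The condition itself never has to raise AirflowSkipException.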

So yes, it's quite possible to add this, and maybe you could even attempt it now. But again, be aware it will only be available in Airflow 3, as we are not adding any more features to Airflow 2 (but that's fine as well: one more good reason to migrate to Airflow 3).
