-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
respect "soft_fail" argument when running BatchSensor in deferrable mode #33405
respect "soft_fail" argument when running BatchSensor in deferrable mode #33405
Conversation
76a2266
to
332ec7d
Compare
9ff1e26
to
93c79dc
Compare
21a99f6
to
c747d20
Compare
def raise_failed_or_skiping_exception( | ||
*, soft_fail: bool = False, failed_message: str, skipping_message: str = "" | ||
) -> None: | ||
"""Raise AirflowSkipException if self.soft_fail is set to True. Otherwise raise AirflowException.""" | ||
if soft_fail: | ||
raise AirflowSkipException(skipping_message or failed_message) | ||
raise AirflowException(failed_message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so basically what you are after is to backport a fix we have in 2.7.1
The proper way to do it is:
- Try to import the function from base_sensor if failed on import error then use the local version.
- The local version should be private
_raise_failed_or_skiping_exception
so we can remove it without deprecation cycle. - There should be a note on the function to remove it when min airflow version for provider is >=2.7.1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Try to import the function from base_sensor if failed on import error then use the local version.
This method was removed from base sensor after merging #33424.
The local version should be private _raise_failed_or_skiping_exception so we can remove it without deprecation cycle.
There should be a note on the function to remove it when min airflow version for provider is >=2.7.
a better solution could be removing this method and duplicating this small block in all the operators, it's the most safe solution to avoid breaking changes. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't track the flow of PR around the soft_fail but that is exactly what troubles me.
I can be the one that pick up the todo in future releases when we want to do the cleanup. I do not understand what happens here. What is the old and what is the new.
I trust you guys have clarity on this thus i'm removing my review and you can proceed as you see fit (as I'm not in the details)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically we fixed the issue in a more proper way in base sensor, and all the providers will work fine with Airflow +2.7.1, but to fix this issue with older versions of Airflow, we need to implement a temporary solution in each one until bumping the min Airflow version for these providers to 2.7.1
# TODO: remove this if-else block when min_airflow_version is set to higher than the version that | ||
# changed in https://github.com/apache/airflow/pull/33424 is released |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if-else block are you referring to?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This solution will work if there is no errors in the trigger (and it is not canceled after a timeout).
Let me check if we can implement something which always work or we should accept a partial solution as it's temporary.
c747d20
to
c3a47fc
Compare
or should we always send a failure event to |
The problem is that in the older versions of Airflow, the failure is handled without executing the task methods. I checked and I didn't find a simple solution. W can accept a partial fix, and if the user want a full fix, he can upgrade to 2.7.1 |
def raise_failed_or_skiping_exception( | ||
*, soft_fail: bool = False, failed_message: str, skipping_message: str = "" | ||
) -> None: | ||
"""Raise AirflowSkipException if self.soft_fail is set to True. Otherwise raise AirflowException.""" | ||
if soft_fail: | ||
raise AirflowSkipException(skipping_message or failed_message) | ||
raise AirflowException(failed_message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we remove this method and add this to each operator:
if self.soft_fail:
raise AirflowSkipException(...)
raise AirflowException(...)
In this way we will be able to remove it later without worrying about breaking changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure! I just updated it as suggested
c3a47fc
to
f8b8582
Compare
0b6570f
to
413f3f8
Compare
632299b
to
c72cd7f
Compare
c72cd7f
to
0f3bfb8
Compare
…ensor in deferrable mode
…ils.py and move the fix back to BatchSensor
…e_with_soft_fail expected exception
0f3bfb8
to
9eec324
Compare
This is the following PR to #33196 and intends to solve the issue that the soft_fail argument might not work on all sensors.
This pull request addresses the issue of running BatchSensor in deferrable mode with
soft_fail=True
still raises anAirflowException
, which should beAirflowSkipException
. Asraise_failed_or_skiping_exception
won't exist inBaseSensorOperator
before airflow (probably 2.7.1?), I add it toairflow.providers.amazon.aws.sensors
. Not sure whether it's the correct way to do so. If so, I'll continue work on fixing other providers' sensors as well^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.