Conversation

@eladkal (Contributor) commented Jul 31, 2025:

This trigger rule is useful for cases where we must verify that all upstream tasks are done executing, but we only need at least one of them to succeed.

Also verified with an example DAG:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.exceptions import AirflowSkipException
from airflow.sdk.definitions.edges import EdgeModifier

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 2, 1)
}

def always_skip_function():
    raise AirflowSkipException("This task is intentionally skipped.")

with DAG('1min_file', schedule=None, catchup=True, default_args=default_args):

    # Test case 1: One success, one failure -> Expected state: SUCCESS
    a_success = EmptyOperator(task_id="a_success")
    a_fails = BashOperator(
        task_id='a_fails',
        bash_command='exit 1',  # A non-zero exit code indicates failure
    )
    rule_1 = EmptyOperator(task_id="rule_1", trigger_rule=TriggerRule.ALL_DONE_MIN_ONE_SUCCESS)

    a_success >> EdgeModifier("expect rule_1 to be successful") >> rule_1
    a_fails >> EdgeModifier("expect rule_1 to be successful") >> rule_1

    # Test case 2: Two successes -> Expected state: SUCCESS
    b_success = EmptyOperator(task_id="b_success")
    b_success2 = EmptyOperator(task_id="b_success2")
    rule_2 = EmptyOperator(task_id="rule_2", trigger_rule=TriggerRule.ALL_DONE_MIN_ONE_SUCCESS)

    b_success >> EdgeModifier("expect rule_2 to be successful") >> rule_2
    b_success2 >> EdgeModifier("expect rule_2 to be successful") >> rule_2

    # Test case 3: One success, one skip -> Expected state: SKIPPED
    c_success = EmptyOperator(task_id="c_success")
    c_skip = PythonOperator(
        task_id='c_skip',
        python_callable=always_skip_function,
    )
    rule_3 = EmptyOperator(task_id="rule_3", trigger_rule=TriggerRule.ALL_DONE_MIN_ONE_SUCCESS)

    c_success >> EdgeModifier("expect rule_3 to be skipped") >> rule_3
    c_skip >> EdgeModifier("expect rule_3 to be skipped") >> rule_3

    # Test case 4: All failures -> Expected state: UPSTREAM_FAILED
    d_fails1 = BashOperator(
        task_id='d_fails1',
        bash_command='exit 1',
    )
    d_fails2 = BashOperator(
        task_id='d_fails2',
        bash_command='exit 1',
    )
    rule_4 = EmptyOperator(task_id="rule_4", trigger_rule=TriggerRule.ALL_DONE_MIN_ONE_SUCCESS)

    d_fails1 >> EdgeModifier("expect rule_4 to be upstream failed") >> rule_4
    d_fails2 >> EdgeModifier("expect rule_4 to be upstream failed") >> rule_4


    # Test case 5: upstream failure cascade -> Expected state: UPSTREAM_FAILED
    e_fails3 = BashOperator(
        task_id='e_fails3',
        bash_command='exit 1',
    )
    e_upstream_failed = EmptyOperator(task_id="e_upstream_failed")

    rule_5 = EmptyOperator(task_id="rule_5", trigger_rule=TriggerRule.ALL_DONE_MIN_ONE_SUCCESS)

    e_fails3 >> e_upstream_failed >> EdgeModifier("expect rule_5 to be upstream failed") >> rule_5
[Screenshot: example DAG run, 2025-07-31, showing the rule_* tasks in their expected states]
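The outcomes the five test cases exercise can be modeled as a plain function. This is a sketch inferred from the example DAG above, not the scheduler's actual trigger-rule evaluation code:

```python
def all_done_min_one_success(upstream_states):
    """Model of ALL_DONE_MIN_ONE_SUCCESS once every upstream task is done.

    Inferred from the five test cases above; the real logic runs inside
    the scheduler's trigger-rule dep, not in user code.
    """
    if "skipped" in upstream_states:
        return "skipped"            # test case 3: a skip propagates
    if "success" in upstream_states:
        return "run"                # test cases 1 and 2: at least one success
    return "upstream_failed"        # test cases 4 and 5: no success at all
```

For example, `all_done_min_one_success(["success", "failed"])` returns `"run"`, matching test case 1.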

@eladkal eladkal added this to the Airflow 3.1.0 milestone Jul 31, 2025
@eladkal eladkal added the type:new-feature Changelog: New Features label Jul 31, 2025
@Lee-W (Member) left a comment:

I'm not completely against it, but I think we could use something like a branch operator with all_done. -0 to this, but would love to hear how everyone thinks 🙂

@eladkal (Contributor, Author) commented Jul 31, 2025:

> I'm not completely against it, but I think we could use something like a branch operator with all_done. -0 to this, but would love to hear how everyone thinks 🙂

There is no branching here. I don't know of any simple way to get the desired behavior; you need to work around it with additional tasks, which in big environments creates many unnecessary tasks (more load on the scheduler, etc.).

@shahar1 (Contributor) left a comment:

Overall LGTM - I could imagine use cases of this trigger rule.
For the long term, maybe we should think about changing the trigger-rule mechanism so the definition could be more flexible than the hardcoded enum (e.g., this TR would be represented as something like trigger_rule={'done': 'all', 'success': '>=1'}).
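A minimal sketch of what such a constraint-based spec could look like. The `evaluate` helper, the spec keys, and the constraint syntax are all hypothetical, just to make the idea concrete:

```python
import re
from collections import Counter

# Terminal task-instance states counted toward "done".
TERMINAL = {"success", "failed", "skipped", "upstream_failed"}
_OPS = {">=": int.__ge__, "<=": int.__le__, "==": int.__eq__,
        ">": int.__gt__, "<": int.__lt__}

def _satisfies(count, total, constraint):
    """Check one constraint, either the keyword 'all' or '<op><n>'."""
    if constraint == "all":
        return count == total
    op, n = re.match(r"(>=|<=|==|>|<)\s*(\d+)$", constraint).groups()
    return _OPS[op](count, int(n))

def evaluate(spec, upstream_states):
    """Hypothetical evaluator for a dict-based trigger-rule spec."""
    total = len(upstream_states)
    counts = Counter(upstream_states)
    counts["done"] = sum(counts[s] for s in TERMINAL)
    return all(_satisfies(counts[key], total, c) for key, c in spec.items())
```

Under this sketch, `evaluate({"done": "all", "success": ">=1"}, ["success", "failed"])` is `True`, while the same spec against `["failed", "failed"]` is `False` — i.e., the behavior this PR hard-codes.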

@eladkal (Contributor, Author) commented Aug 1, 2025:

> Overall LGTM - I could imagine use cases of this trigger rule. For the long term, maybe we should think about changing the trigger rule mechanism so the definition could be more flexible instead of the hardcoded enum (e.g., this TR will be represented as something like trigger_rule={'done': 'all', 'success': '>=1'}).

We can look into it after Airflow 3.1. If you have a proposal, feel free to suggest it on the mailing list.
I think this can be a good idea.

@eladkal eladkal merged commit 24c5b91 into apache:main Aug 1, 2025
195 of 196 checks passed
@eladkal eladkal deleted the min_one branch August 1, 2025 05:14
@Lee-W (Member) commented Aug 1, 2025:

Yep, what I have in mind is customizable trigger rules as well. @shahar1 let's see whether we can work on something together after 3.1 🙂

@potiuk (Member) commented Aug 1, 2025:

> Yep, what I have in mind is customizable trigger rules as well. @shahar1 let's see whether we can work on something together after 3.1 🙂

We should be careful with that. Trigger rules are on the hottest path of the scheduler, and we should not have too many of them; their customizability is also limited because they run inside the scheduler. We should generally try to minimise them and rather hard-code all the logic here, I think.

ferruzzi pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Aug 7, 2025
fweilun pushed a commit to fweilun/airflow that referenced this pull request Aug 11, 2025
@ManelCoutinhoSensei commented:
Hey @eladkal,
I know I'm a little bit late, but can you please explain/justify the final rule (if one upstream task is skipped, the downstream task is skipped too)?
I ask because the name of this rule seems derived from ALL_DONE, yet the behavior differs from what a user familiar with ALL_DONE would expect. Given that we already have a rule like NONE_FAILED_MIN_ONE_SUCCESS (where the transition from NONE_FAILED to NONE_FAILED_MIN_ONE_SUCCESS matches intuitive expectations), I wonder whether the naming and behavioral semantics are sufficiently justified.
