diff --git a/airflow-core/docs/core-concepts/dags.rst b/airflow-core/docs/core-concepts/dags.rst index 59fc1805ae9d5..bca75697f76c3 100644 --- a/airflow-core/docs/core-concepts/dags.rst +++ b/airflow-core/docs/core-concepts/dags.rst @@ -435,6 +435,7 @@ However, this is just the default behaviour, and you can control it using the `` * ``all_success`` (default): All upstream tasks have succeeded * ``all_failed``: All upstream tasks are in a ``failed`` or ``upstream_failed`` state * ``all_done``: All upstream tasks are done with their execution +* ``all_done_min_one_success``: All upstream tasks are done with their execution, none were skipped, and at least one upstream task has succeeded * ``all_skipped``: All upstream tasks are in a ``skipped`` state * ``one_failed``: At least one upstream task has failed (does not wait for all upstream tasks to be done) * ``one_success``: At least one upstream task has succeeded (does not wait for all upstream tasks to be done) diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py index 9a0096f569d5c..ee8035367f9b6 100644 --- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py @@ -430,6 +430,17 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]: elif trigger_rule == TR.ALL_SKIPPED: if success or failed or upstream_failed: new_state = TaskInstanceState.SKIPPED + elif trigger_rule == TR.ALL_DONE_MIN_ONE_SUCCESS: + # For this trigger rule, skipped tasks are not considered "done" + non_skipped_done = success + failed + upstream_failed + removed + non_skipped_upstream = upstream - skipped + + if skipped > 0: + # There are skipped tasks, so not all tasks are "done" for this rule + new_state = TaskInstanceState.SKIPPED + elif non_skipped_done >= non_skipped_upstream and success == 0: + # All non-skipped tasks are done but no successes + new_state = TaskInstanceState.UPSTREAM_FAILED elif 
trigger_rule == TR.ALL_DONE_SETUP_SUCCESS: if upstream_done and upstream_setup and skipped_setup >= upstream_setup: # when there is an upstream setup and they have all skipped, then skip @@ -573,6 +584,41 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]: f"upstream_task_ids={task.upstream_task_ids}" ) ) + elif trigger_rule == TR.ALL_DONE_MIN_ONE_SUCCESS: + # For this trigger rule, skipped tasks are not considered "done" + non_skipped_done = success + failed + upstream_failed + removed + non_skipped_upstream = upstream - skipped + if ti.map_index > -1: + non_skipped_upstream -= removed + non_skipped_done -= removed + + if skipped > 0: + yield self._failing_status( + reason=( + f"Task's trigger rule '{trigger_rule}' requires all non-skipped upstream tasks to have " + f"completed, but found {skipped} skipped task(s). " + f"upstream_states={upstream_states}, " + f"upstream_task_ids={task.upstream_task_ids}" + ) + ) + elif non_skipped_done < non_skipped_upstream: + yield self._failing_status( + reason=( + f"Task's trigger rule '{trigger_rule}' requires all non-skipped upstream tasks to have " + f"completed, but found {non_skipped_upstream - non_skipped_done} task(s) that were not done. " + f"upstream_states={upstream_states}, " + f"upstream_task_ids={task.upstream_task_ids}" + ) + ) + elif success == 0: + yield self._failing_status( + reason=( + f"Task's trigger rule '{trigger_rule}' requires all non-skipped upstream tasks to have " + f"completed and at least one upstream task has succeeded, but found " + f"{success} successful task(s). 
upstream_states={upstream_states}, " + f"upstream_task_ids={task.upstream_task_ids}" + ) + ) else: yield self._failing_status(reason=f"No strategy to evaluate trigger rule '{trigger_rule}'.") diff --git a/airflow-core/src/airflow/utils/trigger_rule.py b/airflow-core/src/airflow/utils/trigger_rule.py index e825e17911848..6a9b7e656c107 100644 --- a/airflow-core/src/airflow/utils/trigger_rule.py +++ b/airflow-core/src/airflow/utils/trigger_rule.py @@ -26,6 +26,7 @@ class TriggerRule(str, Enum): ALL_SUCCESS = "all_success" ALL_FAILED = "all_failed" ALL_DONE = "all_done" + ALL_DONE_MIN_ONE_SUCCESS = "all_done_min_one_success" ALL_DONE_SETUP_SUCCESS = "all_done_setup_success" ONE_SUCCESS = "one_success" ONE_FAILED = "one_failed" diff --git a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py index 581fd4f22ea91..5c3006ea96f6e 100644 --- a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py +++ b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py @@ -25,11 +25,11 @@ import pytest -from airflow.models.baseoperator import BaseOperator from airflow.models.dag_version import DagVersion from airflow.models.taskinstance import TaskInstance from airflow.providers.standard.operators.empty import EmptyOperator from airflow.sdk import task, task_group +from airflow.sdk.bases.operator import BaseOperator from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep, _UpstreamTIStates from airflow.utils.state import DagRunState, TaskInstanceState @@ -1037,6 +1037,152 @@ def test_unknown_tr(self, session, get_task_instance, flag_upstream_failed): expected_reason="No strategy to evaluate trigger rule 'Unknown Trigger Rule'.", ) + @pytest.mark.parametrize("flag_upstream_failed, expected_ti_state", [(True, None), (False, None)]) + def test_all_done_min_one_success_with_mixed_success_and_failure( + self, session, get_task_instance, 
flag_upstream_failed, expected_ti_state + ): + """ + Test ALL_DONE_MIN_ONE_SUCCESS trigger rule with mixed upstream task states. + + When upstream tasks have mixed states (success and failure), the trigger rule + should pass since all non-skipped tasks are done and at least one succeeded. + """ + ti = get_task_instance( + TriggerRule.ALL_DONE_MIN_ONE_SUCCESS, + success=1, + skipped=0, + failed=1, + removed=0, + upstream_failed=0, + done=2, + normal_tasks=["upstream_success_task", "upstream_failure_task"], + ) + _test_trigger_rule( + ti=ti, + session=session, + flag_upstream_failed=flag_upstream_failed, + expected_ti_state=expected_ti_state, + ) + + @pytest.mark.parametrize("flag_upstream_failed, expected_ti_state", [(True, None), (False, None)]) + def test_all_done_min_one_success_with_all_successful_upstreams( + self, session, get_task_instance, flag_upstream_failed, expected_ti_state + ): + """ + Test ALL_DONE_MIN_ONE_SUCCESS trigger rule with all upstream tasks successful. + + When all upstream tasks succeed, the trigger rule should pass as the minimum + requirement of at least one success is satisfied. + """ + ti = get_task_instance( + TriggerRule.ALL_DONE_MIN_ONE_SUCCESS, + success=2, + skipped=0, + failed=0, + removed=0, + upstream_failed=0, + done=2, + normal_tasks=["upstream_success_task_1", "upstream_success_task_2"], + ) + _test_trigger_rule( + ti=ti, + session=session, + flag_upstream_failed=flag_upstream_failed, + expected_ti_state=expected_ti_state, + ) + + @pytest.mark.parametrize("flag_upstream_failed, expected_ti_state", [(True, SKIPPED), (False, None)]) + def test_all_done_min_one_success_with_success_and_skipped_upstream( + self, session, get_task_instance, flag_upstream_failed, expected_ti_state + ): + """ + Test ALL_DONE_MIN_ONE_SUCCESS trigger rule with success and skipped upstream tasks. 
+ + When upstream tasks include both successful and skipped tasks, the trigger rule + should fail because skipped tasks violate the "all non-skipped tasks done" requirement. + The task should be skipped when flag_upstream_failed is True. + """ + ti = get_task_instance( + TriggerRule.ALL_DONE_MIN_ONE_SUCCESS, + success=1, + skipped=1, + failed=0, + removed=0, + upstream_failed=0, + done=2, + normal_tasks=["upstream_success_task", "upstream_skipped_task"], + ) + _test_trigger_rule( + ti=ti, + session=session, + flag_upstream_failed=flag_upstream_failed, + expected_reason="requires all non-skipped upstream tasks to have completed, but found 1 skipped task(s)", + expected_ti_state=expected_ti_state, + ) + + @pytest.mark.parametrize( + "flag_upstream_failed, expected_ti_state", [(True, UPSTREAM_FAILED), (False, None)] + ) + def test_all_done_min_one_success_with_all_failed_upstreams( + self, session, get_task_instance, flag_upstream_failed, expected_ti_state + ): + """ + Test ALL_DONE_MIN_ONE_SUCCESS trigger rule with all upstream tasks failed. + + When all upstream tasks have failed, the trigger rule should fail because + there are no successful tasks to satisfy the "at least one success" requirement. + The task should be marked as UPSTREAM_FAILED when flag_upstream_failed is True. 
+ """ + ti = get_task_instance( + TriggerRule.ALL_DONE_MIN_ONE_SUCCESS, + success=0, + skipped=0, + failed=2, + removed=0, + upstream_failed=0, + done=2, + normal_tasks=["upstream_failed_task_1", "upstream_failed_task_2"], + ) + _test_trigger_rule( + ti=ti, + session=session, + flag_upstream_failed=flag_upstream_failed, + expected_reason="requires all non-skipped upstream tasks to have completed and at least one upstream task has succeeded", + expected_ti_state=expected_ti_state, + ) + + @pytest.mark.parametrize( + "flag_upstream_failed, expected_ti_state", [(True, UPSTREAM_FAILED), (False, None)] + ) + def test_all_done_min_one_success_with_upstream_failed_cascade( + self, session, get_task_instance, flag_upstream_failed, expected_ti_state + ): + """ + Test ALL_DONE_MIN_ONE_SUCCESS trigger rule with upstream failure cascade. + + When upstream tasks are in UPSTREAM_FAILED state (cascaded from earlier failures), + the trigger rule should fail because there are no successful tasks to satisfy + the "at least one success" requirement. The task should be marked as UPSTREAM_FAILED + when flag_upstream_failed is True. 
+ """ + ti = get_task_instance( + TriggerRule.ALL_DONE_MIN_ONE_SUCCESS, + success=0, + skipped=0, + failed=0, + removed=0, + upstream_failed=1, + done=1, + normal_tasks=["upstream_cascaded_failure_task"], + ) + _test_trigger_rule( + ti=ti, + session=session, + flag_upstream_failed=flag_upstream_failed, + expected_reason="requires all non-skipped upstream tasks to have completed and at least one upstream task has succeeded", + expected_ti_state=expected_ti_state, + ) + def test_UpstreamTIStates(self, session, dag_maker): """ this test tests the helper class '_UpstreamTIStates' as a unit and inside update_state diff --git a/airflow-core/tests/unit/utils/test_trigger_rule.py b/airflow-core/tests/unit/utils/test_trigger_rule.py index 727dc40d8ab73..2956ef271f798 100644 --- a/airflow-core/tests/unit/utils/test_trigger_rule.py +++ b/airflow-core/tests/unit/utils/test_trigger_rule.py @@ -36,7 +36,8 @@ def test_valid_trigger_rules(self): assert TriggerRule.is_valid(TriggerRule.ALWAYS) assert TriggerRule.is_valid(TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) assert TriggerRule.is_valid(TriggerRule.ALL_DONE_SETUP_SUCCESS) - assert len(TriggerRule.all_triggers()) == 12 + assert TriggerRule.is_valid(TriggerRule.ALL_DONE_MIN_ONE_SUCCESS) + assert len(TriggerRule.all_triggers()) == 13 with pytest.raises(ValueError): TriggerRule("NOT_EXIST_TRIGGER_RULE")