Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/docs/core-concepts/dags.rst
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ However, this is just the default behaviour, and you can control it using the ``
* ``all_success`` (default): All upstream tasks have succeeded
* ``all_failed``: All upstream tasks are in a ``failed`` or ``upstream_failed`` state
* ``all_done``: All upstream tasks are done with their execution
* ``all_done_min_one_success``: All upstream tasks are done with their execution, none of them were skipped, and at least one upstream task has succeeded
* ``all_skipped``: All upstream tasks are in a ``skipped`` state
* ``one_failed``: At least one upstream task has failed (does not wait for all upstream tasks to be done)
* ``one_success``: At least one upstream task has succeeded (does not wait for all upstream tasks to be done)
Expand Down
46 changes: 46 additions & 0 deletions airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,17 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]:
elif trigger_rule == TR.ALL_SKIPPED:
if success or failed or upstream_failed:
new_state = TaskInstanceState.SKIPPED
elif trigger_rule == TR.ALL_DONE_MIN_ONE_SUCCESS:
# For this trigger rule, skipped tasks are not considered "done"
non_skipped_done = success + failed + upstream_failed + removed
non_skipped_upstream = upstream - skipped

if skipped > 0:
# There are skipped tasks, so not all tasks are "done" for this rule
new_state = TaskInstanceState.SKIPPED
elif non_skipped_done >= non_skipped_upstream and success == 0:
# All non-skipped tasks are done but no successes
new_state = TaskInstanceState.UPSTREAM_FAILED
elif trigger_rule == TR.ALL_DONE_SETUP_SUCCESS:
if upstream_done and upstream_setup and skipped_setup >= upstream_setup:
# when there is an upstream setup and they have all skipped, then skip
Expand Down Expand Up @@ -573,6 +584,41 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]:
f"upstream_task_ids={task.upstream_task_ids}"
)
)
elif trigger_rule == TR.ALL_DONE_MIN_ONE_SUCCESS:
# For this trigger rule, skipped tasks are not considered "done"
non_skipped_done = success + failed + upstream_failed + removed
non_skipped_upstream = upstream - skipped
if ti.map_index > -1:
non_skipped_upstream -= removed
non_skipped_done -= removed

if skipped > 0:
yield self._failing_status(
reason=(
f"Task's trigger rule '{trigger_rule}' requires all non-skipped upstream tasks to have "
f"completed, but found {skipped} skipped task(s). "
f"upstream_states={upstream_states}, "
f"upstream_task_ids={task.upstream_task_ids}"
)
)
elif non_skipped_done < non_skipped_upstream:
yield self._failing_status(
reason=(
f"Task's trigger rule '{trigger_rule}' requires all non-skipped upstream tasks to have "
f"completed, but found {non_skipped_upstream - non_skipped_done} task(s) that were not done. "
f"upstream_states={upstream_states}, "
f"upstream_task_ids={task.upstream_task_ids}"
)
)
elif success == 0:
yield self._failing_status(
reason=(
f"Task's trigger rule '{trigger_rule}' requires all non-skipped upstream tasks to have "
f"completed and at least one upstream task has succeeded, but found "
f"{success} successful task(s). upstream_states={upstream_states}, "
f"upstream_task_ids={task.upstream_task_ids}"
)
)
else:
yield self._failing_status(reason=f"No strategy to evaluate trigger rule '{trigger_rule}'.")

Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/utils/trigger_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class TriggerRule(str, Enum):
ALL_SUCCESS = "all_success"
ALL_FAILED = "all_failed"
ALL_DONE = "all_done"
ALL_DONE_MIN_ONE_SUCCESS = "all_done_min_one_success"
ALL_DONE_SETUP_SUCCESS = "all_done_setup_success"
ONE_SUCCESS = "one_success"
ONE_FAILED = "one_failed"
Expand Down
148 changes: 147 additions & 1 deletion airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@

import pytest

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag_version import DagVersion
from airflow.models.taskinstance import TaskInstance
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import task, task_group
from airflow.sdk.bases.operator import BaseOperator
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep, _UpstreamTIStates
from airflow.utils.state import DagRunState, TaskInstanceState
Expand Down Expand Up @@ -1037,6 +1037,152 @@ def test_unknown_tr(self, session, get_task_instance, flag_upstream_failed):
expected_reason="No strategy to evaluate trigger rule 'Unknown Trigger Rule'.",
)

@pytest.mark.parametrize("flag_upstream_failed, expected_ti_state", [(True, None), (False, None)])
def test_all_done_min_one_success_with_mixed_success_and_failure(
    self, session, get_task_instance, flag_upstream_failed, expected_ti_state
):
    """ALL_DONE_MIN_ONE_SUCCESS passes with one success and one failure upstream.

    Both upstream tasks are done, none are skipped, and at least one has
    succeeded, so the rule is satisfied regardless of ``flag_upstream_failed``.
    """
    upstream_counts = {
        "success": 1,
        "skipped": 0,
        "failed": 1,
        "removed": 0,
        "upstream_failed": 0,
        "done": 2,
    }
    ti = get_task_instance(
        TriggerRule.ALL_DONE_MIN_ONE_SUCCESS,
        normal_tasks=["upstream_success_task", "upstream_failure_task"],
        **upstream_counts,
    )
    _test_trigger_rule(
        ti=ti,
        session=session,
        flag_upstream_failed=flag_upstream_failed,
        expected_ti_state=expected_ti_state,
    )

@pytest.mark.parametrize("flag_upstream_failed, expected_ti_state", [(True, None), (False, None)])
def test_all_done_min_one_success_with_all_successful_upstreams(
    self, session, get_task_instance, flag_upstream_failed, expected_ti_state
):
    """ALL_DONE_MIN_ONE_SUCCESS passes when every upstream task succeeded.

    With two successes the "at least one success" minimum is met and all
    upstream tasks are done, so no dep failure is expected either way.
    """
    upstream_counts = {
        "success": 2,
        "skipped": 0,
        "failed": 0,
        "removed": 0,
        "upstream_failed": 0,
        "done": 2,
    }
    ti = get_task_instance(
        TriggerRule.ALL_DONE_MIN_ONE_SUCCESS,
        normal_tasks=["upstream_success_task_1", "upstream_success_task_2"],
        **upstream_counts,
    )
    _test_trigger_rule(
        ti=ti,
        session=session,
        flag_upstream_failed=flag_upstream_failed,
        expected_ti_state=expected_ti_state,
    )

@pytest.mark.parametrize("flag_upstream_failed, expected_ti_state", [(True, SKIPPED), (False, None)])
def test_all_done_min_one_success_with_success_and_skipped_upstream(
    self, session, get_task_instance, flag_upstream_failed, expected_ti_state
):
    """ALL_DONE_MIN_ONE_SUCCESS fails when any upstream task was skipped.

    One success plus one skipped upstream violates the rule (skipped tasks
    are not tolerated), so the dep reports a failing status; with
    ``flag_upstream_failed`` enabled the task itself is marked SKIPPED.
    """
    upstream_counts = {
        "success": 1,
        "skipped": 1,
        "failed": 0,
        "removed": 0,
        "upstream_failed": 0,
        "done": 2,
    }
    ti = get_task_instance(
        TriggerRule.ALL_DONE_MIN_ONE_SUCCESS,
        normal_tasks=["upstream_success_task", "upstream_skipped_task"],
        **upstream_counts,
    )
    _test_trigger_rule(
        ti=ti,
        session=session,
        flag_upstream_failed=flag_upstream_failed,
        expected_reason="requires all non-skipped upstream tasks to have completed, but found 1 skipped task(s)",
        expected_ti_state=expected_ti_state,
    )

@pytest.mark.parametrize(
    "flag_upstream_failed, expected_ti_state", [(True, UPSTREAM_FAILED), (False, None)]
)
def test_all_done_min_one_success_with_all_failed_upstreams(
    self, session, get_task_instance, flag_upstream_failed, expected_ti_state
):
    """ALL_DONE_MIN_ONE_SUCCESS fails when every upstream task failed.

    All upstream tasks are done, but with zero successes the minimum-one-
    success requirement is unmet; with ``flag_upstream_failed`` enabled the
    task is marked UPSTREAM_FAILED.
    """
    upstream_counts = {
        "success": 0,
        "skipped": 0,
        "failed": 2,
        "removed": 0,
        "upstream_failed": 0,
        "done": 2,
    }
    ti = get_task_instance(
        TriggerRule.ALL_DONE_MIN_ONE_SUCCESS,
        normal_tasks=["upstream_failed_task_1", "upstream_failed_task_2"],
        **upstream_counts,
    )
    _test_trigger_rule(
        ti=ti,
        session=session,
        flag_upstream_failed=flag_upstream_failed,
        expected_reason="requires all non-skipped upstream tasks to have completed and at least one upstream task has succeeded",
        expected_ti_state=expected_ti_state,
    )

@pytest.mark.parametrize(
    "flag_upstream_failed, expected_ti_state", [(True, UPSTREAM_FAILED), (False, None)]
)
def test_all_done_min_one_success_with_upstream_failed_cascade(
    self, session, get_task_instance, flag_upstream_failed, expected_ti_state
):
    """ALL_DONE_MIN_ONE_SUCCESS fails for a cascaded UPSTREAM_FAILED upstream.

    The sole upstream is in UPSTREAM_FAILED state (propagated from an
    earlier failure), so no success exists to satisfy the rule; with
    ``flag_upstream_failed`` enabled the task is marked UPSTREAM_FAILED.
    """
    upstream_counts = {
        "success": 0,
        "skipped": 0,
        "failed": 0,
        "removed": 0,
        "upstream_failed": 1,
        "done": 1,
    }
    ti = get_task_instance(
        TriggerRule.ALL_DONE_MIN_ONE_SUCCESS,
        normal_tasks=["upstream_cascaded_failure_task"],
        **upstream_counts,
    )
    _test_trigger_rule(
        ti=ti,
        session=session,
        flag_upstream_failed=flag_upstream_failed,
        expected_reason="requires all non-skipped upstream tasks to have completed and at least one upstream task has succeeded",
        expected_ti_state=expected_ti_state,
    )

def test_UpstreamTIStates(self, session, dag_maker):
"""
this test tests the helper class '_UpstreamTIStates' as a unit and inside update_state
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/tests/unit/utils/test_trigger_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ def test_valid_trigger_rules(self):
assert TriggerRule.is_valid(TriggerRule.ALWAYS)
assert TriggerRule.is_valid(TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
assert TriggerRule.is_valid(TriggerRule.ALL_DONE_SETUP_SUCCESS)
assert len(TriggerRule.all_triggers()) == 12
assert TriggerRule.is_valid(TriggerRule.ALL_DONE_MIN_ONE_SUCCESS)
assert len(TriggerRule.all_triggers()) == 13

with pytest.raises(ValueError):
TriggerRule("NOT_EXIST_TRIGGER_RULE")