Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion airflow/sensors/external_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ def poke(self, context, session=None):
if self.failed_states:
count_failed = self.get_count(dttm_filter, session, self.failed_states)

if count_failed == len(dttm_filter):
# Fail if anything in the list has failed.
if count_failed > 0:
if self.external_task_ids:
if self.soft_fail:
raise AirflowSkipException(
Expand Down
3 changes: 3 additions & 0 deletions newsfragments/27190.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
``ExternalTaskSensor`` no longer hangs indefinitely when ``failed_states`` is set, an ``execute_date_fn`` is used, and some but not all of the dependent tasks fail. Instead, an ``AirflowException`` is thrown as soon as any of the dependent tasks fail.

Any code handling this failure in addition to timeouts should move to cathing the ``AirflowException`` baseclass and not only the ``AirflowSensorTimeout`` subclass.
24 changes: 24 additions & 0 deletions tests/sensors/test_external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,30 @@ def test_external_task_sensor_fn_multiple_execution_dates(self):
with pytest.raises(AirflowSensorTimeout):
task_with_failure.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

# Test to ensure that if one task in a chain of tasks fails, the
# ExternalTaskSensor will also report a failure and return without
# waiting for a timeout.
task_chain_with_failure = ExternalTaskSensor(
task_id="task_chain_with_failure",
external_dag_id=dag_external_id,
external_task_id="task_external_with_failure",
execution_date_fn=lambda dt: [dt + timedelta(seconds=i) for i in range(3)],
allowed_states=["success"],
failed_states=["failed"],
retries=0,
timeout=5,
poke_interval=1,
dag=dag,
)

# We need to test for an AirflowException explicitly since
# AirflowSensorTimeout is a subclass that will be raised if this does
# not execute properly.
try:
task_chain_with_failure.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
except AirflowException as ex:
assert type(ex) == AirflowException

def test_external_task_sensor_delta(self):
self.add_time_sensor()
op = ExternalTaskSensor(
Expand Down