diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py index 325cfcd4d3c51..cc27679773d4b 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py +++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py @@ -260,12 +260,13 @@ def __init__( def _get_dttm_filter(self, context): logical_date = context.get("logical_date") - if logical_date is None: - dag_run = context.get("dag_run") - if TYPE_CHECKING: - assert dag_run + if AIRFLOW_V_3_0_PLUS: + if logical_date is None: + dag_run = context.get("dag_run") + if TYPE_CHECKING: + assert dag_run - logical_date = dag_run.run_after + logical_date = dag_run.run_after if self.execution_delta: dttm = logical_date - self.execution_delta elif self.execution_date_fn: @@ -428,7 +429,7 @@ def execute(self, context: Context) -> None: else: dttm_filter = self._get_dttm_filter(context) logical_or_execution_dates = ( - {"logical_dates": dttm_filter} if AIRFLOW_V_3_0_PLUS else {"execution_date": dttm_filter} + {"logical_dates": dttm_filter} if AIRFLOW_V_3_0_PLUS else {"execution_dates": dttm_filter} ) self.defer( timeout=self.execution_timeout, diff --git a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py index 5c1990a6b7f08..78d2b9434663c 100644 --- a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py +++ b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py @@ -991,6 +991,31 @@ def test_fail__check_for_existence( with pytest.raises(specific_exception, match=expected_message): op.execute(context={}) + @pytest.mark.execution_timeout(10) + def test_external_task_sensor_deferrable(self, dag_maker): + context = {} + with dag_maker() as dag: + op = ExternalTaskSensor( + task_id="test_external_task_sensor_check", + external_dag_id="test_dag_parent", + external_task_id="test_task", + deferrable=True, + allowed_states=["success"], + ) + dr = dag.create_dagrun( + run_id="abcrhroceuh", + run_type=DagRunType.MANUAL, + state=None, + ) + context.update(dag_run=dr, logical_date=DEFAULT_DATE) + + with pytest.raises(TaskDeferred) as exc: + op.execute(context=context) + assert isinstance(exc.value.trigger, WorkflowTrigger) + assert exc.value.trigger.external_dag_id == "test_dag_parent" + assert exc.value.trigger.external_task_ids == ["test_task"] + assert exc.value.trigger.execution_dates == [DEFAULT_DATE] + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Different test for AF 2") @pytest.mark.usefixtures("testing_dag_bundle") @@ -1236,6 +1261,7 @@ def test_external_task_sensor_deferrable(self, dag_maker): assert isinstance(exc.value.trigger, WorkflowTrigger) assert exc.value.trigger.external_dag_id == "test_dag_parent" assert exc.value.trigger.external_task_ids == ["test_task"] + assert exc.value.trigger.logical_dates == [DEFAULT_DATE] @pytest.mark.execution_timeout(10) def test_external_task_sensor_only_dag_id(self, dag_maker):