Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,13 @@ def __init__(

def _get_dttm_filter(self, context):
logical_date = context.get("logical_date")
if logical_date is None:
dag_run = context.get("dag_run")
if TYPE_CHECKING:
assert dag_run
if AIRFLOW_V_3_0_PLUS:
if logical_date is None:
dag_run = context.get("dag_run")
if TYPE_CHECKING:
assert dag_run

logical_date = dag_run.run_after
logical_date = dag_run.run_after
if self.execution_delta:
dttm = logical_date - self.execution_delta
elif self.execution_date_fn:
Expand Down Expand Up @@ -428,7 +429,7 @@ def execute(self, context: Context) -> None:
else:
dttm_filter = self._get_dttm_filter(context)
logical_or_execution_dates = (
{"logical_dates": dttm_filter} if AIRFLOW_V_3_0_PLUS else {"execution_date": dttm_filter}
{"logical_dates": dttm_filter} if AIRFLOW_V_3_0_PLUS else {"execution_dates": dttm_filter}
)
self.defer(
timeout=self.execution_timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,31 @@ def test_fail__check_for_existence(
with pytest.raises(specific_exception, match=expected_message):
op.execute(context={})

@pytest.mark.execution_timeout(10)
def test_external_task_sensor_deferrable(self, dag_maker):
context = {}
with dag_maker() as dag:
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id="test_dag_parent",
external_task_id="test_task",
deferrable=True,
allowed_states=["success"],
)
dr = dag.create_dagrun(
run_id="abcrhroceuh",
run_type=DagRunType.MANUAL,
state=None,
)
context.update(dag_run=dr, logical_date=DEFAULT_DATE)

with pytest.raises(TaskDeferred) as exc:
op.execute(context=context)
assert isinstance(exc.value.trigger, WorkflowTrigger)
assert exc.value.trigger.external_dag_id == "test_dag_parent"
assert exc.value.trigger.external_task_ids == ["test_task"]
assert exc.value.trigger.execution_dates == [DEFAULT_DATE]


@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Different test for AF 2")
@pytest.mark.usefixtures("testing_dag_bundle")
Expand Down Expand Up @@ -1236,6 +1261,7 @@ def test_external_task_sensor_deferrable(self, dag_maker):
assert isinstance(exc.value.trigger, WorkflowTrigger)
assert exc.value.trigger.external_dag_id == "test_dag_parent"
assert exc.value.trigger.external_task_ids == ["test_task"]
assert exc.value.trigger.logical_dates == [DEFAULT_DATE]

@pytest.mark.execution_timeout(10)
def test_external_task_sensor_only_dag_id(self, dag_maker):
Expand Down