-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Closed
Labels
area:async-operatorsAIP-40: Deferrable ("Async") OperatorsAIP-40: Deferrable ("Async") Operatorsarea:task-execution-interface-aip72AIP-72: Task Execution Interface (TEI) aka Task SDKAIP-72: Task Execution Interface (TEI) aka Task SDKarea:task-sdkpriority:highHigh priority bug that should be patched quickly but does not require immediate new releaseHigh priority bug that should be patched quickly but does not require immediate new release
Milestone
Description
Follow-up of #47882 . In that PR TriggerDagRunOperator was ported to work with the Task SDK.
airflow/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
Line 236 in c7a0681
| # TODO: Support deferral |
However, the deferral mode needs access to the DB, which we should port over.
and add similar logic to Airflow 2
airflow/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
Lines 276 to 287 in c7a0681
| # Kick off the deferral process | |
| if self._defer: | |
| self.defer( | |
| trigger=DagStateTrigger( | |
| dag_id=self.trigger_dag_id, | |
| states=self.allowed_states + self.failed_states, | |
| execution_dates=[dag_run.logical_date], | |
| run_ids=[run_id], | |
| poll_interval=self.poke_interval, | |
| ), | |
| method_name="execute_complete", | |
| ) |
at
airflow/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
Lines 307 to 334 in c7a0681
| @provide_session | |
| def execute_complete(self, context: Context, session: Session, event: tuple[str, dict[str, Any]]): | |
| # This logical_date is parsed from the return trigger event | |
| provided_logical_date = event[1]["execution_dates"][0] | |
| try: | |
| # Note: here execution fails on database isolation mode. Needs structural changes for AIP-72 | |
| dag_run = session.execute( | |
| select(DagRun).where( | |
| DagRun.dag_id == self.trigger_dag_id, DagRun.execution_date == provided_logical_date | |
| ) | |
| ).scalar_one() | |
| except NoResultFound: | |
| raise AirflowException( | |
| f"No DAG run found for DAG {self.trigger_dag_id} and logical date {self.logical_date}" | |
| ) | |
| state = dag_run.state | |
| if state in self.failed_states: | |
| raise AirflowException(f"{self.trigger_dag_id} failed with failed state {state}") | |
| if state in self.allowed_states: | |
| self.log.info("%s finished with allowed state %s", self.trigger_dag_id, state) | |
| return | |
| raise AirflowException( | |
| f"{self.trigger_dag_id} return {state} which is not in {self.failed_states}" | |
| f" or {self.allowed_states}" | |
| ) |
Metadata
Metadata
Assignees
Labels
area:async-operatorsAIP-40: Deferrable ("Async") OperatorsAIP-40: Deferrable ("Async") Operatorsarea:task-execution-interface-aip72AIP-72: Task Execution Interface (TEI) aka Task SDKAIP-72: Task Execution Interface (TEI) aka Task SDKarea:task-sdkpriority:highHigh priority bug that should be patched quickly but does not require immediate new releaseHigh priority bug that should be patched quickly but does not require immediate new release
Type
Projects
Status
Done