diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 1f4e1ff81828c..6e7964d9bfe11 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -765,7 +765,12 @@ def _do_scheduling(self, session) -> int: # Send the callbacks after we commit to ensure the context is up to date when it gets run for dag_run, callback_to_run in callback_tuples: - self._send_dag_callbacks_to_processor(dag_run, callback_to_run) + dag = self.dagbag.get_dag(dag_run.dag_id, session=session) + if not dag: + self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id) + continue + + self._send_dag_callbacks_to_processor(dag, callback_to_run) # Without this, the session has an invalid view of the DB session.expunge_all() @@ -989,7 +994,7 @@ def _schedule_dag_run( ) # Send SLA & DAG Success/Failure Callbacks to be executed - self._send_dag_callbacks_to_processor(dag_run, callback_to_execute) + self._send_dag_callbacks_to_processor(dag, callback_to_execute) return 0 @@ -1025,13 +1030,10 @@ def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session=None): # Verify integrity also takes care of session.flush dag_run.verify_integrity(session=session) - def _send_dag_callbacks_to_processor( - self, dag_run: DagRun, callback: Optional[DagCallbackRequest] = None - ): + def _send_dag_callbacks_to_processor(self, dag: DAG, callback: Optional[DagCallbackRequest] = None): if not self.processor_agent: raise ValueError("Processor agent is not started.") - dag = dag_run.get_dag() self._send_sla_callbacks_to_processor(dag) if callback: self.processor_agent.send_callback_to_execute(callback) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 7ae162b6ae2e1..7c2033fc750f8 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1363,7 +1363,6 @@ def test_dagrun_callbacks_are_not_added_when_callbacks_are_not_defined(self, sta self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once() call_args = self.scheduler_job._send_dag_callbacks_to_processor.call_args[0] assert call_args[0].dag_id == dr.dag_id - assert call_args[0].execution_date == dr.execution_date assert call_args[1] is None session.rollback() @@ -1394,11 +1393,10 @@ def test_dagrun_callbacks_are_added_when_callbacks_are_defined(self, state, msg, with mock.patch.object(settings, "USE_JOB_SCHEDULE", False): self.scheduler_job._do_scheduling(session) - # Verify Callback is not set (i.e is None) when no callbacks are set on DAG + # Verify Callback is set (i.e is None) when no callbacks are set on DAG self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once() call_args = self.scheduler_job._send_dag_callbacks_to_processor.call_args[0] assert call_args[0].dag_id == dr.dag_id - assert call_args[0].execution_date == dr.execution_date assert call_args[1] is not None assert call_args[1].msg == msg session.rollback()