diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 93046d9a78f78..84a99a480a915 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -451,6 +451,7 @@ def _fetch_callbacks( callback_queue: list[CallbackRequest] = [] with prohibit_commit(session) as guard: + bundle_names = [bundle.name for bundle in self._dag_bundles] query: Select[tuple[DbCallbackRequest]] = select(DbCallbackRequest) query = query.order_by(DbCallbackRequest.priority_weight.desc()).limit( self.max_callbacks_per_loop @@ -461,8 +462,11 @@ def _fetch_callbacks( ) callbacks = session.scalars(query) for callback in callbacks: + req = callback.get_callback_request() + if req.bundle_name not in bundle_names: + continue try: - callback_queue.append(callback.get_callback_request()) + callback_queue.append(req) session.delete(callback) except Exception as e: self.log.warning("Error adding callback for execution: %s, %s", callback, e) diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index e3ff30ae07062..205eaa4aaed6e 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -772,6 +772,7 @@ def test_fetch_callbacks_from_database(self, configure_testing_dag_bundle): with configure_testing_dag_bundle(dag_filepath): manager = DagFileProcessorManager(max_runs=1) + manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles()) with create_session() as session: callbacks = manager._fetch_callbacks(session=session) @@ -815,6 +816,51 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path, configure_te manager.run() assert session.query(DbCallbackRequest).count() == 1 + @conf_vars({("core", "load_examples"): "False"}) + def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundle): + """Ensure callbacks for bundles not owned by current dag processor manager are ignored and not deleted.""" + + dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py" + + # Create two callbacks: one for the active 'testing' bundle and one for a different bundle + matching = DagCallbackRequest( + dag_id="test_start_date_scheduling", + bundle_name="testing", + bundle_version=None, + filepath="test_on_failure_callback_dag.py", + is_failure_callback=True, + run_id="match", + ) + non_matching = DagCallbackRequest( + dag_id="test_start_date_scheduling", + bundle_name="other-bundle", + bundle_version=None, + filepath="test_on_failure_callback_dag.py", + is_failure_callback=True, + run_id="no-match", + ) + + with create_session() as session: + session.add(DbCallbackRequest(callback=matching, priority_weight=100)) + session.add(DbCallbackRequest(callback=non_matching, priority_weight=200)) + + with configure_testing_dag_bundle(dag_filepath): + manager = DagFileProcessorManager(max_runs=1) + manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles()) + + with create_session() as session: + callbacks = manager._fetch_callbacks(session=session) + + # Only the matching callback should be returned + assert [c.run_id for c in callbacks] == ["match"] + + # The non-matching callback should remain in the DB + remaining = session.query(DbCallbackRequest).all() + assert len(remaining) == 1 + # Decode remaining request and verify it's for the other bundle + remaining_req = remaining[0].get_callback_request() + assert remaining_req.bundle_name == "other-bundle" + @mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file") def test_callback_queue(self, mock_get_logger, configure_testing_dag_bundle): mock_logger = MagicMock()