diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index c1c267fe99c98..65d41533e242c 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -1009,8 +1009,21 @@ def _add_new_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]): def _resort_file_queue(self): if self._file_parsing_sort_mode == "modified_time" and self._file_queue: - files, _ = self._sort_by_mtime(self._file_queue) - self._file_queue = deque(files) + # Separate files with pending callbacks from regular files + # Callbacks should stay at the front regardless of mtime + callback_files = [] + regular_files = [] + for file in self._file_queue: + if file in self._callback_to_execute: + callback_files.append(file) + else: + regular_files.append(file) + + # Sort only the regular files by mtime + sorted_regular_files, _ = self._sort_by_mtime(regular_files) + + # Put callback files at the front, then sorted regular files + self._file_queue = deque(callback_files + sorted_regular_files) def _sort_by_mtime(self, files: Iterable[DagFileInfo]): files_with_mtime: dict[DagFileInfo, float] = {} diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index c06e96388e04c..00de3148c968e 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -429,6 +429,42 @@ def test_resort_file_queue_does_nothing_when_alphabetical(self): # Order should remain unchanged assert list(manager._file_queue) == [file_b, file_a] + @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"}) + @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime) + def test_resort_file_queue_keeps_callbacks_at_front(self): + """ + Check that files with pending callbacks stay at the front of the queue + regardless of their modification time, and preserve their relative order. + """ + files_with_mtime = [ + ("callback_1.py", 50.0), # has callback, oldest mtime + ("callback_2.py", 300.0), # has callback, newest mtime + ("regular_1.py", 100.0), # no callback + ("regular_2.py", 200.0), # no callback + ] + filenames = encode_mtime_in_filename(files_with_mtime) + dag_files = _get_file_infos(filenames) + # dag_files[0] -> callback_1 (mtime 50) + # dag_files[1] -> callback_2 (mtime 300) + # dag_files[2] -> regular_1 (mtime 100) + # dag_files[3] -> regular_2 (mtime 200) + + manager = DagFileProcessorManager(max_runs=1) + + # Queue order: callback_1, callback_2, regular_1, regular_2 + manager._file_queue = deque([dag_files[0], dag_files[1], dag_files[2], dag_files[3]]) + + # Both callback files have pending callbacks + manager._callback_to_execute[dag_files[0]] = [MagicMock()] + manager._callback_to_execute[dag_files[1]] = [MagicMock()] + + manager._resort_file_queue() + + # Callback files should stay at front in original order (callback_1, callback_2) + # despite callback_1 having the oldest mtime and callback_2 having the newest + # Regular files should be sorted by mtime (newest first): regular_2 (200), regular_1 (100) + assert list(manager._file_queue) == [dag_files[0], dag_files[1], dag_files[3], dag_files[2]] + @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"}) @mock.patch("airflow.utils.file.os.path.getmtime") def test_recently_modified_file_is_parsed_with_mtime_mode(self, mock_getmtime):