diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index a6ddae8303898..3bf5b40afd7d0 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -225,6 +225,10 @@ class DagFileProcessorManager(LoggingMixin):
     _force_refresh_bundles: set[str] = attrs.field(factory=set, init=False)
     """List of bundles that need to be force refreshed in the next loop"""

+    _file_parsing_sort_mode: str = attrs.field(
+        factory=_config_get_factory("dag_processor", "file_parsing_sort_mode")
+    )
+
     _api_server: InProcessExecutionAPI = attrs.field(init=False, factory=InProcessExecutionAPI)
     """API server to interact with Metadata DB"""

@@ -517,12 +521,14 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):

         self._bundles_last_refreshed = now_seconds

+        any_refreshed = False
         for bundle in self._dag_bundles:
             # TODO: AIP-66 handle errors in the case of incomplete cloning? And test this.
             # What if the cloning/refreshing took too long(longer than the dag processor timeout)
             if not bundle.is_initialized:
                 try:
                     bundle.initialize()
+                    any_refreshed = True
                 except AirflowException as e:
                     self.log.exception("Error initializing bundle %s: %s", bundle.name, e)
                     continue
@@ -560,6 +566,7 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):

             try:
                 bundle.refresh()
+                any_refreshed = True
             except Exception:
                 self.log.exception("Error refreshing bundle %s", bundle.name)
                 continue
@@ -596,7 +603,6 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
             }
             known_files[bundle.name] = found_files

-            self.handle_removed_files(known_files=known_files)
             self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files)

             self.clear_orphaned_import_errors(
@@ -604,6 +610,10 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
                 observed_filelocs={str(x.rel_path) for x in found_files},  # todo: make relative
             )

+        if any_refreshed:
+            self.handle_removed_files(known_files=known_files)
+            self._resort_file_queue()
+
     def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
         """Get relative paths for dag files from bundle dir."""
         # Build up a list of Python files that could contain DAGs
@@ -961,6 +971,11 @@ def add_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
                 self.log.info("Adding new file %s to parsing queue", file)
                 self._file_queue.appendleft(file)

+    def _resort_file_queue(self):
+        if self._file_parsing_sort_mode == "modified_time" and self._file_queue:
+            files, _ = self._sort_by_mtime(self._file_queue)
+            self._file_queue = deque(files)
+
     def _sort_by_mtime(self, files: Iterable[DagFileInfo]):
         files_with_mtime: dict[DagFileInfo, float] = {}
         changed_recently = set()
@@ -1003,7 +1018,6 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):
         now = timezone.utcnow()

         # Sort the file paths by the parsing order mode
-        list_mode = conf.get("dag_processor", "file_parsing_sort_mode")

         recently_processed = set()
         files = []
@@ -1014,11 +1028,11 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):
                 recently_processed.add(file)

         changed_recently: set[DagFileInfo] = set()
-        if list_mode == "modified_time":
+        if self._file_parsing_sort_mode == "modified_time":
             files, changed_recently = self._sort_by_mtime(files=files)
-        elif list_mode == "alphabetical":
+        elif self._file_parsing_sort_mode == "alphabetical":
             files.sort(key=attrgetter("rel_path"))
-        elif list_mode == "random_seeded_by_host":
+        elif self._file_parsing_sort_mode == "random_seeded_by_host":
             # Shuffle the list seeded by hostname so multiple DAG processors can work on different
             # set of files. Since we set the seed, the sort order will remain same per host
             random.Random(get_hostname()).shuffle(files)
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 5402ab20492de..2071440722695 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -353,6 +353,52 @@ def test_add_new_file_to_parsing_queue(self):
         )
         assert manager._file_queue == deque(ordered_files)

+    @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
+    @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
+    def test_resort_file_queue_by_mtime(self):
+        """
+        Check that existing files in the queue are re-sorted by mtime when calling _resort_file_queue,
+        if sort mode is modified_time.
+        """
+        # Prepare some files with mtimes
+        files_with_mtime = [
+            ("file_1.py", 100.0),
+            ("file_2.py", 200.0),
+        ]
+        filenames = encode_mtime_in_filename(files_with_mtime)
+        dag_files = _get_file_infos(filenames)
+        # dag_files[0] -> file_1 (mtime 100)
+        # dag_files[1] -> file_2 (mtime 200)
+
+        manager = DagFileProcessorManager(max_runs=1)
+
+        # Populate queue with files in oldest-first order
+        # Queue: [file_1 (100), file_2 (200)]
+        manager._file_queue = deque([dag_files[0], dag_files[1]])
+
+        manager._resort_file_queue()
+
+        # Verify resort happened: [file_2 (200), file_1 (100)]
+        assert list(manager._file_queue) == [dag_files[1], dag_files[0]]
+
+    @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
+    def test_resort_file_queue_does_nothing_when_alphabetical(self):
+        """
+        Check that _resort_file_queue does NOT change the order if sort mode is alphabetical.
+        """
+        file_a = DagFileInfo(bundle_name="testing", rel_path=Path("a.py"), bundle_path=TEST_DAGS_FOLDER)
+        file_b = DagFileInfo(bundle_name="testing", rel_path=Path("b.py"), bundle_path=TEST_DAGS_FOLDER)
+
+        manager = DagFileProcessorManager(max_runs=1)
+
+        # Populate queue in non-alphabetical order
+        manager._file_queue = deque([file_b, file_a])
+
+        manager._resort_file_queue()
+
+        # Order should remain unchanged
+        assert list(manager._file_queue) == [file_b, file_a]
+
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("airflow.utils.file.os.path.getmtime")
     def test_recently_modified_file_is_parsed_with_mtime_mode(self, mock_getmtime):
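
Not part of the patch: a minimal, self-contained sketch of the re-sort semantics the new _resort_file_queue relies on, assuming (as test_resort_file_queue_by_mtime expects) that _sort_by_mtime returns files newest-first in "modified_time" mode. FakeDagFile and resort_newest_first are hypothetical stand-ins for illustration, not Airflow APIs.

# Illustrative sketch only; run standalone with any Python 3.9+.
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeDagFile:
    # Stand-in for DagFileInfo: just a path and a modification time.
    rel_path: str
    mtime: float


def resort_newest_first(queue: deque[FakeDagFile]) -> deque[FakeDagFile]:
    # "modified_time" mode parses the most recently modified files first,
    # so sort descending by mtime, mirroring what _resort_file_queue does
    # via _sort_by_mtime on the existing queue contents.
    return deque(sorted(queue, key=lambda f: f.mtime, reverse=True))


# Same scenario as the test: oldest-first input comes back newest-first.
queue = deque([FakeDagFile("file_1.py", 100.0), FakeDagFile("file_2.py", 200.0)])
assert list(resort_newest_first(queue)) == [
    FakeDagFile("file_2.py", 200.0),
    FakeDagFile("file_1.py", 100.0),
]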