diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 3bf5b40afd7d0..bddeedd8ac134 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -364,9 +364,6 @@ def _run_parsing_loop(self):
                 # cleared all files added as a result of callbacks
                 self.prepare_file_queue(known_files=known_files)
                 self.emit_metrics()
-            else:
-                # if new files found in dag dir, add them
-                self.add_files_to_queue(known_files=known_files)
 
             self._start_new_processes()
 
@@ -613,6 +610,7 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
         if any_refreshed:
             self.handle_removed_files(known_files=known_files)
             self._resort_file_queue()
+            self._add_new_files_to_queue(known_files=known_files)
 
     def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
         """Get relative paths for dag files from bundle dir."""
@@ -963,13 +961,22 @@ def _start_new_processes(self):
             self._processors[file] = processor
         Stats.gauge("dag_processing.file_path_queue_size", len(self._file_queue))
 
-    def add_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
+    def _add_new_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
+        """
+        Add new files to the front of the queue.
+
+        A "new" file is a file that has not been processed yet and is not currently being processed.
+        """
+        new_files = []
         for files in known_files.values():
             for file in files:
-                if file not in self._file_stats:  # todo: store stats by bundle also?
-                    # We found new file after refreshing dir. add to parsing queue at start
-                    self.log.info("Adding new file %s to parsing queue", file)
-                    self._file_queue.appendleft(file)
+                # todo: store stats by bundle also?
+                if file not in self._file_stats and file not in self._processors:
+                    new_files.append(file)
+
+        if new_files:
+            self.log.info("Adding %d new files to the front of the queue", len(new_files))
+            self._add_files_to_queue(new_files, True)
 
     def _resort_file_queue(self):
         if self._file_parsing_sort_mode == "modified_time" and self._file_queue:
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 2071440722695..83d10d79e7191 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -342,7 +342,7 @@ def test_add_new_file_to_parsing_queue(self):
         manager.prepare_file_queue(
             known_files={"any": set((*dag_files, *_get_file_infos(["file_4-ss=1.0.py"])))}
         )
-        # manager.add_files_to_queue()
+        # manager._add_new_files_to_queue()
         ordered_files = _get_file_infos(
             [
                 "file_3-ss=4.0.py",
@@ -353,6 +353,41 @@ def test_add_new_file_to_parsing_queue(self):
         )
         assert manager._file_queue == deque(ordered_files)
 
+    def test_add_new_files_to_queue_behavior(self):
+        """
+        Check that _add_new_files_to_queue:
+        1. Adds new files to the front of the queue.
+        2. Skips files that are currently being processed.
+        3. Skips files that have already been processed (in _file_stats).
+        4. Does not re-add files already in the queue.
+        """
+        manager = DagFileProcessorManager(max_runs=1)
+        file_1 = DagFileInfo(bundle_name="testing", rel_path=Path("file_1.py"), bundle_path=TEST_DAGS_FOLDER)
+        file_2 = DagFileInfo(bundle_name="testing", rel_path=Path("file_2.py"), bundle_path=TEST_DAGS_FOLDER)
+        file_3 = DagFileInfo(bundle_name="testing", rel_path=Path("file_3.py"), bundle_path=TEST_DAGS_FOLDER)
+        file_4 = DagFileInfo(bundle_name="testing", rel_path=Path("file_4.py"), bundle_path=TEST_DAGS_FOLDER)
+
+        # Setup:
+        # file_1 is already in the queue
+        manager._file_queue = deque([file_1])
+
+        # file_3 is currently being processed
+        manager._processors[file_3] = MagicMock()
+
+        # file_4 has already been processed
+        manager._file_stats[file_4] = DagFileStat(num_dags=1)
+
+        # known_files contains all four
+        known_files = {"testing": {file_1, file_2, file_3, file_4}}
+
+        manager._add_new_files_to_queue(known_files)
+
+        # file_4 should be ignored (in file_stats)
+        # file_3 should be ignored (processing)
+        # file_2 should be at the front (new)
+        # file_1 should remain (already in queue)
+        assert list(manager._file_queue) == [file_2, file_1]
+
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
     def test_resort_file_queue_by_mtime(self):