Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,6 @@ def _run_parsing_loop(self):
# cleared all files added as a result of callbacks
self.prepare_file_queue(known_files=known_files)
self.emit_metrics()
else:
# if new files found in dag dir, add them
self.add_files_to_queue(known_files=known_files)

self._start_new_processes()

Expand Down Expand Up @@ -613,6 +610,7 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
if any_refreshed:
self.handle_removed_files(known_files=known_files)
self._resort_file_queue()
self._add_new_files_to_queue(known_files=known_files)

def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
"""Get relative paths for dag files from bundle dir."""
Expand Down Expand Up @@ -963,13 +961,22 @@ def _start_new_processes(self):
self._processors[file] = processor
Stats.gauge("dag_processing.file_path_queue_size", len(self._file_queue))

def add_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
def _add_new_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
"""
Add new files to the front of the queue.

A "new" file is a file that has not been processed yet and is not currently being processed.
"""
new_files = []
for files in known_files.values():
for file in files:
if file not in self._file_stats: # todo: store stats by bundle also?
# We found new file after refreshing dir. add to parsing queue at start
self.log.info("Adding new file %s to parsing queue", file)
self._file_queue.appendleft(file)
# todo: store stats by bundle also?
if file not in self._file_stats and file not in self._processors:
new_files.append(file)

if new_files:
self.log.info("Adding %d new files to the front of the queue", len(new_files))
self._add_files_to_queue(new_files, True)

def _resort_file_queue(self):
if self._file_parsing_sort_mode == "modified_time" and self._file_queue:
Expand Down
37 changes: 36 additions & 1 deletion airflow-core/tests/unit/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ def test_add_new_file_to_parsing_queue(self):
manager.prepare_file_queue(
known_files={"any": set((*dag_files, *_get_file_infos(["file_4-ss=1.0.py"])))}
)
# manager.add_files_to_queue()
# manager._add_new_files_to_queue()
ordered_files = _get_file_infos(
[
"file_3-ss=4.0.py",
Expand All @@ -353,6 +353,41 @@ def test_add_new_file_to_parsing_queue(self):
)
assert manager._file_queue == deque(ordered_files)

def test_add_new_files_to_queue_behavior(self):
    """
    Verify the filtering rules of _add_new_files_to_queue.

    Only genuinely new files are prepended to the queue; files that are
    already queued, currently being processed (in _processors), or already
    processed (in _file_stats) are left untouched.
    """
    manager = DagFileProcessorManager(max_runs=1)
    queued, brand_new, in_flight, already_done = (
        DagFileInfo(bundle_name="testing", rel_path=Path(name), bundle_path=TEST_DAGS_FOLDER)
        for name in ("file_1.py", "file_2.py", "file_3.py", "file_4.py")
    )

    # Arrange: one file already queued, one mid-processing, one fully processed.
    manager._file_queue = deque([queued])
    manager._processors[in_flight] = MagicMock()
    manager._file_stats[already_done] = DagFileStat(num_dags=1)

    # Act: all four files are reported as known by the bundle.
    manager._add_new_files_to_queue({"testing": {queued, brand_new, in_flight, already_done}})

    # Assert: only the brand-new file is prepended; the queued one keeps its slot,
    # while the in-flight and already-processed files are skipped entirely.
    assert list(manager._file_queue) == [brand_new, queued]

@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
def test_resort_file_queue_by_mtime(self):
Expand Down