24 changes: 19 additions & 5 deletions airflow-core/src/airflow/dag_processing/manager.py
@@ -225,6 +225,10 @@ class DagFileProcessorManager(LoggingMixin):
     _force_refresh_bundles: set[str] = attrs.field(factory=set, init=False)
     """Set of bundle names that need to be force refreshed in the next loop"""

+    _file_parsing_sort_mode: str = attrs.field(
+        factory=_config_get_factory("dag_processor", "file_parsing_sort_mode")
+    )
+
     _api_server: InProcessExecutionAPI = attrs.field(init=False, factory=InProcessExecutionAPI)
     """API server to interact with Metadata DB"""

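The new `_file_parsing_sort_mode` field reads the config once per manager instance through an attrs factory, instead of calling `conf.get` on every queue preparation. A minimal runnable sketch of that pattern, assuming `_config_get_factory` simply closes over a config lookup (the `CONFIG` dict below is a stand-in for Airflow's real `conf` object, not its actual API):

```python
# Sketch of the cached-config pattern, assuming _config_get_factory closes over
# a config lookup. CONFIG stands in for Airflow's conf; the real helper may differ.
import attrs

CONFIG = {("dag_processor", "file_parsing_sort_mode"): "modified_time"}

def _config_get_factory(section: str, key: str):
    # Return a zero-argument callable evaluated when the instance is created,
    # so test-time config patching (e.g. conf_vars) is still picked up.
    def factory() -> str:
        return CONFIG[(section, key)]
    return factory

@attrs.define
class Manager:
    _file_parsing_sort_mode: str = attrs.field(
        factory=_config_get_factory("dag_processor", "file_parsing_sort_mode")
    )

print(Manager()._file_parsing_sort_mode)  # -> "modified_time"
```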
@@ -517,12 +521,14 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):

         self._bundles_last_refreshed = now_seconds

+        any_refreshed = False
         for bundle in self._dag_bundles:
             # TODO: AIP-66 handle errors in the case of incomplete cloning? And test this.
             # What if the cloning/refreshing took too long (longer than the dag processor timeout)?
             if not bundle.is_initialized:
                 try:
                     bundle.initialize()
+                    any_refreshed = True
                 except AirflowException as e:
                     self.log.exception("Error initializing bundle %s: %s", bundle.name, e)
                     continue
@@ -560,6 +566,7 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):

            try:
                bundle.refresh()
+                any_refreshed = True
            except Exception:
                self.log.exception("Error refreshing bundle %s", bundle.name)
                continue
@@ -596,14 +603,17 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
             }

             known_files[bundle.name] = found_files
-            self.handle_removed_files(known_files=known_files)

             self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files)
             self.clear_orphaned_import_errors(
                 bundle_name=bundle.name,
                 observed_filelocs={str(x.rel_path) for x in found_files},  # todo: make relative
             )

+        if any_refreshed:
+            self.handle_removed_files(known_files=known_files)
+            self._resort_file_queue()
+
     def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
         """Get relative paths for dag files from bundle dir."""
         # Build up a list of Python files that could contain DAGs
@@ -961,6 +971,11 @@ def add_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
                     self.log.info("Adding new file %s to parsing queue", file)
                     self._file_queue.appendleft(file)

+    def _resort_file_queue(self):
+        if self._file_parsing_sort_mode == "modified_time" and self._file_queue:
+            files, _ = self._sort_by_mtime(self._file_queue)
+            self._file_queue = deque(files)
+
     def _sort_by_mtime(self, files: Iterable[DagFileInfo]):
         files_with_mtime: dict[DagFileInfo, float] = {}
         changed_recently = set()
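For illustration, a standalone sketch of what `_resort_file_queue` does to the queue, using a simplified `sort_by_mtime` stand-in (the real `_sort_by_mtime` also returns a set of recently changed files, which the re-sort discards; the newest-first ordering below matches what the new test asserts):

```python
# Standalone sketch of the re-sort: rebuild the deque ordered by mtime,
# most recently modified first.
from collections import deque

def sort_by_mtime(files, mtimes):
    # Newest first, matching the dag processor's modified_time ordering.
    return sorted(files, key=lambda f: mtimes[f], reverse=True)

mtimes = {"file_1.py": 100.0, "file_2.py": 200.0}
queue = deque(["file_1.py", "file_2.py"])

# Re-sort: the most recently modified file moves to the front.
queue = deque(sort_by_mtime(queue, mtimes))
print(queue)  # deque(['file_2.py', 'file_1.py'])
```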
@@ -1003,7 +1018,6 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):
         now = timezone.utcnow()

         # Sort the file paths by the parsing order mode
-        list_mode = conf.get("dag_processor", "file_parsing_sort_mode")
         recently_processed = set()
         files = []

@@ -1014,11 +1028,11 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):
                 recently_processed.add(file)

         changed_recently: set[DagFileInfo] = set()
-        if list_mode == "modified_time":
+        if self._file_parsing_sort_mode == "modified_time":
             files, changed_recently = self._sort_by_mtime(files=files)
-        elif list_mode == "alphabetical":
+        elif self._file_parsing_sort_mode == "alphabetical":
             files.sort(key=attrgetter("rel_path"))
-        elif list_mode == "random_seeded_by_host":
+        elif self._file_parsing_sort_mode == "random_seeded_by_host":
             # Shuffle the list seeded by hostname so multiple DAG processors can work on
             # different sets of files. Since we set the seed, the sort order will remain
             # the same per host
             random.Random(get_hostname()).shuffle(files)
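For reference, the three sort modes side by side on a toy file list; `get_hostname` below is a local stand-in for the hostname helper the manager imports, and the file names are hypothetical:

```python
# Sketch of the three file_parsing_sort_mode behaviors.
import random

def get_hostname() -> str:
    # Stand-in for the real hostname lookup; any stable per-host string works.
    return "worker-1"

files = ["b.py", "a.py", "c.py"]
mtimes = {"a.py": 300.0, "b.py": 100.0, "c.py": 200.0}

# modified_time: most recently modified first
by_mtime = sorted(files, key=lambda f: mtimes[f], reverse=True)

# alphabetical: plain lexical sort on the relative path
alphabetical = sorted(files)

# random_seeded_by_host: deterministic per host, different across hosts,
# so multiple dag processors tend to start on different files
shuffled = files.copy()
random.Random(get_hostname()).shuffle(shuffled)

print(by_mtime, alphabetical, shuffled)
```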
46 changes: 46 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_manager.py
@@ -353,6 +353,52 @@ def test_add_new_file_to_parsing_queue(self):
         )
         assert manager._file_queue == deque(ordered_files)

+    @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
+    @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
+    def test_resort_file_queue_by_mtime(self):
+        """
+        Check that existing files in the queue are re-sorted by mtime when calling
+        _resort_file_queue, if the sort mode is modified_time.
+        """
+        # Prepare some files with mtimes
+        files_with_mtime = [
+            ("file_1.py", 100.0),
+            ("file_2.py", 200.0),
+        ]
+        filenames = encode_mtime_in_filename(files_with_mtime)
+        dag_files = _get_file_infos(filenames)
+        # dag_files[0] -> file_1 (mtime 100)
+        # dag_files[1] -> file_2 (mtime 200)
+
+        manager = DagFileProcessorManager(max_runs=1)
+
+        # Populate the queue with files in unsorted order
+        # Queue: [file_1 (100), file_2 (200)]
+        manager._file_queue = deque([dag_files[0], dag_files[1]])
+
+        manager._resort_file_queue()
+
+        # Verify the re-sort happened: [file_2 (200), file_1 (100)]
+        assert list(manager._file_queue) == [dag_files[1], dag_files[0]]
+
+    @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
+    def test_resort_file_queue_does_nothing_when_alphabetical(self):
+        """
+        Check that _resort_file_queue does NOT change the order if the sort mode is alphabetical.
+        """
+        file_a = DagFileInfo(bundle_name="testing", rel_path=Path("a.py"), bundle_path=TEST_DAGS_FOLDER)
+        file_b = DagFileInfo(bundle_name="testing", rel_path=Path("b.py"), bundle_path=TEST_DAGS_FOLDER)
+
+        manager = DagFileProcessorManager(max_runs=1)
+
+        # Populate the queue in non-alphabetical order
+        manager._file_queue = deque([file_b, file_a])
+
+        manager._resort_file_queue()
+
+        # Order should remain unchanged
+        assert list(manager._file_queue) == [file_b, file_a]
+
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("airflow.utils.file.os.path.getmtime")
     def test_recently_modified_file_is_parsed_with_mtime_mode(self, mock_getmtime):
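The test helpers `mock_get_mtime`, `encode_mtime_in_filename`, and `_get_file_infos` are defined elsewhere in test_manager.py and are not part of this diff. A plausible minimal sketch of the encode/decode trick the first two appear to implement (names kept for readability, behavior assumed rather than confirmed by the diff):

```python
# Hypothetical sketch: encode each file's mtime into its filename so a patched
# os.path.getmtime can recover it without touching the filesystem.
# The real helpers in test_manager.py may differ.
from pathlib import Path

def encode_mtime_in_filename(files_with_mtime: list[tuple[str, float]]) -> list[str]:
    # "file_1.py" with mtime 100.0 -> "file_1_mtime_100.0.py"
    return [
        f"{Path(name).stem}_mtime_{mtime}{Path(name).suffix}"
        for name, mtime in files_with_mtime
    ]

def mock_get_mtime(path: str) -> float:
    # Decode the mtime that encode_mtime_in_filename embedded in the name.
    return float(Path(path).stem.rsplit("_mtime_", 1)[1])

print(mock_get_mtime(encode_mtime_in_filename([("file_1.py", 100.0)])[0]))  # 100.0
```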