diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py index 1134c673da725..64b84ab46c8ae 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -19,6 +19,7 @@ import functools import os import sys +import time import traceback from pathlib import Path from typing import TYPE_CHECKING, Annotated, BinaryIO, Callable, ClassVar, Literal, Union @@ -229,6 +230,8 @@ class DagFileProcessorProcess(WatchedSubprocess): logger_filehandle: BinaryIO parsing_result: DagFileParsingResult | None = None decoder: ClassVar[TypeAdapter[ToManager]] = TypeAdapter[ToManager](ToManager) + _start_time: float | None = None + """Time when the process started parsing the file.""" @classmethod def start( # type: ignore[override] @@ -242,6 +245,7 @@ def start( # type: ignore[override] ) -> Self: proc: Self = super().start(target=target, **kwargs) proc._on_child_started(callbacks, path, bundle_path) + proc._start_time = time.time() return proc def _on_child_started( @@ -305,7 +309,18 @@ def is_ready(self) -> bool: @property def start_time(self) -> float: - return self._process.create_time() + if not self._start_time: + raise ValueError("start_time not set") + return self._start_time + + @start_time.setter + def start_time(self, value: float | None = None) -> None: + if self._start_time: + # Ensures this is not set twice + return + if not value: + value = time.time() + self._start_time = value def wait(self) -> int: raise NotImplementedError(f"Don't call wait on {type(self).__name__} objects") diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 027d58e6fa1a9..ad0d82070d5b2 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -26,7 +26,6 @@ import shutil import signal import textwrap -import time from collections import deque from datetime import datetime, timedelta from logging.config import dictConfig @@ -135,7 +134,6 @@ def teardown_class(self): def mock_processor(self) -> DagFileProcessorProcess: proc = MagicMock() logger_filehandle = MagicMock() - proc.create_time.return_value = time.time() proc.wait.return_value = 0 ret = DagFileProcessorProcess( process_log=MagicMock(), @@ -493,7 +491,7 @@ def test_kill_timed_out_processors_kill(self): manager = DagFileProcessorManager(max_runs=1, processor_timeout=5) processor = self.mock_processor() - processor._process.create_time.return_value = timezone.make_aware(datetime.min).timestamp() + processor._start_time = timezone.make_aware(datetime.min).timestamp() manager._processors = { DagFileInfo( bundle_name="testing", rel_path=Path("abc.txt"), bundle_path=TEST_DAGS_FOLDER @@ -512,7 +510,7 @@ def test_kill_timed_out_processors_no_kill(self): ) processor = self.mock_processor() - processor._process.create_time.return_value = timezone.make_aware(datetime.max).timestamp() + processor._start_time = timezone.make_aware(datetime.max).timestamp() manager._processors = { DagFileInfo( bundle_name="testing", rel_path=Path("abc.txt"), bundle_path=TEST_DAGS_FOLDER @@ -522,6 +520,21 @@ def test_kill_timed_out_processors_no_kill(self): manager._kill_timed_out_processors() mock_kill.assert_not_called() + def test_start_time_raises_when_not_set(self): + manager = DagFileProcessorManager( + max_runs=1, + processor_timeout=5, + ) + + processor = self.mock_processor() + manager._processors = { + DagFileInfo( + bundle_name="testing", rel_path=Path("abc.txt"), bundle_path=TEST_DAGS_FOLDER + ): processor + } + with pytest.raises(ValueError, match="start_time not set"): + manager._kill_timed_out_processors() + @pytest.mark.usefixtures("testing_dag_bundle") @pytest.mark.parametrize( ["callbacks", "path", "expected_buffer"],