diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index b8ddf688dcb2a..324813cec3828 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -992,7 +992,7 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):
 
     def _kill_timed_out_processors(self):
         """Kill any file processors that timeout to defend against process hangs."""
-        now = time.time()
+        now = time.monotonic()
         processors_to_remove = []
         for file, processor in self._processors.items():
             duration = now - processor.start_time
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index 25f9e2a73ed87..ef3e5cb69ca56 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -312,9 +312,5 @@ def is_ready(self) -> bool:
 
         return self._num_open_sockets == 0
 
-    @property
-    def start_time(self) -> float:
-        return self._process.create_time()
-
     def wait(self) -> int:
         raise NotImplementedError(f"Don't call wait on {type(self).__name__} objects")
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 3c180afa76b80..ebf684c678843 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -133,7 +133,7 @@ def teardown_class(self):
         clear_db_import_errors()
         clear_db_dag_bundles()
 
-    def mock_processor(self) -> tuple[DagFileProcessorProcess, socket]:
+    def mock_processor(self, start_time: float | None = None) -> tuple[DagFileProcessorProcess, socket]:
         proc = MagicMock()
         logger_filehandle = MagicMock()
         proc.create_time.return_value = time.time()
@@ -148,6 +148,8 @@ def mock_processor(self) -> tuple[DagFileProcessorProcess, socket]:
             requests_fd=123,
             logger_filehandle=logger_filehandle,
         )
+        if start_time:
+            ret.start_time = start_time
         ret._num_open_sockets = 0
         return ret, read_end
 
@@ -518,9 +520,7 @@ def test_scan_stale_dags(self, testing_dag_bundle):
 
     def test_kill_timed_out_processors_kill(self):
         manager = DagFileProcessorManager(max_runs=1, processor_timeout=5)
-
-        processor, _ = self.mock_processor()
-        processor._process.create_time.return_value = timezone.make_aware(datetime.min).timestamp()
+        processor, _ = self.mock_processor(start_time=16000)
         manager._processors = {
             DagFileInfo(
                 bundle_name="testing", rel_path=Path("abc.txt"), bundle_path=TEST_DAGS_FOLDER
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index db9474e68d569..0b77156578bb0 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -428,6 +428,9 @@ class WatchedSubprocess:
     subprocess_logs_to_stdout: bool = False
     """Duplicate log messages to stdout, or only send them to ``self.process_log``."""
 
+    start_time: float = attrs.field(factory=time.monotonic)
+    """The start time of the child process."""
+
    @classmethod
    def start(
        cls,
@@ -481,6 +484,7 @@ def start(
            process=psutil.Process(pid),
            requests_fd=requests_fd,
            process_log=logger,
+           start_time=time.monotonic(),
            **constructor_kwargs,
        )
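
Note (not part of the patch): the change pairs a monotonic start timestamp with a monotonic "now" when computing processor runtime, so wall-clock adjustments (NTP steps, DST changes) can no longer inflate or negate the measured duration, and the epoch value from psutil's create_time() is no longer mixed with a different clock. Below is a minimal, self-contained sketch of that pattern; the names FakeProcessor, kill_timed_out, and PROCESSOR_TIMEOUT are illustrative only and do not appear in the patch.

# Illustrative sketch: the monotonic-clock timeout check the patch adopts.
# Names here are hypothetical, not Airflow APIs.
import time

PROCESSOR_TIMEOUT = 5.0  # seconds; arbitrary value for the example


class FakeProcessor:
    """Stands in for a file processor; records a monotonic start time."""

    def __init__(self) -> None:
        self.start_time = time.monotonic()


def kill_timed_out(processors: dict[str, FakeProcessor]) -> list[str]:
    """Return keys of processors whose elapsed runtime exceeds the timeout."""
    now = time.monotonic()  # same clock as start_time, immune to wall-clock jumps
    return [
        key
        for key, proc in processors.items()
        if now - proc.start_time > PROCESSOR_TIMEOUT
    ]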