Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion airflow-core/src/airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import functools
import os
import sys
import time
import traceback
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, BinaryIO, Callable, ClassVar, Literal, Union
Expand Down Expand Up @@ -229,6 +230,8 @@ class DagFileProcessorProcess(WatchedSubprocess):
logger_filehandle: BinaryIO
parsing_result: DagFileParsingResult | None = None
decoder: ClassVar[TypeAdapter[ToManager]] = TypeAdapter[ToManager](ToManager)
_start_time: float | None = None
"""Time when the process started parsing the file."""

@classmethod
def start( # type: ignore[override]
Expand All @@ -242,6 +245,7 @@ def start( # type: ignore[override]
) -> Self:
proc: Self = super().start(target=target, **kwargs)
proc._on_child_started(callbacks, path, bundle_path)
proc._start_time = time.time()
return proc

def _on_child_started(
Expand Down Expand Up @@ -305,7 +309,18 @@ def is_ready(self) -> bool:

@property
def start_time(self) -> float:
return self._process.create_time()
if not self._start_time:
raise ValueError("start_time not set")
return self._start_time

@start_time.setter
def start_time(self, value: float | None = None) -> None:
if self._start_time:
# Ensures this is not set twice
return
if not value:
value = time.time()
self._start_time = value

def wait(self) -> int:
    """
    Reject direct waiting on this object.

    :raises NotImplementedError: Always; the message names the concrete class.
    """
    unsupported = NotImplementedError(f"Don't call wait on {type(self).__name__} objects")
    raise unsupported
21 changes: 17 additions & 4 deletions airflow-core/tests/unit/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import shutil
import signal
import textwrap
import time
from collections import deque
from datetime import datetime, timedelta
from logging.config import dictConfig
Expand Down Expand Up @@ -135,7 +134,6 @@ def teardown_class(self):
def mock_processor(self) -> DagFileProcessorProcess:
proc = MagicMock()
logger_filehandle = MagicMock()
proc.create_time.return_value = time.time()
proc.wait.return_value = 0
ret = DagFileProcessorProcess(
process_log=MagicMock(),
Expand Down Expand Up @@ -493,7 +491,7 @@ def test_kill_timed_out_processors_kill(self):
manager = DagFileProcessorManager(max_runs=1, processor_timeout=5)

processor = self.mock_processor()
processor._process.create_time.return_value = timezone.make_aware(datetime.min).timestamp()
processor._start_time = timezone.make_aware(datetime.min).timestamp()
manager._processors = {
DagFileInfo(
bundle_name="testing", rel_path=Path("abc.txt"), bundle_path=TEST_DAGS_FOLDER
Expand All @@ -512,7 +510,7 @@ def test_kill_timed_out_processors_no_kill(self):
)

processor = self.mock_processor()
processor._process.create_time.return_value = timezone.make_aware(datetime.max).timestamp()
processor._start_time = timezone.make_aware(datetime.max).timestamp()
manager._processors = {
DagFileInfo(
bundle_name="testing", rel_path=Path("abc.txt"), bundle_path=TEST_DAGS_FOLDER
Expand All @@ -522,6 +520,21 @@ def test_kill_timed_out_processors_no_kill(self):
manager._kill_timed_out_processors()
mock_kill.assert_not_called()

def test_start_time_raises_when_not_set(self):
    """Timeout check must raise ``ValueError`` for a processor whose start time was never assigned."""
    manager = DagFileProcessorManager(max_runs=1, processor_timeout=5)

    # The mock processor is used as-is: this test does not assign ``_start_time``.
    unstarted = self.mock_processor()
    file_info = DagFileInfo(
        bundle_name="testing", rel_path=Path("abc.txt"), bundle_path=TEST_DAGS_FOLDER
    )
    manager._processors = {file_info: unstarted}

    with pytest.raises(ValueError, match="start_time not set"):
        manager._kill_timed_out_processors()

@pytest.mark.usefixtures("testing_dag_bundle")
@pytest.mark.parametrize(
["callbacks", "path", "expected_buffer"],
Expand Down