Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions task-sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@

SOCKET_CLEANUP_TIMEOUT: float = conf.getfloat("workers", "socket_cleanup_timeout")

# Maximum possible time (in seconds) that task will have for execution of auxiliary processes
# like listeners after task is complete.
TASK_OVERTIME_THRESHOLD: float = conf.getfloat("core", "task_success_overtime")

SERVER_TERMINATED = "SERVER_TERMINATED"

# These are the task instance states that require some additional information to transition into.
Expand Down Expand Up @@ -822,10 +826,6 @@ class ActivitySubprocess(WatchedSubprocess):
# does not hang around forever.
failed_heartbeats: int = attrs.field(default=0, init=False)

# Maximum possible time (in seconds) that task will have for execution of auxiliary processes
# like listeners after task is complete.
# TODO: This should come from airflow.cfg: [core] task_success_overtime
TASK_OVERTIME_THRESHOLD: ClassVar[float] = 20.0
_task_end_time_monotonic: float | None = attrs.field(default=None, init=False)
_rendered_map_index: str | None = attrs.field(default=None, init=False)

Expand Down Expand Up @@ -975,9 +975,14 @@ def _handle_process_overtime_if_needed(self):
return
if (
self._task_end_time_monotonic
and (time.monotonic() - self._task_end_time_monotonic) > self.TASK_OVERTIME_THRESHOLD
and (time.monotonic() - self._task_end_time_monotonic) > TASK_OVERTIME_THRESHOLD
):
log.warning("Workload success overtime reached; terminating process", ti_id=self.id)
log.warning(
"Task success overtime reached; terminating process. "
"Modify `task_success_overtime` setting in [core] section of "
"Airflow configuration to change this limit.",
ti_id=self.id,
)
self.kill(signal.SIGTERM, force=True)

def _send_heartbeat_if_needed(self):
Expand Down
8 changes: 6 additions & 2 deletions task-sdk/tests/task_sdk/execution_time/test_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,9 @@ def test_overtime_handling(
mocker.patch("time.monotonic", return_value=20.0)

# Patch the task overtime threshold
monkeypatch.setattr(ActivitySubprocess, "TASK_OVERTIME_THRESHOLD", overtime_threshold)
monkeypatch.setattr(
"airflow.sdk.execution_time.supervisor.TASK_OVERTIME_THRESHOLD", overtime_threshold
)

mock_watched_subprocess = ActivitySubprocess(
process_log=mocker.MagicMock(),
Expand All @@ -758,7 +760,9 @@ def test_overtime_handling(
if expected_kill:
mock_kill.assert_called_once_with(signal.SIGTERM, force=True)
mock_logger.warning.assert_called_once_with(
"Workload success overtime reached; terminating process",
"Task success overtime reached; terminating process. "
"Modify `task_success_overtime` setting in [core] section of "
"Airflow configuration to change this limit.",
ti_id=TI_ID,
)
else:
Expand Down