Skip to content

Conversation

@qwe-kev
Copy link

@qwe-kev qwe-kev commented Dec 19, 2025

closes: #53337
related: #57174

This PR implements proper execution timeout handling for Airflow 3.0 by moving timeout enforcement from the task process to the supervisor process.

Previously, execution_timeout was handled inside the task process using a timeout decorator. This approach failed when:

Changes

  • Added TaskExecutionTimeout message for worker-to-supervisor communication
  • Supervisor monitors execution time and enforces timeout with SIGTERM/SIGKILL
  • Removed in-process timeout decorator from task execution
  • Timeout measurement starts after DAG parsing (excludes startup overhead)

Implementation

  1. Worker sends timeout_seconds to supervisor after DAG parsing
  2. Supervisor tracks elapsed time using monotonic clock
  3. On timeout: sends SIGTERM, then SIGKILL after 5-second grace period

This ensures reliable timeout enforcement at the supervisor level, preventing runaway tasks even when the task process encounters errors.

closes: apache#53337
related: apache#57174

This PR implements proper execution timeout handling for Airflow 3.0 by
moving timeout enforcement from the task process to the supervisor process.

Previously, execution_timeout was handled inside the task process using a
timeout decorator. This approach failed when:
- Task process encountered SIGSEGV or other signals (apache#57174)
- Native code ran in tight loops without handling Python signals
- Process was killed before timeout could be enforced

Changes:
- Added TaskExecutionTimeout message for worker-to-supervisor communication
- Supervisor monitors execution time and enforces timeout with SIGTERM/SIGKILL
- Removed in-process timeout decorator from task execution
- Timeout measurement starts after DAG parsing (excludes startup overhead)

Implementation:
1. Worker sends timeout_seconds to supervisor after DAG parsing
2. Supervisor tracks elapsed time using monotonic clock
3. On timeout: sends SIGTERM, then SIGKILL after 5-second grace period

This ensures reliable timeout enforcement at the supervisor level,
preventing runaway tasks even when the task process encounters errors.
@boring-cyborg
Copy link

boring-cyborg bot commented Dec 19, 2025

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@potiuk
Copy link
Member

potiuk commented Dec 20, 2025

This looks really good - can you please add test cases covering the timeout handling in supervisor ?

- Add 9 unit tests for timeout handling logic
  - Test SIGTERM/SIGKILL escalation behavior
  - Test grace period enforcement
  - Test monotonic clock usage
  - Test message serialization

- Add 4 integration tests with real subprocesses
  - Test actual timeout enforcement
  - Test SIGKILL escalation when task ignores SIGTERM
  - Test tasks completing before/without timeout

- Add client_with_ti_start fixture for mocking API client
- Tests account for MIN_HEARTBEAT_INTERVAL in timing assertions

All 13 tests passing. No existing tests broken.
@qwe-kev
Copy link
Author

qwe-kev commented Dec 20, 2025

Added a comprehensive test suite for timeout handling as requested. All tests are passing.

Test Coverage:

  • 9 unit tests for timeout logic (SIGTERM/SIGKILL escalation, grace periods, etc.)
  • 4 integration tests with real subprocesses
  • All 13 tests passing locally
  • No existing tests broken

defined on BaseOperator. Addresses review feedback
Comment on lines +959 to +967
# Execution timeout tracking
_execution_timeout_seconds: float | None = attrs.field(default=None, init=False)
"""Task execution timeout in seconds, received from the task process."""

_task_execution_start_time: float | None = attrs.field(default=None, init=False)
"""Monotonic time when task execution actually started (after parsing DAG)."""

_timeout_sigterm_sent_at: float | None = attrs.field(default=None, init=False)
"""Monotonic time when SIGTERM was sent due to timeout."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This brings the question if 0.0 in any of these values affects logic. You use is None/is not None in some checks, but simple boolean checks in others, so this is not immediately clear. This needs to be treated carefully.

Copy link
Author

@qwe-kev qwe-kev Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right! I've updated the code to use explicit is None checks throughout to handle the edge case where these float values could technically be 0.0 (which is falsy in Python). Made the following changes:

  1. supervisor.py - Changed falsy checks to explicit None checks:

    • Line 1135: if not self._execution_timeout_seconds → if self._execution_timeout_seconds is None

    • Line 1138: if not self._task_execution_start_time → if self._task_execution_start_time is None

  2. task_runner.py - Added validation to only send positive timeouts:

    • Line 761: Added if timeout_seconds > 0: check before sending timeout to supervisor

qwe-kev and others added 2 commits December 22, 2025 12:44
- Changed falsy checks to 'is None' checks in supervisor.py to handle
  edge case where timeout values could be 0.0
- Added validation in task_runner.py to only send positive timeouts
- Prevents 0.0 (falsy) from being incorrectly treated as None

Addresses reviewer feedback
@qwe-kev
Copy link
Author

qwe-kev commented Jan 6, 2026

Hi, I wanted to follow up on this contribution. Is there anything I can do to help move this forward or any additional information needed?

@potiuk
Copy link
Member

potiuk commented Jan 6, 2026

Hi, I wanted to follow up on this contribution. Is there anything I can do to help move this forward or any additional information needed?

Turning that PR green might be a good start.

qwe-kev and others added 3 commits January 6, 2026 16:53
- Add test coverage for TaskExecutionTimeout message in test_supervisor.py
- Remove deprecated test_run_task_timeout and test_execution_timeout from test_task_runner.py
  (timeout now handled by supervisor, covered by TestExecutionTimeoutIntegration tests)
- Remove unused imports (AirflowTaskTimeout, _execute_task, time)
- Set expected_body=None for TaskExecutionTimeout as it's a one-way message

All tests now pass (689 passed).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Handle task timeouts (execution_timeout) at supervisor

3 participants