
Conversation

@amoghrajesh
Contributor

@amoghrajesh amoghrajesh commented Aug 4, 2025

closes: #54087

What?

Move the timeout implementation to the Task SDK, because it is mostly used there to support execution_timeout for running tasks. This also cuts one dependency link between utils.timeout and the SDK, and lets the SDK have its own implementation.

This PR moves the timeout implementation used in task execution to the task SDK and updates it to use structlog for consistent logging.
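
For reference, a minimal sketch of what such a SIGALRM-based timeout context manager can look like, assuming it keeps the same shape as the old airflow.utils.timeout; the class names and log wording here are illustrative, not the exact SDK API:

import os
import signal

import structlog

log = structlog.get_logger()


class TaskTimeout(Exception):
    """Raised when the task exceeds its execution_timeout."""


class timeout:
    """Context manager that raises TaskTimeout after `seconds` of wall time."""

    def __init__(self, seconds: int = 1, error_message: str = "Timeout"):
        self.seconds = seconds
        self.error_message = error_message

    def handle_timeout(self, signum, frame):
        log.error("Process timed out", pid=os.getpid())
        raise TaskTimeout(self.error_message)

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)

    def __exit__(self, exc_type, exc_val, exc_tb):
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending alarm

Dropping Windows support (below) is what makes the unconditional SIGALRM/setitimer code possible, since neither exists on native Windows.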

While I was at it, I also removed Windows support there. It isn't really needed because, as far as I know, the only way to run Airflow on Windows is through WSL (Windows Subsystem for Linux), which at the end of the day operates as Linux. Applications don't care that the underlying hardware runs Windows; they see Linux, and that makes it easy to wipe that portion out.

Why?

The timeout functionality is specific to task execution and should live with other task execution code in the Task SDK. This:

  1. Reduces dependencies between Core and task SDK
  2. Keeps execution-specific code together

Why new module?

I initially moved it into task_runner, but we may want to reuse it across providers etc., so it is easier to keep it as its own module rather than overloading task_runner.

Tested with a DAG too

DAG:

from datetime import timedelta
from time import sleep

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator


def print_hello():
    print("Going to sleep 100 seconds")
    sleep(100)


with DAG(
    dag_id="tasktimeout",
    schedule=None,
    catchup=False,
) as dag:
    # Sleeps for 100s but is only allowed 10s, so execution_timeout fires
    # and the task is retried twice before finally failing.
    hello_task = PythonOperator(
        task_id="ttimemout",
        retries=2,
        execution_timeout=timedelta(seconds=10),
        python_callable=print_hello,
    )

Migration

No migration needed - this is an internal implementation detail that doesn't affect DAG authors or providers.

What's next?

Migrate the providers to use the timeout from the SDK and nuke airflow.utils.timeout.
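
For providers, that migration would presumably be a one-line import swap. The new module path below is an assumption based on the linked issue ("execution_time in sdk"), not a confirmed API:

# Before: the core util slated for removal
# from airflow.utils.timeout import timeout

# After: the task-sdk version (exact module path is an assumption)
from airflow.sdk.execution_time.timeout import timeout

with timeout(seconds=30):
    ...  # provider code that must finish within 30 seconds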



@potiuk
Member

potiuk commented Aug 5, 2025

Looks good. But while we are at it, a suggestion (we do not have to do it now, or maybe even at all, but it may be worth considering since you are moving timeout to task-sdk): maybe we should implement an additional fork here to handle timeouts for long-running, badly written C code.

This only really affects "task" running - so now that we have a separate "task-only" implementation of Timeout, we could implement it here.

We discussed it before in #53337 (and there were earlier discussions on that): the best option would be for the timeout to be handled in the supervisor, but we really "can't" do that easily because it would effectively require the supervisor to parse the DAG file (a catch-22).

But running the alarm in the task interpreter has two drawbacks:

  • less important -> it does not allow running code that implements its own SIGALRM handling (for example, some timeout handlers in pytest use SIGALRM for test timeouts, and I can imagine other libraries doing it internally)

  • more important -> badly written C code that does not check for signals periodically might cause the task to run forever, without respecting the timeout. SIGALRM fires while the long-running C code executes, and the Python handler cannot run until the C code returns control to the interpreter. This has caused a number of issues raised to us in the past.

Now - the supervisor route might not be viable, but we could have an additional fork here that handles the timeout and nothing else, i.e. (pseudocode):

child_pid = fork()
if child_pid == 0:
    # in the child: just run the task
    run_task()
else:
    # in the parent: watch the child and enforce the timeout
    setproctitle("timeout ...")
    join(child_pid, timeout)
    if child_pid still running:
        kill_child_pid_possibly_escalating_SIGTERM_SIGKILL

That is quite a bit simpler in implementation (no context manager, no alarm/signal handling AT ALL), and 100% resilient against long-running C code.

There is a LITTLE memory overhead. While Python is not "fork-friendly" due to reference counting, in this case it would work entirely, because the "timeout" process would do literally NOTHING (except setting the proctitle) after it forks the child - which means the memory of the "timeout" process would never actually be copied on write, because no writes would ever occur to any of the memory used by Python objects.
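
A slightly more concrete sketch of that watchdog, written as a polling loop for simplicity (run_task, the process title, and the grace period are stand-ins; error handling is elided):

import os
import signal
import time

from setproctitle import setproctitle


def run_with_watchdog(run_task, timeout_seconds: float, term_grace: float = 5.0) -> int:
    """Fork a watchdog: the child runs the task, the parent only waits and kills."""
    child_pid = os.fork()
    if child_pid == 0:
        run_task()  # child: user code runs here, with no alarms installed at all
        os._exit(0)
    setproctitle(f"airflow timeout watchdog for pid {child_pid}")
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        pid, status = os.waitpid(child_pid, os.WNOHANG)
        if pid == child_pid:  # task finished within the timeout
            return os.waitstatus_to_exitcode(status)
        time.sleep(0.1)
    os.kill(child_pid, signal.SIGTERM)  # escalate: TERM first...
    time.sleep(term_grace)
    pid, _ = os.waitpid(child_pid, os.WNOHANG)
    if pid == 0:  # still alive after the grace period
        os.kill(child_pid, signal.SIGKILL)  # ...then KILL
        os.waitpid(child_pid, 0)  # reap the killed child
    return -1

Because the parent never runs user code, it could equally block in waitpid and use SIGALRM itself without any of the drawbacks above.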

Effectively every task that needs timeout would be three processes:

supervisor -> timeout -> task 

I think that is a cleaner and more resilient solution. The alarm handling is also somewhat obscure; such a fork/join approach is way cleaner and more straightforward - less code, less boilerplate.

@ashb
Member

ashb commented Aug 5, 2025

One option to handle it in the supervisor would be to have the task runner process send a new message, say StartTaskTimeout, once it has parsed the DAG, got the task, and knows what timeout is configured.

Then the supervisor could keep track of the timeout, and if the task process hasn't reported a state after the timeout, we start the existing INT->TERM->KILL signal process. (The reason we look at reporting a state, not just the process being alive, is the existing "overtime" mechanism that lets things like OpenLineage run.)
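
As an illustration, the message plus the supervisor-side bookkeeping could look roughly like this; StartTaskTimeout and everything below is hypothetical, not an existing task-sdk API, and the message is assumed to be a pydantic model like the existing comms messages:

import time

from pydantic import BaseModel


class StartTaskTimeout(BaseModel):
    """Sent by the task runner once the DAG is parsed and the timeout is known."""

    timeout_seconds: float


class TimeoutTracker:
    """Supervisor-side bookkeeping: arm on message, check on every poll tick."""

    def __init__(self) -> None:
        self.deadline: float | None = None

    def handle_message(self, msg: object) -> None:
        if isinstance(msg, StartTaskTimeout):
            self.deadline = time.monotonic() + msg.timeout_seconds

    def timed_out(self, task_reported_state: bool) -> bool:
        # Check "reported a state", not "process alive", so the existing
        # overtime mechanism (e.g. OpenLineage listeners) can still run.
        if self.deadline is None or task_reported_state:
            return False
        return time.monotonic() > self.deadline

When timed_out() returns True, the supervisor would kick off the same INT->TERM->KILL escalation it already uses.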

@amoghrajesh
Contributor Author

amoghrajesh commented Aug 5, 2025

Both your concerns, @potiuk:

less important -> it does not allow running code that implements its own SIGALRM handling (for example, some timeout handlers in pytest use SIGALRM for test timeouts, and I can imagine other libraries doing it internally)

more important -> badly written C code that does not check for signals periodically might cause the task to run forever, without respecting the timeout. SIGALRM fires while the long-running C code executes, and the Python handler cannot run until the C code returns control to the interpreter. This has caused a number of issues raised to us in the past.

Should be addressed if we can move the timeout mechanism to the supervisor, since it monitors tasks externally:

Supervisor ─────── Task Process
    │                   │
monitors &          runs user code
handles timeout

And I think it should not be very hard to do: we just have to use the comms channel to send a message before running a task, telling the supervisor that a timeout is configured and that the task is going to run with one. We could then repurpose the kill implementation for both the execution timeout and the case where the task stops reporting.
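
Runner-side, that could be as small as the following, reusing the hypothetical StartTaskTimeout message sketched above (the comms handle and function name are also illustrative):

from datetime import timedelta


def maybe_arm_timeout(comms, execution_timeout: timedelta | None) -> None:
    """Before running the task, tell the supervisor which timeout to enforce."""
    # StartTaskTimeout is the hypothetical message from the earlier sketch.
    if execution_timeout is not None:
        comms.send(StartTaskTimeout(timeout_seconds=execution_timeout.total_seconds()))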

@potiuk
Member

potiuk commented Aug 5, 2025

Yep. The supervisor is the best place to handle the timeout, and sending the timeout back - even if a bit complex - is likely best from the "no extra process needed" overhead POV. It also means that file/socket descriptors will not be duplicated between the timeout and task processes, which is a (small) overhead of the separate-process solution.

Member

@potiuk potiuk left a comment

LGTM. Moving timeout to the supervisor is more complex and can be done later.

@amoghrajesh
Contributor Author

@ashb are you OK with this PR?

Member

@ashb ashb left a comment

What difference does this make? Is it just removing the core import in tasksdk? Cool if that's what we're expecting.

@amoghrajesh
Contributor Author

amoghrajesh commented Aug 6, 2025

What difference does this make? Is it just removing the core import in tasksdk? Cool if that's what we're expecting.

It's doing that precisely. Getting rid of that extra utils import.

@amoghrajesh amoghrajesh merged commit b22b1a6 into apache:main Aug 6, 2025
77 checks passed
@amoghrajesh amoghrajesh deleted the move-timeout-to-sdk branch August 6, 2025 13:47
ferruzzi pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Aug 7, 2025

Development

Successfully merging this pull request may close these issues.

Move timeout utilities to execution_time in sdk
