@jscheffl
Contributor

@jscheffl jscheffl commented Dec 28, 2025

Another small (in this case rather medium complex) increment to remove global statements for PR #58116

This removes 2 global statements from task_runner.py, where a global variable was intentionally used as the shared SUPERVISOR_COMMS. I propose replacing it with a static class and accessor methods to avoid the use of global variables.

global is evil.

Before this PR is merged, I am seeking explicit approval from one of the Task SDK creators: @ashb, @kaxil and/or @amoghrajesh

@jscheffl jscheffl force-pushed the bugfix/remove-global-from-task-runner-supervisor-comms branch from d1fdaaa to 4bb5081 Compare December 29, 2025 08:44
@jscheffl jscheffl added the "full tests needed" (We need to run full set of tests for this PR to merge) and "all versions" (If set, the CI build will be forced to use all versions of Python/K8S/DBs) labels Dec 29, 2025
@jscheffl jscheffl force-pushed the bugfix/remove-global-from-task-runner-supervisor-comms branch from 4bb5081 to 780ce42 Compare December 29, 2025 12:58
@jscheffl jscheffl marked this pull request as ready for review December 29, 2025 12:58
Member

@kaxil kaxil left a comment

imo globals can be bad, but this specific use case is one of the legitimate ones, in fact better than the approach without globals.

def supervisor_comms() -> CommsDecoder[ToTask, ToSupervisor]:
    return _SupervisorCommsHolder.comms

But then violates it in supervisor.py:1669:

task_runner._SupervisorCommsHolder.comms = temp_comms

and every call site becomes verbose:

Before:

SUPERVISOR_COMMS.send(msg)

After (unnecessary function call)

supervisor_comms().send(msg)

Every access now requires a function call + None check instead of direct variable access.

My 2 cents :)

# If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps)
# and should use the Task SDK API server path
if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"):
from airflow.sdk.execution_time.task_runner import is_supervisor_comms_initialized
Member

This used hasattr on sys.modules so that airflow.sdk does not need to be installed on the server components.
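
A minimal sketch of why the original check tolerates a missing package: `sys.modules.get()` returns `None` for a module that was never imported, and `hasattr(None, ...)` is simply `False`, so nothing gets imported as a side effect. The helper name `module_exposes` is hypothetical and not part of Airflow; `json` only stands in for an already imported module.

```python
import json  # stands in for a module that has already been imported
import sys


def module_exposes(module_name: str, attr: str) -> bool:
    """Return True only if the module is already imported and defines attr."""
    # sys.modules.get() never triggers an import; it yields None when absent.
    module = sys.modules.get(module_name)
    return hasattr(module, attr)


print(module_exposes("json", "dumps"))  # True
print(module_exposes("package_that_is_not_installed", "SUPERVISOR_COMMS"))  # False
```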

Member

Yeah. I think we should solve it differently, I would say here just checking if "task-sdk" is installed should be a better check?

Member

Or something else, but this should be revised after we complete the isolation.

Contributor Author

Okay, I have reverted/adjusted this to the previous state. As it was repeated copy & paste, I have moved it into a utility in https://github.com/apache/airflow/pull/59876/changes#diff-7694d13e2f87c84d20b0b8b44797bf96d754ae270204217e518082decc74649bR104

Contributor Author

Would leave other improvements of detection to another PR.

@potiuk
Member

potiuk commented Dec 29, 2025

Every access now requires a function call + None check instead of direct variable access.

I think if we add @cache to def supervisor_comms(), this will be faster and will only require a hash lookup - and then, I think performance is not a concern any more.

I like this pattern that Jens introduced a lot more, to be honest; it is more unit-test friendly IMHO.

Also, that removes the None check for every call. When None is detected in the first call, a RuntimeError is raised and the interpreter will exit, so None is checked only at first use, when the cache is set.

Contributor

@amoghrajesh amoghrajesh left a comment

I would be OK with the current pattern for the sake of ease of understanding, but if we have to change it, I do not think there is a better solution than this one. The new pattern would take some getting used to.

@jscheffl
Contributor Author

jscheffl commented Dec 30, 2025

and every call site becomes verbose:

Before:

SUPERVISOR_COMMS.send(msg)

After (unnecessary function call)

supervisor_comms().send(msg)

Every access now requires a function call + None check instead of direct variable access.

@kaxil @amoghrajesh fair points, and I am also not 100% convinced the solution is perfect. The usage of SUPERVISOR_COMMS still seems to be a case where we mess a lot with a global variable, and encapsulating the state is still beneficial.
I slept on this again and understand that supervisor_comms().send() is also not cool. How about if we change it to:

supervisor_send(data)

...as a shortcut? Would it make it better to handle the sending in the function and have better readable code w/o global?

By the way, with @cache I assume execution is slower, as the hash needs to be built and then a lookup must be made, which is more expensive than a function pointer lookup and jump. Therefore I would not like a @cache solution.

@potiuk
Member

potiuk commented Dec 30, 2025

By the way, with @cache I assume execution is slower, as the hash needs to be built and then a lookup must be made, which is more expensive than a function pointer lookup and jump. Therefore I would not like a @cache solution.

I don't think so. Calling a function in Python is generally a very slow operation. It's not a classic pointer jump. Function calls in Python are done by the interpreter; they do not use jumps as C programs do. The interpreter has to look up the method to call, create a new frame, push it on the "Python stack" and clean up the frame after it returns. This is all done "in the interpreter" and does not even use the processor stack. The "Python stack" for method frames is actually stored in heap memory, not on the processor stack, so any stack manipulation (calling and returning from a function) is kinda slow.

I did some basic micro-benchmarks:

import time
from functools import lru_cache

class MethodBenchmark:
    def __init__(self):
        self.call_count = 0

    def empty_method(self):
        """Empty method without caching"""
        self.call_count += 1
        return None

    @lru_cache(maxsize=128)
    def cached_empty_method(self):
        """Empty method with caching"""
        return None


def benchmark():
    obj = MethodBenchmark()
    iterations = 1_000_000

    # Benchmark non-cached method
    start = time.perf_counter()
    for _ in range(iterations):
        obj.empty_method()
    non_cached_time = time.perf_counter() - start

    # Benchmark cached method
    start = time.perf_counter()
    for _ in range(iterations):
        obj.cached_empty_method()
    cached_time = time.perf_counter() - start

    # Results
    print(f"Non-cached method: {non_cached_time:.6f} seconds")
    print(f"Cached method: {cached_time:.6f} seconds")
    print(f"Speedup: {non_cached_time / cached_time:.2f}x")
    print(f"Non-cached call count: {obj.call_count}")


if __name__ == "__main__":
    benchmark()

Result with Python 3.10

/Users/jarekpotiuk/code/airflow/.venv/bin/python /Users/jarekpotiuk/Library/Application Support/JetBrains/IntelliJIdea2025.3/scratches/scratch_5.py 
Non-cached method: 0.026673 seconds
Cached method: 0.026702 seconds
Speedup: 1.00x
Non-cached call count: 1000000

Process finished with exit code 0

This means that when you add @cache, it never runs slower than a plain method call, and additionally you save on all the code executed inside.

I modified the code of both methods to do single if:

        if self.call_count == 0:
            self.call_count += 1
        else:
            self.call_count += 1

And there are the results:

Non-cached method: 0.032580 seconds
Cached method: 0.026062 seconds
Speedup: 1.25x
Non-cached call count: 1000001

Process finished with exit code 0

(1000001 is because the cached call increased it by 1)

There are plenty of optimisations in Python 3.11 - 3.14 that might skew this simple example (specializing adaptive interpreter changes and JIT), so I ran it with Python 3.10.

Similar discussion: https://stackoverflow.com/questions/14648374/python-function-calls-are-really-slow
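
For anyone who wants to repeat the comparison including the direct-access baseline that started this thread, here is a minimal `timeit` sketch. The `_Holder`, `accessor` and `cached_accessor` names are hypothetical stand-ins, and the numbers depend heavily on the Python version and machine, so no expected output is given.

```python
import timeit
from functools import cache


class _Holder:
    """Hypothetical stand-in for a module-level comms holder."""

    comms = object()


def accessor():
    # Plain function call plus the None check discussed above.
    if _Holder.comms is None:
        raise RuntimeError("not initialized")
    return _Holder.comms


@cache
def cached_accessor():
    return _Holder.comms


def measure(number: int = 200_000) -> dict[str, float]:
    """Return the seconds taken by each access style for `number` repetitions."""
    scope = {"_Holder": _Holder, "accessor": accessor, "cached_accessor": cached_accessor}
    statements = {
        "direct attribute": "_Holder.comms",
        "function call": "accessor()",
        "cached call": "cached_accessor()",
    }
    return {
        label: timeit.timeit(stmt, globals=scope, number=number)
        for label, stmt in statements.items()
    }


if __name__ == "__main__":
    for label, seconds in measure().items():
        print(f"{label}: {seconds:.4f} s")
```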

@potiuk
Member

potiuk commented Dec 30, 2025

supervisor_send(data)

I think that's a good idea, it's likely better to expose "comms actions" than "comm" itself.

@potiuk
Member

potiuk commented Dec 30, 2025

BTW, to be perfectly honest, in this case I think performance is not as important (at least until we start doing the communication very frequently, for example following @dabla's optimisation / async tasks reporting the status of individual async coroutines back to the scheduler). The comms overhead for inter-process communication and the serialization of the data involved (every such call needs to serialize data sent across the wire, even if shared memory is used to communicate between processes) is already likely an order of magnitude slower than a single method call in Python, so this should not be too much of a concern.

In this case I think we should optimise for readability. I also do not like the extra () needed in the original proposal, which is why supervisor_send(data) is probably the best approach.

@amoghrajesh
Contributor

In this case I think we should optimise for readability. I also do not like the extra () needed in the original proposal, which is why supervisor_send(data) is probably the best approach.

I agree with this. If we have to go down the path of removing globals here, I would prioritise using supervisor_send or supervisor_comms_send.

@jscheffl jscheffl force-pushed the bugfix/remove-global-from-task-runner-supervisor-comms branch from 780ce42 to df71bea Compare December 31, 2025 15:48
@jscheffl
Contributor Author

jscheffl commented Dec 31, 2025

Thanks for the feedback, I have now adjusted it to use supervisor_send(), which is a bit shorter.

Re-review appreciated :-D

(I am actually surprised: I wanted to remove two global statements, but I am now slimming down the codebase by 40 LoC :-D)

@jscheffl jscheffl force-pushed the bugfix/remove-global-from-task-runner-supervisor-comms branch from 69efc3d to 3ec9115 Compare December 31, 2025 17:06
@jscheffl jscheffl requested a review from amoghrajesh January 2, 2026 09:45
@jscheffl jscheffl force-pushed the bugfix/remove-global-from-task-runner-supervisor-comms branch from 3ec9115 to 8acbbc4 Compare January 3, 2026 12:18
@potiuk
Member

potiuk commented Jan 4, 2026

LVGTM (V=Very)

Most of the slimming comes from consolidating copy-pasted comments into a single place, but that is one of the best kinds of trimming you can do, as DRY is important in this case :D

@dabla
Contributor

dabla commented Jan 5, 2026

Nice work Jens!

@jscheffl jscheffl requested a review from kaxil January 6, 2026 21:02
@jscheffl jscheffl force-pushed the bugfix/remove-global-from-task-runner-supervisor-comms branch from 8acbbc4 to b45f6cd Compare January 6, 2026 21:03
Member

@ashb ashb left a comment

Right now I'm -1 to most of this change (the has_execution_context() fn part is good):

Moving from a global variable SUPERVISOR_COMMS to a class containing a class variable is changing from a global variable to a global variable by a different name. It's a change for no benefit. Globals aren't inherently bad; they are a tool that has its place.

map_index: Mapped[int] = mapped_column(Integer, nullable=False, server_default=text("-1"))


def has_execution_context() -> bool:
Member

This is incorrectly named: _SupervisorCommsHolder.comms would be set in parsing, and parsing is not an "execution" context.

@kaxil Didn't you recently add some other context (server vs something else)? What was that?

Contributor Author

The name was derived from the previous code comment, "If this is set it means are in some kind of execution context", but other proposals are welcome.

Member

Naming things is hard!

Member

@kaxil kaxil Jan 8, 2026

Yeah _AIRFLOW_PROCESS_CONTEXT that accepts client / server

# 2. Check for explicit server context
if os.environ.get("_AIRFLOW_PROCESS_CONTEXT") == "server":
# Server context: API server, scheduler
# uses the default server list

# Mark as client-side (runs user DAG code)
# Prevents inheriting server context from parent DagProcessorManager
os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "client"

Contributor Author

Naming is hard. I have now renamed it to is_client_process_context() and, while I had my hands on it, the decision is now also based on the environment variable.
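
A minimal sketch of what an environment-based check could look like. Only the function name and the `_AIRFLOW_PROCESS_CONTEXT` variable with its `client` / `server` values come from this thread; the signature, the injectable `environ` parameter, and treating an unset variable as "not client" are assumptions made for illustration, not the PR's actual implementation.

```python
import os
from typing import Mapping, Optional


def is_client_process_context(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Return True when the process is explicitly marked as client-side."""
    # Passing a mapping makes the check easy to unit test (assumed design).
    env = os.environ if environ is None else environ
    return env.get("_AIRFLOW_PROCESS_CONTEXT") == "client"


print(is_client_process_context({"_AIRFLOW_PROCESS_CONTEXT": "client"}))  # True
print(is_client_process_context({"_AIRFLOW_PROCESS_CONTEXT": "server"}))  # False
print(is_client_process_context({}))  # False (unset is treated as not client here)
```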


# If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps)
# and should use the Task SDK API server path
return hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "_SupervisorCommsHolder.comms")
Member

I'm 90% sure that this will always return false. hasattr doesn't look "deeply".

In [2]: airflow.sdk.execution_time.task_runner.AirflowException.__mro__
Out[2]: (airflow.sdk.exceptions.AirflowException, Exception, BaseException, object)

In [3]: hasattr(airflow.sdk.execution_time.task_runner, "AirflowException.__mro__")
Out[3]: False

Member

Yeah. And this means our unit tests do not exercise this path correctly.

In [1]: import airflow.sdk.execution_time.task_runner
2026-01-07T10:53:37.236044Z [warning  ] Skipping masking for a secret as it's too short (<5 chars) [airflow._shared.secrets_masker.secrets_masker] loc=secrets_masker.py:551
2026-01-07T10:53:37.236167Z [warning  ] Skipping masking for a secret as it's too short (<5 chars) [airflow.sdk._shared.secrets_masker.secrets_masker] loc=secrets_masker.py:551

In [2]: import sys

In [3]: airflow.sdk.execution_time.task_runner._SupervisorCommsHolder.comms = 1

In [4]: hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "_SupervisorCommsHolder.comms")
Out[4]: False
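
The behaviour shown above can be reproduced without Airflow. The sketch below also shows one possible way to test a dotted path, using `operator.attrgetter`, which resolves each dot-separated segment in turn. The helper `has_dotted_attr` and the `fake_module` object are hypothetical; how the PR ultimately reworked the logic is not shown in this thread.

```python
from operator import attrgetter
from types import SimpleNamespace


def has_dotted_attr(obj: object, dotted_path: str) -> bool:
    """Like hasattr(), but follows each dot-separated segment in turn."""
    try:
        attrgetter(dotted_path)(obj)
    except AttributeError:
        return False
    return True


# Hypothetical stand-in for the task_runner module and its holder class.
fake_module = SimpleNamespace(_SupervisorCommsHolder=SimpleNamespace(comms=1))

print(hasattr(fake_module, "_SupervisorCommsHolder.comms"))  # False
print(has_dotted_attr(fake_module, "_SupervisorCommsHolder.comms"))  # True
print(has_dotted_attr(None, "_SupervisorCommsHolder.comms"))  # False
```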

Contributor Author

Very good catch! Thanks! I have reworked the logic now.

Comment on lines 752 to 756
def supervisor_send(msg: ToSupervisor) -> ToTask | None:
    """Send a message to the supervisor as convenience for get_supervisor_comms().send()."""
    if _SupervisorCommsHolder.comms is None:
        raise RuntimeError("Supervisor comms not initialized yet. Call set_supervisor_comms() instead.")
    return _SupervisorCommsHolder.comms.send(msg)
Member

If we want to keep this sort of pattern, I think it might be better to do something similar to what we do for metrics/Stats

class UnsetComms(CommsDecoder[ToTask, ToSupervisor]):
    def send(self, msg):
        raise RuntimeError("Supervisor comms not initialized yet. Call set_supervisor_comms() instead.")
    ...
    initialized: Final[bool] = False

And then SUPERVISOR_COMMS/_SupervisorCommsHolder.comms can be initialized to an instance of this class.

Contributor

In the past I created a kind of Singleton metaclass: you only need to declare it as your class's metaclass and the class behaves like a singleton. Even if you call the constructor multiple times, you always end up with the same instance. In Java this was simple to achieve by defining your class as an enum, but not in Python.

So I ended up creating a Singleton class for Python:

class Singleton(type):
    _instances: dict = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]
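
A short usage sketch of such a metaclass, repeated here so the block runs on its own. `SupervisorCommsRegistry` is a hypothetical class name, not the class from the PR. Note that constructor arguments are silently ignored after the first call, because `__init__` runs only once.

```python
class Singleton(type):
    _instances: dict = {}

    def __call__(cls, *args, **kwargs):
        # Create the instance on first use, then always hand back the same one.
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class SupervisorCommsRegistry(metaclass=Singleton):
    """Hypothetical holder class; not the class from the PR."""

    def __init__(self, name: str = "default"):
        self.name = name


a = SupervisorCommsRegistry("first")
b = SupervisorCommsRegistry("second")  # arguments are ignored on later calls
print(a is b, b.name)  # True first
```

The state is still process-wide, so tests that need a fresh instance would have to clear `Singleton._instances` between runs.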

I did the same for the Flyweight pattern:

import inspect
from itertools import count


class Flyweight(type):
    _instances: dict = {}
    _ignore_args: dict = {}

    @classmethod
    def __prepare__(mcs, name, bases, **kwargs):
        return super().__prepare__(name, bases, **kwargs)

    def __new__(mcs, name, bases, namespace, **kwargs):
        return super().__new__(mcs, name, bases, namespace)

    def __init__(cls, name, bases, namespace, **kwargs):
        super().__init__(name, bases, namespace, **kwargs)
        if kwargs.get("ignore_args"):
            cls._ignore_args[name] = kwargs.get("ignore_args")

    def __filter_ignored_arguments(cls, *args, **kwargs):
        parameters = list(inspect.signature(cls.__init__).parameters.values())
        parameters.pop(0)
        if len(kwargs) > 0:
            constructor_args = [
                (index, name) for index, name in enumerate(kwargs.keys())
            ]
            args = tuple(kwargs.values())
        else:
            constructor_args = [
                (index, parameter.name) for index, parameter in enumerate(parameters)
            ]
        ignored_indices = list(
            map(
                lambda arg: arg[0],
                filter(
                    lambda arg: arg[1] in cls._ignore_args.get(cls.__name__, []),
                    constructor_args,
                ),
            )
        )
        index = count(start=0, step=1)
        return [value for value in args if next(index) not in ignored_indices]

    def __call__(cls, *args, **kwargs):
        key = "{}-{}".format(
            cls.__name__, cls.__filter_ignored_arguments(*args, **kwargs).__str__()
        )
        if key not in cls._instances:
            cls._instances[key] = super(Flyweight, cls).__call__(*args, **kwargs)
        return cls._instances[key]

Dunno if this could be a nice addition to solve those generic problems?

Contributor Author

I have reworked the SupervisorComms now to be a Singleton.

@jscheffl jscheffl force-pushed the bugfix/remove-global-from-task-runner-supervisor-comms branch from b45f6cd to 626b383 Compare January 11, 2026 11:11
@jscheffl
Contributor Author

Reworked PR with the review feedback, especially changed to a Singleton implementation to prevent a static class with static member. Better like this?

@jscheffl jscheffl requested review from ashb and dabla January 11, 2026 19:38
@jscheffl jscheffl force-pushed the bugfix/remove-global-from-task-runner-supervisor-comms branch from 2df8bfe to 18f93b3 Compare January 15, 2026 22:44
@jscheffl
Contributor Author

Up by another 109 commits - can I have another round of feedback @ashb / @amoghrajesh / @kaxil ?

@jscheffl jscheffl force-pushed the bugfix/remove-global-from-task-runner-supervisor-comms branch from 18f93b3 to c103ddf Compare January 18, 2026 10:41
@jscheffl
Contributor Author

Up by another 59 commits

@amoghrajesh
Contributor

I'd leave @ashb / @kaxil to be a judge of this one

@dabla
Contributor

dabla commented Jan 19, 2026

Reworked PR with the review feedback, especially changed to a Singleton implementation to prevent a static class with static member. Better like this?

I like that refactor.

Labels

all versions If set, the CI build will be forced to use all versions of Python/K8S/DBs area:DAG-processing area:task-sdk area:Triggerer full tests needed We need to run full set of tests for this PR to merge

6 participants