@kaxil kaxil commented May 9, 2025

Fixes a subtle bug where SUPERVISOR_COMMS was incorrectly set during supervisor-side request handling in InProcessSupervisorComms.send_request(), as reported in #50300 (comment). The fix is based on debugging a failure in the example_athena system test under dag.test(). It also prevents regressions for other DAGs/tasks that rely on connection or variable fetches inside the Task SDK runtime.

Problem

When calling the supervisor’s _handle_request() (e.g. in response to a connection or variable fetch), some internal logic (like Variable.get() or Connection.get_connection_from_secrets()) would incorrectly detect that it was running in a Task SDK execution context, because SUPERVISOR_COMMS was still set.

This led to recursive calls into the SDK flow (e.g., calling SUPERVISOR_COMMS.lock) while already holding the lock, resulting in AttributeError: 'NoneType' object has no attribute 'lock' or deadlocks.

This caused failures when using dag.test() in system tests that rely on connections or variables:

  • Tasks attempting to fetch a connection during execution caused the supervisor to recurse into its own comms path
  • This led to incorrect error handling (500 Internal Server Error) and test failures
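
The failure mode can be illustrated with a toy model that does not use Airflow at all. Everything in it (FakeComms, get_variable, handle_request) is hypothetical and only mimics the shape of the problem: a lookup helper picks its path by checking a module-level global, and the supervisor-side handler calls that same helper while the global is still set and the lock is already held. The sketch uses a non-blocking acquire so that it reports the re-entry instead of hanging.

```python
import threading
from typing import Optional

# Stand-in for the module-level global; not the real Airflow object.
SUPERVISOR_COMMS: Optional["FakeComms"] = None


class FakeComms:
    """Hypothetical in-process comms object guarded by a non-reentrant lock."""

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.reentered = False

    def send_request(self, key: str) -> Optional[str]:
        # Non-blocking acquire: a blocking acquire here would deadlock.
        if not self.lock.acquire(blocking=False):
            self.reentered = True
            return None
        try:
            return handle_request(key)
        finally:
            self.lock.release()


def get_variable(key: str) -> Optional[str]:
    # "Am I inside a task?" is decided purely by the global being set.
    if SUPERVISOR_COMMS is not None:
        return SUPERVISOR_COMMS.send_request(key)
    return f"db-value-for-{key}"


def handle_request(key: str) -> Optional[str]:
    # Supervisor side: intended to take the direct path, but the global
    # is still set, so get_variable() routes back into send_request().
    return get_variable(key)


comms = FakeComms()
SUPERVISOR_COMMS = comms
result = get_variable("my_variable")
```

In this model the second send_request() call finds the lock already held, so comms.reentered ends up True and no value is returned.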

Fix

  • The fix ensures SUPERVISOR_COMMS is temporarily unset while handling the request.
  • This prevents Task SDK context detection logic from activating during supervisor API handling.
  • The set_supervisor_comms(None) context manager is now explicitly used within send_request() to guard the call to _handle_request().
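
A minimal sketch of the override-and-restore pattern described above, assuming a simple namespace in place of the real task runner module. The names runtime, override_comms and lookup are hypothetical, and the actual set_supervisor_comms() implementation may differ in detail.

```python
from contextlib import contextmanager
from types import SimpleNamespace
from typing import Any, Iterator

# Stand-in for the module that holds the SUPERVISOR_COMMS global.
runtime = SimpleNamespace(SUPERVISOR_COMMS=None)


@contextmanager
def override_comms(temp: Any) -> Iterator[None]:
    """Temporarily replace the global, restoring it even if the body raises."""
    old = runtime.SUPERVISOR_COMMS
    runtime.SUPERVISOR_COMMS = temp
    try:
        yield
    finally:
        runtime.SUPERVISOR_COMMS = old


def lookup(key: str) -> str:
    # Context detection based only on whether the global is set.
    if runtime.SUPERVISOR_COMMS is not None:
        return "task-sdk-path"
    return "supervisor-path"


runtime.SUPERVISOR_COMMS = object()  # pretend a task is running
before = lookup("my_variable")
with override_comms(None):
    # Supervisor-side handling happens here with the global unset.
    during = lookup("my_variable")
after = lookup("my_variable")
```

Inside the with block the lookup takes the supervisor path; outside it, the task-side value is back in place.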

Why this matters

This fixes real bugs encountered when using dag.test() in system tests that rely on connections or variables:

  • Tasks attempting to fetch a connection during execution caused the supervisor to recurse into its own comms path
  • This led to incorrect error handling (500 Internal Server Error) and test failures

To Reproduce

Use the following DAG on main:

from airflow.sdk import Variable, dag, task

@dag
def xample_simplest_dag():
    @task
    def my_task():
        print("hellooo")
        import os
        # os.environ["AIRFLOW_VAR_MY_VARIABLE"] = "my_value"
        assert Variable.get("my_variable") == "my_value"

    my_task()


d = xample_simplest_dag()

if __name__ == "__main__":
    d.test(use_executor=False)

and run:

python /files/dags/example_simplest_dag.py

Which will cause:

...

Task instance is in running state
 Previous state of the Task instance: TaskInstanceState.QUEUED
Current task name:my_task
Dag name:xample_simplest_dag
hellooo
2025-05-09 23:50:40 [info     ] Secrets backends loaded for worker [supervisor] backend_classes=['EnvironmentVariablesBackend'] count=1
2025-05-09 23:50:40 [debug    ] Variable not found in any of the configured Secrets Backends. Trying to retrieve from API server [task] key=my_variable
2025-05-09 23:50:40 [debug    ] Sending request                [task] msg=GetVariable(key='my_variable', type='GetVariable')
2025-05-09 23:50:40 [debug    ] Received message from task runner [task] msg=GetVariable(key='my_variable', type='GetVariable')
/opt/airflow/airflow-core/src/airflow/models/variable.py:147 DeprecationWarning: Using Variable.get from `airflow.models` is deprecated.Please use `from airflow.sdk import Variable` instead
2025-05-09 23:50:40 [info     ] Secrets backends loaded for worker [supervisor] backend_classes=['EnvironmentVariablesBackend'] count=1
2025-05-09 23:50:40 [debug    ] Variable not found in any of the configured Secrets Backends. Trying to retrieve from API server [task] key=my_variable
2025-05-09 23:50:40 [error    ] Handle died with an error      [airflow.api_fastapi.execution_api.app]
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /usr/local/lib/python3.9/site-packages/starlette/_utils.py:76 in collapse_excgroups              │
│                                                                                                  │
│    73 @contextmanager                                                                            │
│    74 def collapse_excgroups() -> typing.Generator[None, None, None]:                            │
│    75 │   try:                                                                                   │
│ ❱  76 │   │   yield                                                                              │
│    77 │   except BaseException as exc:                                                           │
│    78 │   │   if has_exceptiongroups:  # pragma: no cover                                        │
│    79 │   │   │   while isinstance(exc, BaseExceptionGroup) and len(exc.exceptions) == 1:        │
│                                                                                                  │
│ /usr/local/lib/python3.9/site-packages/starlette/middleware/base.py:181 in __call__              │
│                                                                                                  │
│   178 │   │   │   │   response = await self.dispatch_func(request, call_next)                    │
│   179 │   │   │   │   await response(scope, wrapped_receive, send)                               │
│   180 │   │   │   │   response_sent.set()                                                        │
│ ❱ 181 │   │   │   │   recv_stream.close()                                                        │
│   182 │   │   if app_exc is not None and not exception_already_raised:                           │
│   183 │   │   │   raise app_exc                                                                  │
│   184                                                                                            │
│                                                                                                  │
│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
│ │                  app_exc = AttributeError("'NoneType' object has no attribute 'lock'")       │ │
│ │ exception_already_raised = True                                                              │ │
│ │                  receive = <bound method ASGIResponder.asgi_receive of                       │ │
│ │                            <a2wsgi.asgi.ASGIResponder object at 0xffff61ff3af0>>             │ │
│ │              recv_stream = MemoryObjectReceiveStream(                                        │ │
....
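
The commented-out os.environ line in the reproduction hints at why the failure needs the variable to be absent from the environment: per the log above, the API server is only contacted after EnvironmentVariablesBackend fails to find the key. A rough sketch of that lookup order follows, assuming the conventional AIRFLOW_VAR_<KEY> naming. The helper names and the fake API callable are hypothetical, not Airflow's code.

```python
from typing import Callable, Mapping, Tuple

VAR_ENV_PREFIX = "AIRFLOW_VAR_"  # assumed naming convention


def env_var_name(key: str) -> str:
    """Map a variable key to the environment variable that would hold it."""
    return VAR_ENV_PREFIX + key.upper()


def resolve_variable(
    key: str,
    environ: Mapping[str, str],
    fetch_from_api: Callable[[str], str],
) -> Tuple[str, str]:
    """Return (value, source); the API is only the fallback."""
    value = environ.get(env_var_name(key))
    if value is not None:
        return value, "env"
    return fetch_from_api(key), "api"


api_calls = []


def fake_api(key: str) -> str:
    # Records the call so the fallback path is observable.
    api_calls.append(key)
    return "from-api"


with_env = resolve_variable(
    "my_variable", {"AIRFLOW_VAR_MY_VARIABLE": "my_value"}, fake_api
)
without_env = resolve_variable("my_variable", {}, fake_api)
```

Only the second call reaches the fallback, which corresponds to the code path that triggered the supervisor-side request in the log.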


Tests

  • Unit tests for set_supervisor_comms() covering all override/restore edge cases
  • A roundtrip test that verifies send_request() triggers _handle_request(), which in turn uses send_msg() to queue a response retrievable via get_message()

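
The override/restore edge cases mentioned for set_supervisor_comms() might be exercised along these lines. This is a self-contained sketch against a hypothetical stand-in (state and override_comms), not the tests added in this PR.

```python
from contextlib import contextmanager
from types import SimpleNamespace
from typing import Any, Iterator, List

# Hypothetical holder for the global under test.
state = SimpleNamespace(comms=None)


@contextmanager
def override_comms(temp: Any) -> Iterator[None]:
    """Stand-in for the context manager: swap the value, always restore it."""
    old = state.comms
    state.comms = temp
    try:
        yield
    finally:
        state.comms = old


def check_restores_after_exception() -> Any:
    """The original value must come back even when the body raises."""
    state.comms = "original"
    try:
        with override_comms(None):
            assert state.comms is None
            raise RuntimeError("boom")
    except RuntimeError:
        pass
    return state.comms


def check_nested_overrides() -> List[Any]:
    """Nested overrides should unwind in reverse order."""
    state.comms = "original"
    seen: List[Any] = []
    with override_comms("outer"):
        seen.append(state.comms)
        with override_comms(None):
            seen.append(state.comms)
        seen.append(state.comms)
    seen.append(state.comms)
    return seen


restored = check_restores_after_exception()
nested = check_nested_overrides()
```
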
@kaxil kaxil requested a review from ferruzzi May 9, 2025 23:45
@kaxil kaxil requested review from amoghrajesh and ashb as code owners May 9, 2025 23:45
@kaxil kaxil added this to the Airflow 3.0.2 milestone May 9, 2025
@kaxil kaxil merged commit 1ab2474 into apache:main May 10, 2025
70 checks passed
@kaxil kaxil deleted the fix-bug-dag-test branch May 10, 2025 05:50
@amoghrajesh amoghrajesh left a comment

Nice. That wasn't easy to find.

ayush3singh pushed a commit to ayush3singh/airflow that referenced this pull request May 10, 2025
kaxil added a commit that referenced this pull request May 12, 2025
kaxil added a commit that referenced this pull request Jun 3, 2025