Description
Apache Airflow version
3.1.0
If "Other Airflow 2 version" selected, which one?
No response
What happened?
A 500 error is returned when trying to access the log of any task. Remote logging to GCS is enabled.
The API server's log contains the following error (the full error listing is provided below):
ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner'
There are also several open issues related to SUPERVISOR_COMMS: #51816, #48554.
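To check whether the name from the error message is importable in a given process (for example, inside the API server container), a small diagnostic along these lines may help. This is only a sketch: `can_import_name` is a hypothetical helper, not part of Airflow, and it approximates `from module import name` with an attribute check.

```python
import importlib


def can_import_name(module_name: str, attr: str) -> bool:
    """Return True if `module_name` imports and exposes `attr` at module level.

    Hypothetical helper for diagnosis only; it approximates
    `from module_name import attr` with an attribute lookup.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        # The module itself is missing or fails to import.
        return False
    return hasattr(module, attr)
```

Calling `can_import_name("airflow.sdk.execution_time.task_runner", "SUPERVISOR_COMMS")` in the API server environment would be expected to return `False` if the process hits the same `ImportError` as above.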
What you think should happen instead?
Logs should be retrieved successfully.
How to reproduce
Access any log of any task.
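The failing request in the error listing suggests the same error can be triggered by calling the log endpoint directly. Below is a minimal sketch that builds that request path using only the standard library. `build_log_path` is a hypothetical helper, and the path template is inferred from the request line in the error listing, not taken from the API reference.

```python
from urllib.parse import quote, urlencode


def build_log_path(dag_id: str, run_id: str, task_id: str,
                   try_number: int, map_index: int = -1) -> str:
    """Build the task log request path (template assumed from the error listing)."""
    # Percent-encode each path segment fully, so ":" and "+" in the run ID are escaped.
    dag, run, task = (quote(part, safe="") for part in (dag_id, run_id, task_id))
    query = urlencode({"map_index": map_index})
    return (
        f"/api/v2/dags/{dag}/dagRuns/{run}"
        f"/taskInstances/{task}/logs/{try_number}?{query}"
    )
```

With the values from the error listing (`integrations.daily_stats`, `scheduled__2025-09-25T22:36:12.066275+00:00`, `get_ads_stats_data`, try number `1`), this reproduces the path of the request that returned 500.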
Operating System
Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
apache-airflow-providers-celery==3.12.3
apache-airflow-providers-common-compat==1.7.4
apache-airflow-providers-common-io==1.6.3
apache-airflow-providers-common-sql==1.28.1
apache-airflow-providers-fab==2.4.3
apache-airflow-providers-google==18.0.0
apache-airflow-providers-http==5.3.4
apache-airflow-providers-postgres==6.3.0
apache-airflow-providers-redis==4.3.1
apache-airflow-providers-smtp==2.2.1
apache-airflow-providers-standard==1.8.0
Deployment
Official Apache Airflow Helm Chart
Deployment details
Deployed on GKE with an extended image based on apache/airflow:slim-3.1.0 and Helm chart 1.18.0.
Migrated from 3.0.6.
Anything else?
Error listing
INFO: 10.120.2.16:48230 - "GET /api/v2/dags/integrations.daily_stats/dagRuns/scheduled__2025-09-25T22%3A36%3A12.066275%2B00%3A00/taskInstances/get_ads_stats_data/logs/1?map_index=-1 HTTP/1.1" 500 Internal Server Error
ERROR: Exception in ASGI application
+ Exception Group Traceback (most recent call last):
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/_utils.py", line 79, in collapse_excgroups
| yield
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/responses.py", line 271, in __call__
| async with anyio.create_task_group() as task_group:
| ^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 781, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/home/airflow/.local/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi
| result = await app( # type: ignore[func-returns-value]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/fastapi/applications.py", line 1082, in __call__
| await super().__call__(scope, receive, send)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/applications.py", line 113, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/errors.py", line 186, in __call__
| raise exc
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/errors.py", line 164, in __call__
| await self.app(scope, receive, _send)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 29, in __call__
| await responder(scope, receive, send)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 130, in __call__
| await super().__call__(scope, receive, send)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 46, in __call__
| await self.app(scope, receive, self.send_with_compression)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/cors.py", line 85, in __call__
| await self.app(scope, receive, send)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/exceptions.py", line 63, in __call__
| await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
| raise exc
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
| await app(scope, receive, sender)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/routing.py", line 716, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/routing.py", line 736, in app
| await route.handle(scope, receive, send)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/routing.py", line 290, in handle
| await self.app(scope, receive, send)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/routing.py", line 78, in app
| await wrap_app_handling_exceptions(app, request)(scope, receive, send)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
| raise exc
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
| await app(scope, receive, sender)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/routing.py", line 76, in app
| await response(scope, receive, send)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/responses.py", line 270, in __call__
| with collapse_excgroups():
| ^^^^^^^^^^^^^^^^^^^^
| File "/usr/python/lib/python3.12/contextlib.py", line 158, in __exit__
| self.gen.throw(value)
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/_utils.py", line 85, in collapse_excgroups
| raise exc
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/responses.py", line 274, in wrap
| await func()
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/responses.py", line 254, in stream_response
| async for chunk in self.body_iterator:
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/concurrency.py", line 61, in iterate_in_threadpool
| yield await anyio.to_thread.run_sync(_next, as_iterator)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/anyio/to_thread.py", line 56, in run_sync
| return await get_async_backend().run_sync_in_worker_thread(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2485, in run_sync_in_worker_thread
| return await future
| ^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 976, in run
| result = context.run(func, *args)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/starlette/concurrency.py", line 50, in _next
| return next(iterator)
| ^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/log/log_reader.py", line 134, in read_log_stream
| log_stream, out_metadata = self.read_log_chunks(ti, try_number, metadata)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/log/log_reader.py", line 104, in read_log_chunks
| return self.log_handler.read(ti, try_number, metadata=metadata)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/log/file_task_handler.py", line 760, in read
| read_result = self._read(task_instance, try_number, metadata)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/log/file_task_handler.py", line 614, in _read
| sources, logs = self._read_remote_logs(ti, try_number, metadata)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/log/file_task_handler.py", line 936, in _read_remote_logs
| sources, logs = remote_io.read(path, ti)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/log/gcs_task_handler.py", line 158, in read
| blobs = list(self.client.list_blobs(bucket_or_name=bucket, prefix=prefix))
| ^^^^^^^^^^^
| File "<attrs generated getattr airflow.providers.google.cloud.log.gcs_task_handler.GCSRemoteLogIO>", line 6, in __getattr__
| result = func(self)
| ^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/log/gcs_task_handler.py", line 103, in client
| if self.hook:
| ^^^^^^^^^
| File "<attrs generated getattr airflow.providers.google.cloud.log.gcs_task_handler.GCSRemoteLogIO>", line 6, in __getattr__
| result = func(self)
| ^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/log/gcs_task_handler.py", line 95, in hook
| return GCSHook(gcp_conn_id=conn_id)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/common/hooks/base_google.py", line 280, in __init__
| self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/hook.py", line 61, in get_connection
| conn = Connection.get(conn_id)
| ^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py", line 226, in get
| return _get_connection(conn_id)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/context.py", line 181, in _get_connection
| from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
| ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
+------------------------------------
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/fastapi/applications.py", line 1082, in __call__
await super().__call__(scope, receive, send)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/applications.py", line 113, in __call__
await self.middleware_stack(scope, receive, send)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/errors.py", line 186, in __call__
raise exc
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/errors.py", line 164, in __call__
await self.app(scope, receive, _send)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 29, in __call__
await responder(scope, receive, send)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 130, in __call__
await super().__call__(scope, receive, send)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 46, in __call__
await self.app(scope, receive, self.send_with_compression)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/cors.py", line 85, in __call__
await self.app(scope, receive, send)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/middleware/exceptions.py", line 63, in __call__
await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
raise exc
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
await app(scope, receive, sender)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/routing.py", line 716, in __call__
await self.middleware_stack(scope, receive, send)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/routing.py", line 736, in app
await route.handle(scope, receive, send)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/routing.py", line 290, in handle
await self.app(scope, receive, send)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/routing.py", line 78, in app
await wrap_app_handling_exceptions(app, request)(scope, receive, send)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
raise exc
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
await app(scope, receive, sender)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/routing.py", line 76, in app
await response(scope, receive, send)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/responses.py", line 270, in __call__
with collapse_excgroups():
^^^^^^^^^^^^^^^^^^^^
File "/usr/python/lib/python3.12/contextlib.py", line 158, in __exit__
self.gen.throw(value)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/_utils.py", line 85, in collapse_excgroups
raise exc
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/responses.py", line 274, in wrap
await func()
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/responses.py", line 254, in stream_response
async for chunk in self.body_iterator:
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/concurrency.py", line 61, in iterate_in_threadpool
yield await anyio.to_thread.run_sync(_next, as_iterator)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/anyio/to_thread.py", line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2485, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 976, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/concurrency.py", line 50, in _next
return next(iterator)
^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/log/log_reader.py", line 134, in read_log_stream
log_stream, out_metadata = self.read_log_chunks(ti, try_number, metadata)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/log/log_reader.py", line 104, in read_log_chunks
return self.log_handler.read(ti, try_number, metadata=metadata)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/log/file_task_handler.py", line 760, in read
read_result = self._read(task_instance, try_number, metadata)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/log/file_task_handler.py", line 614, in _read
sources, logs = self._read_remote_logs(ti, try_number, metadata)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/log/file_task_handler.py", line 936, in _read_remote_logs
sources, logs = remote_io.read(path, ti)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/log/gcs_task_handler.py", line 158, in read
blobs = list(self.client.list_blobs(bucket_or_name=bucket, prefix=prefix))
^^^^^^^^^^^
File "<attrs generated getattr airflow.providers.google.cloud.log.gcs_task_handler.GCSRemoteLogIO>", line 6, in __getattr__
result = func(self)
^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/log/gcs_task_handler.py", line 103, in client
if self.hook:
^^^^^^^^^
File "<attrs generated getattr airflow.providers.google.cloud.log.gcs_task_handler.GCSRemoteLogIO>", line 6, in __getattr__
result = func(self)
^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/log/gcs_task_handler.py", line 95, in hook
return GCSHook(gcp_conn_id=conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/common/hooks/base_google.py", line 280, in __init__
self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/hook.py", line 61, in get_connection
conn = Connection.get(conn_id)
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py", line 226, in get
return _get_connection(conn_id)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/context.py", line 181, in _get_connection
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct