diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py index 69a8f94d7145d..14c2c2abb395d 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py @@ -340,6 +340,16 @@ def send_task_to_executor( # Get the task from the app task_to_run = celery_app.tasks["execute_command"] args = [args] # type: ignore[list-item] + + # Pre-import redis.client to avoid SIGALRM interrupting module initialization. + # If timeout fires during import, redis module gets partially cached in sys.modules + # without the 'client' submodule bound, causing AttributeError on subsequent access. + # See: https://github.com/apache/airflow/issues/41359 + try: + import redis.client # noqa: F401 + except ImportError: + pass # Redis not installed or not using Redis backend + try: with timeout(seconds=OPERATION_TIMEOUT): result = task_to_run.apply_async(args=args, queue=queue) @@ -363,6 +373,13 @@ def fetch_celery_task_state(async_result: AsyncResult) -> tuple[str, str | Excep :return: a tuple of the Celery task key and the Celery state and the celery info of the task """ + # Pre-import redis.client to avoid SIGALRM interrupting module initialization. + # See: https://github.com/apache/airflow/issues/41359 + try: + import redis.client # noqa: F401 + except ImportError: + pass # Redis not installed or not using Redis backend + try: with timeout(seconds=OPERATION_TIMEOUT): # Accessing state property of celery task will make actual network request