From c81ee3dce40f89f087d63641313c97d8efe26cec Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 21 Dec 2025 18:16:16 +0100 Subject: [PATCH] Remove global from Task SDK log --- task-sdk/src/airflow/sdk/log.py | 43 +++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index 2dacf0a611cb9..0dc6298182a3c 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -18,6 +18,7 @@ from __future__ import annotations import warnings +from collections.abc import Callable from functools import cache from pathlib import Path from typing import TYPE_CHECKING, Any, BinaryIO, TextIO @@ -39,6 +40,29 @@ __all__ = ["configure_logging", "reset_logging", "mask_secret"] +class _WarningsInterceptor: + """A class to hold the reference to the original warnings.showwarning function.""" + + _original_showwarning: Callable | None = None + + @staticmethod + def register(new_callable: Callable) -> None: + if _WarningsInterceptor._original_showwarning is None: + _WarningsInterceptor._original_showwarning = warnings.showwarning + warnings.showwarning = new_callable + + @staticmethod + def reset() -> None: + if _WarningsInterceptor._original_showwarning is not None: + warnings.showwarning = _WarningsInterceptor._original_showwarning + _WarningsInterceptor._original_showwarning = None + + @staticmethod + def emit_warning(*args: Any) -> None: + if _WarningsInterceptor._original_showwarning is not None: + _WarningsInterceptor._original_showwarning(*args) + + def mask_logs(logger: Any, method_name: str, event_dict: EventDict) -> EventDict: from airflow.sdk._shared.secrets_masker import redact @@ -121,12 +145,7 @@ def configure_logging( callsite_parameters=callsite_params, ) - global _warnings_showwarning - - if _warnings_showwarning is None: - _warnings_showwarning = warnings.showwarning - # Capture warnings and show them via structlog -- i.e. in task logs - warnings.showwarning = _showwarning + _WarningsInterceptor.register(_showwarning) def logger_at_level(name: str, level: int) -> Logger: @@ -258,18 +277,11 @@ def reset_logging(): """ from airflow.sdk._shared.logging.structlog import structlog_processors - global _warnings_showwarning - if _warnings_showwarning is not None: - warnings.showwarning = _warnings_showwarning - _warnings_showwarning = None - + _WarningsInterceptor.reset() structlog_processors.cache_clear() logging_processors.cache_clear() -_warnings_showwarning: Any = None - - def _showwarning( message: Warning | str, category: type[Warning], @@ -288,8 +300,7 @@ def _showwarning( warnings logger named "py.warnings" with level logging.WARNING. """ if file is not None: - if _warnings_showwarning is not None: - _warnings_showwarning(message, category, filename, lineno, file, line) + _WarningsInterceptor.emit_warning(message, category, filename, lineno, file, line) else: from airflow.sdk._shared.logging.structlog import reconfigure_logger