diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index e658531fcde91..4deb996de08ea 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -27,7 +27,7 @@ from airflow.exceptions import AirflowException if TYPE_CHECKING: - from airflow.logging_config import RemoteLogIO, RemoteLogStreamIO + from airflow.logging.remote import RemoteLogIO, RemoteLogStreamIO LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper() diff --git a/airflow-core/src/airflow/logging_config.py b/airflow-core/src/airflow/logging_config.py index c942f4703c169..103e070faf362 100644 --- a/airflow-core/src/airflow/logging_config.py +++ b/airflow-core/src/airflow/logging_config.py @@ -32,21 +32,31 @@ log = logging.getLogger(__name__) -REMOTE_TASK_LOG: RemoteLogIO | None -DEFAULT_REMOTE_CONN_ID: str | None = None +class _ActiveLoggingConfig: + """Private class to hold active logging config variables.""" + logging_config_loaded: bool = False + remote_task_log: RemoteLogIO | None = None + default_remote_conn_id: str | None = None -def __getattr__(name: str): - if name == "REMOTE_TASK_LOG": + +def get_remote_task_log() -> RemoteLogIO | None: + if not _ActiveLoggingConfig.logging_config_loaded: + load_logging_config() + return _ActiveLoggingConfig.remote_task_log + + +def get_default_remote_conn_id() -> str | None: + if not _ActiveLoggingConfig.logging_config_loaded: load_logging_config() - return REMOTE_TASK_LOG + return _ActiveLoggingConfig.default_remote_conn_id def load_logging_config() -> tuple[dict[str, Any], str]: """Configure & Validate Airflow Logging.""" - global REMOTE_TASK_LOG, DEFAULT_REMOTE_CONN_ID fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback) + 
_ActiveLoggingConfig.logging_config_loaded = True # Sometimes we end up with `""` as the value! logging_class_path = logging_class_path or fallback @@ -75,8 +85,8 @@ def load_logging_config() -> tuple[dict[str, Any], str]: mod = import_module(modpath) # Load remote logging configuration from the custom module - REMOTE_TASK_LOG = getattr(mod, "REMOTE_TASK_LOG") - DEFAULT_REMOTE_CONN_ID = getattr(mod, "DEFAULT_REMOTE_CONN_ID", None) + _ActiveLoggingConfig.remote_task_log = getattr(mod, "REMOTE_TASK_LOG") + _ActiveLoggingConfig.default_remote_conn_id = getattr(mod, "DEFAULT_REMOTE_CONN_ID", None) except Exception as err: log.info("Remote task logs will not be available due to an error: %s", err) diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 02ffdd0d97ae6..2bb14312fc5bb 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -922,9 +922,9 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> LogResponse | Stre """ remote_io = None try: - from airflow.logging_config import REMOTE_TASK_LOG + from airflow.logging_config import get_remote_task_log - remote_io = REMOTE_TASK_LOG + remote_io = get_remote_task_log() except Exception: pass diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py index cbe1a61d1c618..ad52eb8e8dd95 100644 --- a/airflow-core/tests/unit/utils/test_log_handlers.py +++ b/airflow-core/tests/unit/utils/test_log_handlers.py @@ -670,7 +670,7 @@ def setup_mock_aws(): import airflow.logging_config - airflow.logging_config.REMOTE_TASK_LOG = s3_remote_log_io + airflow.logging_config._ActiveLoggingConfig.remote_task_log = s3_remote_log_io sources, logs = fth._read_remote_logs(ti, try_number=1) diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py 
b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py index 5d43056410598..11c73bbf5b951 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py @@ -46,7 +46,7 @@ from tests_common.test_utils.config import conf_vars from tests_common.test_utils.dag import sync_dag_to_db from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags, clear_db_runs -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS def get_time_str(time_in_milliseconds): @@ -106,7 +106,12 @@ def setup_tests(self, create_runtime_ti, tmp_path, monkeypatch): # loggers. # Set up the right chain of processors so the event looks like we want for our full test - monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG", self.subject) + if AIRFLOW_V_3_2_PLUS: + monkeypatch.setattr( + airflow.logging_config._ActiveLoggingConfig, "remote_task_log", self.subject + ) + else: + monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG", self.subject) try: procs = airflow.sdk.log.logging_processors(colors=False, json_output=False) except TypeError: diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index 0dc6298182a3c..198b156a1d65a 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -188,14 +188,12 @@ def init_log_file(local_relative_path: str) -> Path: def load_remote_log_handler() -> RemoteLogIO | None: - import airflow.logging_config + from airflow.logging_config import get_remote_task_log - return airflow.logging_config.REMOTE_TASK_LOG + return get_remote_task_log() def load_remote_conn_id() -> str | None: - import airflow.logging_config - # TODO: Over time, providers should use SDK's conf only. Verify and make changes to ensure we're aligned with that aim here? 
# Currently using Core's conf for remote logging consistency. from airflow.configuration import conf @@ -203,7 +201,9 @@ def load_remote_conn_id() -> str | None: if conn_id := conf.get("logging", "remote_log_conn_id", fallback=None): return conn_id - return airflow.logging_config.DEFAULT_REMOTE_CONN_ID + from airflow.logging_config import get_default_remote_conn_id + + return get_default_remote_conn_id() def relative_path_from_logger(logger) -> Path | None: