Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from airflow.exceptions import AirflowException

if TYPE_CHECKING:
from airflow.logging_config import RemoteLogIO, RemoteLogStreamIO
from airflow.logging.remote import RemoteLogIO, RemoteLogStreamIO

LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()

Expand Down
26 changes: 18 additions & 8 deletions airflow-core/src/airflow/logging_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,31 @@
log = logging.getLogger(__name__)


REMOTE_TASK_LOG: RemoteLogIO | None
DEFAULT_REMOTE_CONN_ID: str | None = None
class _ActiveLoggingConfig:
"""Private class to hold active logging config variables."""

logging_config_loaded: bool = False
remote_task_log: RemoteLogIO | None
default_remote_conn_id: str | None = None

def __getattr__(name: str):
if name == "REMOTE_TASK_LOG":

def get_remote_task_log() -> RemoteLogIO | None:
    """Return the active ``RemoteLogIO`` instance, or ``None`` if remote logging is not configured."""
    # Load lazily on first access so that merely importing this module has no side effects.
    if _ActiveLoggingConfig.logging_config_loaded:
        return _ActiveLoggingConfig.remote_task_log
    load_logging_config()
    return _ActiveLoggingConfig.remote_task_log


def get_default_remote_conn_id() -> str | None:
    """Return the provider-supplied default remote-log connection id, if any.

    Lazily triggers :func:`load_logging_config` on first access so importing
    this module stays side-effect free. Returns ``None`` when the active
    logging config does not define ``DEFAULT_REMOTE_CONN_ID``.
    """
    if not _ActiveLoggingConfig.logging_config_loaded:
        load_logging_config()
    return _ActiveLoggingConfig.default_remote_conn_id


def load_logging_config() -> tuple[dict[str, Any], str]:
"""Configure & Validate Airflow Logging."""
global REMOTE_TASK_LOG, DEFAULT_REMOTE_CONN_ID
fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback)
_ActiveLoggingConfig.logging_config_loaded = True

# Sometimes we end up with `""` as the value!
logging_class_path = logging_class_path or fallback
Expand Down Expand Up @@ -75,8 +85,8 @@ def load_logging_config() -> tuple[dict[str, Any], str]:
mod = import_module(modpath)

# Load remote logging configuration from the custom module
REMOTE_TASK_LOG = getattr(mod, "REMOTE_TASK_LOG")
DEFAULT_REMOTE_CONN_ID = getattr(mod, "DEFAULT_REMOTE_CONN_ID", None)
_ActiveLoggingConfig.remote_task_log = getattr(mod, "REMOTE_TASK_LOG")
_ActiveLoggingConfig.default_remote_conn_id = getattr(mod, "DEFAULT_REMOTE_CONN_ID", None)
except Exception as err:
log.info("Remote task logs will not be available due to an error: %s", err)

Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -922,9 +922,9 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> LogResponse | Stre
"""
remote_io = None
try:
from airflow.logging_config import REMOTE_TASK_LOG
from airflow.logging_config import get_remote_task_log

remote_io = REMOTE_TASK_LOG
remote_io = get_remote_task_log()
except Exception:
pass

Expand Down
2 changes: 1 addition & 1 deletion airflow-core/tests/unit/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ def setup_mock_aws():

import airflow.logging_config

airflow.logging_config.REMOTE_TASK_LOG = s3_remote_log_io
airflow.logging_config._ActiveLoggingConfig.remote_task_log = s3_remote_log_io

sources, logs = fth._read_remote_logs(ti, try_number=1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.dag import sync_dag_to_db
from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags, clear_db_runs
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS


def get_time_str(time_in_milliseconds):
Expand Down Expand Up @@ -106,7 +106,12 @@ def setup_tests(self, create_runtime_ti, tmp_path, monkeypatch):
# loggers.

# Set up the right chain of processors so the event looks like we want for our full test
monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG", self.subject)
if AIRFLOW_V_3_2_PLUS:
monkeypatch.setattr(
airflow.logging_config._ActiveLoggingConfig, "remote_task_log", self.subject
)
else:
monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG", self.subject)
try:
procs = airflow.sdk.log.logging_processors(colors=False, json_output=False)
except TypeError:
Expand Down
10 changes: 5 additions & 5 deletions task-sdk/src/airflow/sdk/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,22 +188,22 @@ def init_log_file(local_relative_path: str) -> Path:


def load_remote_log_handler() -> RemoteLogIO | None:
    """Return the core-configured remote log IO handler, or ``None`` when remote logging is off.

    Imported lazily so the task SDK does not pull in ``airflow.logging_config``
    (and its configuration side effects) at module import time.
    """
    from airflow.logging_config import get_remote_task_log

    return get_remote_task_log()


def load_remote_conn_id() -> str | None:
    """Return the connection id to use for remote task logging, or ``None``.

    The explicit ``[logging] remote_log_conn_id`` config option wins; otherwise
    fall back to the provider-supplied default from the active logging config.
    """
    # TODO: Over time, providers should use SDK's conf only. Verify and make changes to ensure we're aligned with that aim here?
    # Currently using Core's conf for remote logging consistency.
    from airflow.configuration import conf

    if conn_id := conf.get("logging", "remote_log_conn_id", fallback=None):
        return conn_id

    # Imported lazily to avoid configuring core logging at module import time.
    from airflow.logging_config import get_default_remote_conn_id

    return get_default_remote_conn_id()


def relative_path_from_logger(logger) -> Path | None:
Expand Down
Loading