164 changes: 97 additions & 67 deletions airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -20,12 +20,15 @@
from __future__ import annotations

import os
from typing import Any
from typing import TYPE_CHECKING, Any
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.exceptions import AirflowException

if TYPE_CHECKING:
from airflow.logging_config import RemoteLogIO

LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()


@@ -48,7 +51,7 @@

DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")

BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")
BASE_LOG_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("logging", "BASE_LOG_FOLDER"))

PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")

@@ -84,7 +87,7 @@
"task": {
"class": "airflow.utils.log.file_task_handler.FileTaskHandler",
"formatter": "airflow",
"base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
"base_log_folder": BASE_LOG_FOLDER,
"filters": ["mask_secrets"],
},
},
@@ -126,6 +129,7 @@
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")
REMOTE_TASK_LOG: RemoteLogIO | None = None

if REMOTE_LOGGING:
ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")
@@ -137,64 +141,86 @@
# WASB buckets should start with "wasb"
# HDFS path should start with "hdfs://"
# just to help Airflow select correct handler
REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})
remote_base_log_folder: str = conf.get_mandatory_value("logging", "remote_base_log_folder")
remote_task_handler_kwargs = conf.getjson("logging", "remote_task_handler_kwargs", fallback={})
if not isinstance(remote_task_handler_kwargs, dict):
raise ValueError(
"logging/remote_task_handler_kwargs must be a JSON object (a python dict), we got "
f"{type(remote_task_handler_kwargs)}"
)
delete_local_copy = conf.getboolean("logging", "delete_local_logs")

if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
"task": {
"class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
"formatter": "airflow",
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
"s3_log_folder": REMOTE_BASE_LOG_FOLDER,
},
}
if remote_base_log_folder.startswith("s3://"):
from airflow.providers.amazon.aws.log.s3_task_handler import S3RemoteLogIO

DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
"task": {
"class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
"formatter": "airflow",
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
"log_group_arn": url_parts.netloc + url_parts.path,
},
}
REMOTE_TASK_LOG = S3RemoteLogIO(
**(
{
"base_log_folder": BASE_LOG_FOLDER,
"remote_base": remote_base_log_folder,
"delete_local_copy": delete_local_copy,
}
| remote_task_handler_kwargs
)
)
remote_task_handler_kwargs = {}

DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
"task": {
"class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
"formatter": "airflow",
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
"gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
"gcp_key_path": key_path,
},
}
elif remote_base_log_folder.startswith("cloudwatch://"):
from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudWatchRemoteLogIO

url_parts = urlsplit(remote_base_log_folder)
REMOTE_TASK_LOG = CloudWatchRemoteLogIO(
**(
{
"base_log_folder": BASE_LOG_FOLDER,
"remote_base": remote_base_log_folder,
"delete_local_copy": delete_local_copy,
"log_group_arn": url_parts.netloc + url_parts.path,
}
| remote_task_handler_kwargs
)
)
remote_task_handler_kwargs = {}
elif remote_base_log_folder.startswith("gs://"):
from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO

key_path = conf.get_mandatory_value("logging", "google_key_path", fallback=None)

REMOTE_TASK_LOG = GCSRemoteLogIO(
**(
{
"base_log_folder": BASE_LOG_FOLDER,
"remote_base": remote_base_log_folder,
"delete_local_copy": delete_local_copy,
"gcp_key_path": key_path,
}
| remote_task_handler_kwargs
)
)
remote_task_handler_kwargs = {}
elif remote_base_log_folder.startswith("wasb"):
from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbRemoteLogIO

DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
wasb_log_container = conf.get_mandatory_value(
"azure_remote_logging", "remote_wasb_log_container", fallback="airflow-logs"
)
WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
"task": {
"class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
"formatter": "airflow",
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
"wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
"wasb_container": wasb_log_container,
},
}

DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
REMOTE_TASK_LOG = WasbRemoteLogIO(
**(
{
"base_log_folder": BASE_LOG_FOLDER,
"remote_base": remote_base_log_folder,
"delete_local_copy": delete_local_copy,
"wasb_container": wasb_log_container,
}
| remote_task_handler_kwargs
)
)
remote_task_handler_kwargs = {}
elif remote_base_log_folder.startswith("stackdriver://"):
key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
# stackdriver:///airflow-tasks => airflow-tasks
log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
log_name = urlsplit(remote_base_log_folder).path[1:]
STACKDRIVER_REMOTE_HANDLERS = {
"task": {
"class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
@@ -205,23 +231,27 @@
}

DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
OSS_REMOTE_HANDLERS = {
"task": {
"class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
"formatter": "airflow",
"base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
"oss_log_folder": REMOTE_BASE_LOG_FOLDER,
},
}
DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
elif REMOTE_BASE_LOG_FOLDER.startswith("hdfs://"):
elif remote_base_log_folder.startswith("oss://"):
from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSRemoteLogIO

REMOTE_TASK_LOG = OSSRemoteLogIO(
**(
{
"base_log_folder": BASE_LOG_FOLDER,
"remote_base": remote_base_log_folder,
"delete_local_copy": delete_local_copy,
}
| remote_task_handler_kwargs
)
)
remote_task_handler_kwargs = {}
elif remote_base_log_folder.startswith("hdfs://"):
HDFS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
"task": {
"class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler",
"formatter": "airflow",
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
"hdfs_log_folder": REMOTE_BASE_LOG_FOLDER,
"base_log_folder": BASE_LOG_FOLDER,
"hdfs_log_folder": remote_base_log_folder,
},
}
DEFAULT_LOGGING_CONFIG["handlers"].update(HDFS_REMOTE_HANDLERS)
@@ -240,7 +270,7 @@
"task": {
"class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
"formatter": "airflow",
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
"base_log_folder": BASE_LOG_FOLDER,
"end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
"host": ELASTICSEARCH_HOST,
"frontend": ELASTICSEARCH_FRONTEND,
@@ -270,7 +300,7 @@
"task": {
"class": "airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler",
"formatter": "airflow",
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
"base_log_folder": BASE_LOG_FOLDER,
"end_of_log_mark": OPENSEARCH_END_OF_LOG_MARK,
"host": OPENSEARCH_HOST,
"port": OPENSEARCH_PORT,
@@ -290,4 +320,4 @@
"section 'elasticsearch' if you are using Elasticsearch. In the other case, "
"'remote_base_log_folder' option in the 'logging' section."
)
DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)
DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(remote_task_handler_kwargs)
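
Note: every RemoteLogIO constructor above builds its kwargs as a dict of defaults merged with the user's [logging] remote_task_handler_kwargs via Python 3.9's dict-union operator (|), so user-supplied keys override the defaults. A minimal runnable sketch of that merge pattern — FakeRemoteLogIO is a hypothetical stand-in, not a real provider class:

from dataclasses import dataclass


@dataclass
class FakeRemoteLogIO:
    # Hypothetical stand-in for e.g. S3RemoteLogIO; fields mirror the defaults above.
    base_log_folder: str
    remote_base: str
    delete_local_copy: bool


defaults = {
    "base_log_folder": "/opt/airflow/logs",
    "remote_base": "s3://my-bucket/logs",
    "delete_local_copy": False,
}
# As if the config had [logging] remote_task_handler_kwargs = {"delete_local_copy": true}
overrides = {"delete_local_copy": True}

# `defaults | overrides` merges left to right, so keys in `overrides` win.
io = FakeRemoteLogIO(**(defaults | overrides))
assert io.delete_local_copy is True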
16 changes: 16 additions & 0 deletions airflow-core/src/airflow/logging/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
48 changes: 48 additions & 0 deletions airflow-core/src/airflow/logging/remote.py
@@ -0,0 +1,48 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import os
from typing import TYPE_CHECKING, Protocol

if TYPE_CHECKING:
    import structlog.typing

    from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo


class RemoteLogIO(Protocol):
    """Interface for remote task loggers."""

    @property
    def processors(self) -> tuple[structlog.typing.Processor, ...]:
        """
        List of structlog processors to install in the task write path.

        This is useful if a remote logging provider wants to transform the structured log
        messages as they are being written to a file, or to upload messages as they are
        generated.
        """
        ...

    def upload(self, path: os.PathLike | str) -> None:
        """Upload the given log path to the remote storage."""
        ...

    def read(self, relative_path: str) -> tuple[LogSourceInfo, LogMessages | None]:
        """Read logs from the given remote log path."""
        ...
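
Because RemoteLogIO is a typing.Protocol, provider classes conform structurally and never need to inherit from it. A sketch of a minimal conforming implementation that "uploads" to a local directory — entirely hypothetical, not a real provider, and assuming LogSourceInfo and LogMessages are both aliases for list[str] as in file_task_handler:

from __future__ import annotations

import os
import shutil


class LocalDirRemoteLogIO:
    """Hypothetical RemoteLogIO: copies task logs into a local 'remote' directory."""

    def __init__(self, base_log_folder: str, remote_base: str) -> None:
        self.base_log_folder = base_log_folder
        self.remote_base = remote_base

    @property
    def processors(self) -> tuple:
        # No structlog processing in this sketch.
        return ()

    def upload(self, path: os.PathLike | str) -> None:
        # Mirror the path relative to base_log_folder under remote_base.
        rel = os.path.relpath(path, self.base_log_folder)
        dest = os.path.join(self.remote_base, rel)
        os.makedirs(os.path.dirname(dest), exist_ok=True)
        shutil.copyfile(path, dest)

    def read(self, relative_path: str) -> tuple[list[str], list[str] | None]:
        full = os.path.join(self.remote_base, relative_path)
        if not os.path.exists(full):
            return [f"No remote log found at {full}"], None
        with open(full) as f:
            return [f"Reading remote log from {full}"], [f.read()]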
65 changes: 47 additions & 18 deletions airflow-core/src/airflow/logging_config.py
@@ -20,39 +20,68 @@
import logging
import warnings
from logging.config import dictConfig
from typing import TYPE_CHECKING, Any

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.module_loading import import_string

if TYPE_CHECKING:
from airflow.logging.remote import RemoteLogIO

log = logging.getLogger(__name__)


def configure_logging():
REMOTE_TASK_LOG: RemoteLogIO | None


def __getattr__(name: str):
    if name == "REMOTE_TASK_LOG":
        global REMOTE_TASK_LOG
        load_logging_config()
        return REMOTE_TASK_LOG
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


def load_logging_config() -> tuple[dict[str, Any], str]:
"""Configure & Validate Airflow Logging."""
logging_class_path = ""
try:
logging_class_path = conf.get("logging", "logging_config_class")
except AirflowConfigException:
log.debug("Could not find key logging_config_class in config")
global REMOTE_TASK_LOG
fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback)

if logging_class_path:
try:
logging_config = import_string(logging_class_path)
# Sometimes we end up with `""` as the value!
logging_class_path = logging_class_path or fallback

user_defined = logging_class_path != fallback

try:
logging_config = import_string(logging_class_path)

# Make sure that the variable is in scope
if not isinstance(logging_config, dict):
raise ValueError("Logging Config should be of dict type")
# Make sure that the variable is in scope
if not isinstance(logging_config, dict):
raise ValueError("Logging Config should be of dict type")

if user_defined:
log.info("Successfully imported user-defined logging config from %s", logging_class_path)
except Exception as err:
# Import default logging configurations.
raise ImportError(f"Unable to load custom logging from {logging_class_path} due to {err}")

except Exception as err:
# Import default logging configurations.
raise ImportError(
f"Unable to load {'custom ' if user_defined else ''}logging config from {logging_class_path} due "
f"to: {type(err).__name__}:{err}"
)
else:
logging_class_path = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
logging_config = import_string(logging_class_path)
log.debug("Unable to load custom logging, using default config instead")
mod = logging_class_path.rsplit(".", 1)[0]
try:
remote_task_log = import_string(f"{mod}.REMOTE_TASK_LOG")
REMOTE_TASK_LOG = remote_task_log
except Exception as err:
log.info("Remote task logs will not be available due to an error: %s", err)

return logging_config, logging_class_path


def configure_logging():
logging_config, logging_class_path = load_logging_config()
try:
# Ensure that the password masking filter is applied to the 'task' handler
# no matter what the user did.
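
The lazily resolved REMOTE_TASK_LOG above uses a module-level __getattr__ (PEP 562): load_logging_config() runs on the first attribute access rather than at import time, and once the global is assigned, later lookups bypass __getattr__ entirely. A standalone sketch of the mechanism, with illustrative names only:

# lazy_settings.py — hypothetical module demonstrating the PEP 562 pattern
EXPENSIVE_VALUE: str  # declared for type checkers; assigned on first access


def _load() -> None:
    global EXPENSIVE_VALUE
    EXPENSIVE_VALUE = "computed once, on demand"


def __getattr__(name: str):
    # Called only when normal module attribute lookup fails (PEP 562).
    if name == "EXPENSIVE_VALUE":
        _load()
        return EXPENSIVE_VALUE
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

Importing lazy_settings stays cheap; accessing lazy_settings.EXPENSIVE_VALUE triggers _load() exactly once, after which the module attribute exists and __getattr__ is no longer consulted.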