Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] integrate with Ray structured logging #46215

Merged
4 changes: 4 additions & 0 deletions python/ray/_private/ray_logging/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
"taskName",
}

LOGGER_FLATTEN_KEYS = {
"ray_serve_extra_fields",
}


class LogKey(str, Enum):
# Core context
Expand Down
29 changes: 27 additions & 2 deletions python/ray/_private/ray_logging/formatters.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
import logging
import json
from ray._private.ray_logging.constants import LogKey, LOGRECORD_STANDARD_ATTRS
from ray._private.ray_logging.constants import (
LogKey,
LOGRECORD_STANDARD_ATTRS,
LOGGER_FLATTEN_KEYS,
)
from ray._private.ray_constants import LOGGER_FORMAT
from typing import Any, Dict


def _append_flatten_attributes(formatted_attrs: Dict[str, Any], key: str, value: Any):
"""Flatten the dictionary values for special keys and append the values in place.

If the key is in `LOGGER_FLATTEN_KEYS`, the value will be flattened and appended
to the `formatted_attrs` dictionary. Otherwise, the key-value pair will be appended
directly.
"""
if key in LOGGER_FLATTEN_KEYS:
if not isinstance(value, dict):
raise ValueError(
f"Expected a dictionary passing into {key}, but got {type(value)}"
)
for k, v in value.items():
if k in formatted_attrs:
raise KeyError(f"Found duplicated key in the log record: {k}")
formatted_attrs[k] = v
else:
formatted_attrs[key] = value


def generate_record_format_attrs(
Expand All @@ -28,7 +53,7 @@ def generate_record_format_attrs(
for key, value in record.__dict__.items():
# Both Ray and user-provided context are stored in `record_format`.
if key not in LOGRECORD_STANDARD_ATTRS:
record_format_attrs[key] = value
_append_flatten_attributes(record_format_attrs, key, value)
return record_format_attrs


Expand Down
8 changes: 8 additions & 0 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@
SERVE_LOG_TIME: "%(asctime)s",
}

# There are some attributes that we only use internally or don't provide values to the
# users. Adding to this set will remove them from structured logs.
SERVE_LOG_UNWANTED_ATTRS = {
"serve_access_log",
"task_id",
"job_id",
}

SERVE_LOG_EXTRA_FIELDS = "ray_serve_extra_fields"

# Serve HTTP request header key for routing requests.
Expand Down
147 changes: 63 additions & 84 deletions python/ray/serve/_private/logging_utils.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,31 @@
import builtins
import copy
import json
import logging
import os
import sys
import traceback
from typing import Any, Optional, Tuple

import ray
from ray._private.ray_logging.filters import CoreContextFilter
from ray._private.ray_logging.formatters import JSONFormatter
from ray.serve._private.common import ServeComponentType
from ray.serve._private.constants import (
RAY_SERVE_ENABLE_CPU_PROFILING,
RAY_SERVE_ENABLE_JSON_LOGGING,
RAY_SERVE_ENABLE_MEMORY_PROFILING,
RAY_SERVE_LOG_TO_STDERR,
SERVE_LOG_ACTOR_ID,
SERVE_LOG_APPLICATION,
SERVE_LOG_COMPONENT,
SERVE_LOG_COMPONENT_ID,
SERVE_LOG_DEPLOYMENT,
SERVE_LOG_EXTRA_FIELDS,
SERVE_LOG_LEVEL_NAME,
SERVE_LOG_MESSAGE,
SERVE_LOG_RECORD_FORMAT,
SERVE_LOG_REPLICA,
SERVE_LOG_REQUEST_ID,
SERVE_LOG_ROUTE,
SERVE_LOG_TIME,
SERVE_LOG_WORKER_ID,
SERVE_LOG_UNWANTED_ATTRS,
SERVE_LOGGER_NAME,
)
from ray.serve._private.utils import get_component_file_name
Expand All @@ -42,85 +40,74 @@
buildin_print = builtins.print


class ServeJSONFormatter(logging.Formatter):
"""Serve Logging Json Formatter
class ServeComponentFilter(logging.Filter):
"""Serve component filter.

The formatter will generate the json log format on the fly
based on the field of record.
The filter will add the component name, id, and type to the log record.
"""

ADD_IF_EXIST_FIELDS = [
SERVE_LOG_REQUEST_ID,
SERVE_LOG_ROUTE,
SERVE_LOG_APPLICATION,
]

def __init__(
self,
component_name: str,
component_id: str,
component_type: Optional[ServeComponentType] = None,
):
self.component_log_fmt = {
SERVE_LOG_LEVEL_NAME: SERVE_LOG_RECORD_FORMAT[SERVE_LOG_LEVEL_NAME],
SERVE_LOG_TIME: SERVE_LOG_RECORD_FORMAT[SERVE_LOG_TIME],
}
try:
runtime_context = ray.get_runtime_context()
actor_id = runtime_context.get_actor_id()
if actor_id:
self.component_log_fmt[SERVE_LOG_ACTOR_ID] = actor_id
worker_id = runtime_context.get_worker_id()
if worker_id:
self.component_log_fmt[SERVE_LOG_WORKER_ID] = worker_id
except Exception:
# If get_runtime_context() fails for any reason, do nothing (no adding
# actor_id and/or worker_id to the fmt)
pass

if component_type and component_type == ServeComponentType.REPLICA:
self.component_log_fmt[SERVE_LOG_DEPLOYMENT] = component_name
self.component_log_fmt[SERVE_LOG_REPLICA] = component_id
self.component_log_fmt[SERVE_LOG_COMPONENT] = component_type
self.component_name = component_name
self.component_id = component_id
self.component_type = component_type

def filter(self, record: logging.LogRecord) -> bool:
"""Add component attributes to the log record.

Note: the filter doesn't do any filtering, it only adds the component
attributes.
"""
if self.component_type and self.component_type == ServeComponentType.REPLICA:
setattr(record, SERVE_LOG_DEPLOYMENT, self.component_name)
setattr(record, SERVE_LOG_REPLICA, self.component_id)
setattr(record, SERVE_LOG_COMPONENT, self.component_type)
else:
self.component_log_fmt[SERVE_LOG_COMPONENT] = component_name
self.component_log_fmt[SERVE_LOG_COMPONENT_ID] = component_id
self.message_formatter = logging.Formatter(
SERVE_LOG_RECORD_FORMAT[SERVE_LOG_MESSAGE]
)
self.asctime_formatter = logging.Formatter("%(asctime)s")
setattr(record, SERVE_LOG_COMPONENT, self.component_name)
setattr(record, SERVE_LOG_REPLICA, self.component_id)

def format(self, record: logging.LogRecord) -> str:
"""Format the log record into json format.
return True

Args:
record: The log record to be formatted.

Returns:
The formatted log record in json format.
"""
record_format = copy.deepcopy(self.component_log_fmt)
record_format[SERVE_LOG_LEVEL_NAME] = record.levelname
record_format[SERVE_LOG_TIME] = self.asctime_formatter.format(record)
class ServeContextFilter(logging.Filter):
"""Serve context filter.

for field in ServeJSONFormatter.ADD_IF_EXIST_FIELDS:
if field in record.__dict__:
record_format[field] = record.__dict__[field]
The filter will add the route, request id, app name to the log record.

record_format[SERVE_LOG_MESSAGE] = self.message_formatter.format(record)
Note: the filter doesn't do any filtering, it only adds the serve request context
attributes.
"""

if SERVE_LOG_EXTRA_FIELDS in record.__dict__:
if not isinstance(record.__dict__[SERVE_LOG_EXTRA_FIELDS], dict):
raise ValueError(
f"Expected a dictionary passing into {SERVE_LOG_EXTRA_FIELDS}, "
f"but got {type(record.__dict__[SERVE_LOG_EXTRA_FIELDS])}"
)
for k, v in record.__dict__[SERVE_LOG_EXTRA_FIELDS].items():
if k in record_format:
raise KeyError(f"Found duplicated key in the log record: {k}")
record_format[k] = v
def filter(self, record):
request_context = ray.serve.context._serve_request_context.get()
if request_context.route:
setattr(record, SERVE_LOG_ROUTE, request_context.route)
if request_context.request_id:
setattr(record, SERVE_LOG_REQUEST_ID, request_context.request_id)
if request_context.app_name:
setattr(record, SERVE_LOG_APPLICATION, request_context.app_name)
return True


class ServeLogAttributeRemovalFilter(logging.Filter):
"""Serve log attribute removal filter.

The filter will remove unwanted attributes on the log record so they won't be
included in the structured logs.

return json.dumps(record_format)
Note: the filter doesn't do any filtering, it only removes unwanted attributes.
"""

def filter(self, record):
for key in SERVE_LOG_UNWANTED_ATTRS:
if hasattr(record, key):
delattr(record, key)

return True


class ServeFormatter(logging.Formatter):
Expand Down Expand Up @@ -304,26 +291,12 @@ def configure_component_logger(
logger.setLevel(logging_config.log_level)
logger.handlers.clear()

factory = logging.getLogRecordFactory()

def record_factory(*args, **kwargs):
request_context = ray.serve.context._serve_request_context.get()
record = factory(*args, **kwargs)
if request_context.route:
setattr(record, SERVE_LOG_ROUTE, request_context.route)
if request_context.request_id:
setattr(record, SERVE_LOG_REQUEST_ID, request_context.request_id)
if request_context.app_name:
setattr(record, SERVE_LOG_APPLICATION, request_context.app_name)
return record

logging.setLogRecordFactory(record_factory)

# Only add stream handler if RAY_SERVE_LOG_TO_STDERR is True.
if RAY_SERVE_LOG_TO_STDERR:
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(ServeFormatter(component_name, component_id))
stream_handler.addFilter(log_to_stderr_filter)
stream_handler.addFilter(ServeContextFilter())
logger.addHandler(stream_handler)

if logging_config.logs_dir:
Expand Down Expand Up @@ -355,15 +328,21 @@ def record_factory(*args, **kwargs):
"'LoggingConfig' to enable json format."
)
if RAY_SERVE_ENABLE_JSON_LOGGING or logging_config.encoding == EncodingType.JSON:
file_handler.setFormatter(
ServeJSONFormatter(component_name, component_id, component_type)
file_handler.addFilter(CoreContextFilter())
file_handler.addFilter(ServeContextFilter())
file_handler.addFilter(
ServeComponentFilter(component_name, component_id, component_type)
)
file_handler.setFormatter(JSONFormatter())
else:
file_handler.setFormatter(ServeFormatter(component_name, component_id))

if logging_config.enable_access_log is False:
file_handler.addFilter(log_access_log_filter)

# Remove unwanted attributes from the log record.
file_handler.addFilter(ServeLogAttributeRemovalFilter())

# Redirect print, stdout, and stderr to Serve logger.
if not RAY_SERVE_LOG_TO_STDERR:
builtins.print = redirected_print
Expand Down
Loading
Loading