Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Enable log rotation for Raylet and GCS server #26121

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d4d7811
use .log files for Raylet and GCS server
kfstorm Jun 24, 2022
a24370d
fix
kfstorm Jun 27, 2022
2fe819b
lint
kfstorm Jun 27, 2022
9631e37
update doc
kfstorm Jun 27, 2022
dff90c2
fix test_logging.py
kfstorm Jun 27, 2022
8871688
update
kfstorm Jun 27, 2022
acfcdc2
fix test_ray_log_redirected
kfstorm Jun 28, 2022
78f25e4
fix test_log_cli
kfstorm Jun 28, 2022
c36975d
fix test_log_get
kfstorm Jun 28, 2022
3922b42
fix test_logs_stream_and_tail
kfstorm Jun 28, 2022
940335c
fix test_state_data_source_client
kfstorm Jun 28, 2022
dcb040a
lint
kfstorm Jun 28, 2022
fa4c75b
Print log filename to .out for Raylet and GCS server
kfstorm Jul 7, 2022
e40c1d3
support RAY_LOG_TO_STDERR for Raylet and GCS server
kfstorm Jul 7, 2022
dc2eb40
Merge remote-tracking branch 'upstream/master' into log_rotation
kfstorm Jul 8, 2022
d33a764
fix C++ UT
kfstorm Jul 21, 2022
1318090
Merge remote-tracking branch 'upstream/master' into log_rotation
kfstorm Jul 21, 2022
418295c
fix Windows
kfstorm Jul 21, 2022
c3624c2
Merge remote-tracking branch 'upstream/master' into log_rotation
kfstorm Jul 26, 2022
9225124
revert test case test_log_redirect_to_stderr
kfstorm Aug 29, 2022
977d6c2
Merge remote-tracking branch 'upstream/master' into log_rotation
kfstorm Aug 29, 2022
558d61c
Merge remote-tracking branch 'alipay/log_rotation' into log_rotation
kfstorm Aug 29, 2022
5c719f3
Merge branch 'master' into log_rotation
jovany-wang Nov 24, 2022
0e9ea11
Address comments.
jovany-wang Nov 24, 2022
12c5d60
Fix lint
jovany-wang Nov 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion dashboard/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,13 @@ async def _check_parent():
"""Check if raylet is dead and fate-share if it is."""
try:
curr_proc = psutil.Process()
raylet_pid = curr_proc.parent().pid
while True:
parent = curr_proc.parent()
if parent is None or parent.pid == 1 or self.ppid != parent.pid:
log_path = os.path.join(self.log_dir, "raylet.out")
log_path = os.path.join(
self.log_dir, f"raylet_{raylet_pid}.log"
)
error = False
msg = f"Raylet is terminated: ip={self.ip}, id={self.node_id}. "
try:
Expand Down
4 changes: 2 additions & 2 deletions dashboard/modules/log/log_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ def _categorize_log_files(self, log_files: List[str]) -> Dict[str, List[str]]:
result["core_worker"].append(log_file)
elif "core-driver" in log_file and log_file.endswith(".log"):
result["driver"].append(log_file)
elif "raylet." in log_file:
elif "raylet" in log_file:
result["raylet"].append(log_file)
elif "gcs_server." in log_file:
elif "gcs_server" in log_file:
result["gcs_server"].append(log_file)
elif "log_monitor" in log_file:
result["internal"].append(log_file)
Expand Down
6 changes: 3 additions & 3 deletions dashboard/state_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"(1) GCS is unexpectedly failed. "
"(2) GCS is overloaded. "
"(3) There's an unexpected network issue. "
"Please check the gcs_server.out log to find the root cause."
"Please check the gcs_server_xxx.log log to find the root cause."
)
NODE_QUERY_FAILURE_WARNING = (
"Failed to query data from {type}. "
Expand Down Expand Up @@ -387,7 +387,7 @@ async def list_tasks(self, *, option: ListApiOptions) -> ListApiResponse:
type="raylet",
total=len(raylet_ids),
network_failures=unresponsive_nodes,
log_command="raylet.out",
log_command="raylet_xxx.log",
)
if unresponsive_nodes == len(raylet_ids):
raise DataSourceUnavailable(warning_msg)
Expand Down Expand Up @@ -469,7 +469,7 @@ async def list_objects(self, *, option: ListApiOptions) -> ListApiResponse:
type="raylet",
total=len(raylet_ids),
network_failures=unresponsive_nodes,
log_command="raylet.out",
log_command="raylet_xxx.log",
)
if unresponsive_nodes == len(raylet_ids):
raise DataSourceUnavailable(warning_msg)
Expand Down
15 changes: 13 additions & 2 deletions dashboard/tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,13 @@ def test_agent_report_unexpected_raylet_death(shutdown_only):
assert "Termination is unexpected." in err.error_message, err.error_message
assert "Raylet logs:" in err.error_message, err.error_message
assert (
os.path.getsize(os.path.join(node.get_session_dir_path(), "logs", "raylet.out"))
os.path.getsize(
os.path.join(
node.get_session_dir_path(),
"logs",
f"raylet_{raylet_proc_info.process.pid}.log",
)
)
< 1 * 1024 ** 2
)

Expand All @@ -250,7 +256,12 @@ def test_agent_report_unexpected_raylet_death_large_file(shutdown_only):

# Append to the Raylet log file with data >> 1 MB.
with open(
os.path.join(node.get_session_dir_path(), "logs", "raylet.out"), "a"
os.path.join(
node.get_session_dir_path(),
"logs",
f"raylet_{raylet_proc_info.process.pid}.log",
),
"a",
) as f:
f.write("test data\n" * 1024 ** 2)

Expand Down
4 changes: 2 additions & 2 deletions doc/source/ray-contribute/debugging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ starting Ray. For example, you can do:
ray start

This will print any ``RAY_LOG(DEBUG)`` lines in the source code to the
``raylet.err`` file, which you can find in :ref:`temp-dir-log-files`.
If it worked, you should see as the first line in ``raylet.err``:
``raylet_xxx.log`` file, which you can find in :ref:`temp-dir-log-files`.
If it worked, you should see as the first line in ``raylet_xxx.log``:

.. code-block:: shell

Expand Down
12 changes: 6 additions & 6 deletions doc/source/ray-core/objects/object-spilling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ usage across multiple physical devices if needed (e.g., SSD devices):
)

.. note::

To optimize the performance, it is recommended to use SSD instead of HDD when using object spilling for memory intensive workloads.

If you are using an HDD, it is recommended that you specify a large buffer size (> 1MB) to reduce IO requests during spilling.
Expand All @@ -58,7 +58,7 @@ If you are using an HDD, it is recommended that you specify a large buffer size
_system_config={
"object_spilling_config": json.dumps(
{
"type": "filesystem",
"type": "filesystem",
"params": {
"directory_path": "/tmp/spill",
"buffer_size": 1_000_000,
Expand Down Expand Up @@ -100,7 +100,7 @@ To enable object spilling to remote storage (any URI supported by `smart_open <h
"min_spilling_size": 100 * 1024 * 1024, # Spill at least 100MB at a time.
"object_spilling_config": json.dumps(
{
"type": "smart_open",
"type": "smart_open",
"params": {
"uri": "s3://bucket/path"
},
Expand All @@ -122,7 +122,7 @@ Spilling to multiple remote storages is also supported.
"min_spilling_size": 100 * 1024 * 1024, # Spill at least 100MB at a time.
"object_spilling_config": json.dumps(
{
"type": "smart_open",
"type": "smart_open",
"params": {
"uri": ["s3://bucket/path1", "s3://bucket/path2", "s3://bucket/path3"],
},
Expand All @@ -139,14 +139,14 @@ Cluster mode
To enable object spilling in multi node clusters:

.. code-block:: bash

# Note that `object_spilling_config`'s value should be json format.
ray start --head --system-config='{"object_spilling_config":"{\"type\":\"filesystem\",\"params\":{\"directory_path\":\"/tmp/spill\"}}"}'

Stats
-----

When spilling is happening, the following INFO level messages will be printed to the raylet logs (e.g., ``/tmp/ray/session_latest/logs/raylet.out``)::
When spilling is happening, the following INFO level messages will be printed to the raylet logs (e.g., ``/tmp/ray/session_latest/logs/raylet_xxx.log``)::

local_object_manager.cc:166: Spilled 50 MiB, 1 objects, write throughput 230 MiB/s
local_object_manager.cc:334: Restored 50 MiB, 1 objects, read throughput 505 MiB/s
Expand Down
20 changes: 10 additions & 10 deletions doc/source/ray-observability/ray-logging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This document will explain Ray's logging system and its best practices.
Driver logs
~~~~~~~~~~~
An entry point of Ray applications that calls ``ray.init()`` is called a driver.
All the driver logs are handled in the same way as normal Python programs.
All the driver logs are handled in the same way as normal Python programs.

Worker logs
~~~~~~~~~~~
Expand All @@ -30,7 +30,7 @@ Let's look at a code example to see how this works.

ray.get(task.remote())

You should be able to see the string `task` from your driver stdout.
You should be able to see the string `task` from your driver stdout.

When logs are printed, the process id (pid) and an IP address of the node that executes tasks/actors are printed together. Check out the output below.

Expand Down Expand Up @@ -72,8 +72,8 @@ This produces the following output:

How to set up loggers
~~~~~~~~~~~~~~~~~~~~~
When using ray, all of the tasks and actors are executed remotely in Ray's worker processes.
Since Python logger module creates a singleton logger per process, loggers should be configured on per task/actor basis.
When using ray, all of the tasks and actors are executed remotely in Ray's worker processes.
Since Python logger module creates a singleton logger per process, loggers should be configured on per task/actor basis.

.. note::

Expand Down Expand Up @@ -137,26 +137,26 @@ Logging directory structure
---------------------------
.. _logging-directory-structure:

By default, Ray logs are stored in a ``/tmp/ray/session_*/logs`` directory.
By default, Ray logs are stored in a ``/tmp/ray/session_*/logs`` directory.

.. note::

The default temp directory is ``/tmp/ray`` (for Linux and Mac OS). If you'd like to change the temp directory, you can specify it when ``ray start`` or ``ray.init()`` is called.
The default temp directory is ``/tmp/ray`` (for Linux and Mac OS). If you'd like to change the temp directory, you can specify it when ``ray start`` or ``ray.init()`` is called.

A new Ray instance creates a new session ID to the temp directory. The latest session ID is symlinked to ``/tmp/ray/session_latest``.

Here's a Ray log directory structure. Note that ``.out`` is logs from stdout/stderr and ``.err`` is logs from stderr. The backward compatibility of log directories is not maintained.

- ``dashboard.log``: A log file of a Ray dashboard.
- ``dashboard_agent.log``: Every Ray node has one dashboard agent. This is a log file of the agent.
- ``gcs_server.[out|err]``: The GCS server is a stateless server that manages Ray cluster metadata. It exists only in the head node.
- ``gcs_server.[out|err]`` / ``gcs_server_[pid].log``: The GCS server is a stateless server that manages Ray cluster metadata. It exists only in the head node.
- ``log_monitor.log``: The log monitor is in charge of streaming logs to the driver.
- ``monitor.log``: Ray's cluster launcher is operated with a monitor process. It also manages the autoscaler.
- ``monitor.[out|err]``: Stdout and stderr of a cluster launcher.
- ``plasma_store.[out|err]``: Deprecated.
- ``python-core-driver-[worker_id]_[pid].log``: Ray drivers consist of CPP core and Python/Java frontend. This is a log file generated from CPP code.
- ``python-core-worker-[worker_id]_[pid].log``: Ray workers consist of CPP core and Python/Java frontend. This is a log file generated from CPP code.
- ``raylet.[out|err]``: A log file of raylets.
- ``raylet.[out|err]`` / ``raylet_[pid].log``: Log files of raylets.
- ``redis-shard_[shard_index].[out|err]``: Redis shard log files.
- ``redis.[out|err]``: Redis log files.
- ``worker-[worker_id]-[job_id]-[pid].[out|err]``: Python/Java part of Ray drivers and workers. All of stdout and stderr from tasks/actors are streamed here. Note that job_id is an id of the driver.
Expand All @@ -166,9 +166,9 @@ Here's a Ray log directory structure. Note that ``.out`` is logs from stdout/std

Log rotation
------------
Ray supports log rotation of log files. Note that not all components are currently supporting log rotation. (Raylet and Python/Java worker logs are not rotating).
Ray supports log rotation of log files. Note that not all components currently support log rotation (Python worker logs are not rotated).

By default, logs are rotating when it reaches to 512MB (maxBytes), and there could be up to 5 backup files (backupCount). Indexes are appended to all backup files (e.g., `raylet.out.1`)
By default, a log file is rotated when it reaches 512MB (maxBytes), and up to 10 backup files (backupCount) are kept. An index is appended to each backup file (e.g., `raylet_xxx.log.1`).
If you'd like to change the log rotation configuration, you can do it by specifying environment variables. For example,

.. code-block:: bash
Expand Down
9 changes: 9 additions & 0 deletions python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ def _pid_alive(pid):
return alive


def get_pid(name):
    """Return the PID of the first running process whose name contains ``name``.

    Args:
        name: Substring to look for in each process name.

    Returns:
        The PID of the first matching process, or -1 if no process matches.
    """
    for proc in psutil.process_iter():
        try:
            proc_name = proc.name()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process exited (or cannot be inspected) between being
            # listed and being queried; skip it rather than abort the scan.
            continue
        if name in proc_name:
            return proc.pid

    return -1


def check_call_module(main, argv, capture_stdout=False, capture_stderr=False):
# We use this function instead of calling the "ray" command to work around
# some deadlocks that occur when piping ray's output on Windows
Expand Down
4 changes: 3 additions & 1 deletion python/ray/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ class LocalRayletDiedError(RayError):
"""Indicates that the task's local raylet died."""

def __str__(self):
return "The task's local raylet died. Check raylet.out for more information."
return (
"The task's local raylet died. Check raylet_xxx.log for more information."
)


@PublicAPI
Expand Down
17 changes: 4 additions & 13 deletions python/ray/tests/test_kill_raylet_signal_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,15 @@
import pytest

import ray
from ray._private.test_utils import wait_for_condition


def get_pid(name):
    """Return the PID of the first running process whose name contains ``name``.

    Args:
        name: Substring to look for in each process name.

    Returns:
        The PID of the first matching process, or -1 if no process matches.
    """
    for proc in psutil.process_iter():
        try:
            proc_name = proc.name()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process exited (or cannot be inspected) between being
            # listed and being queried; skip it rather than abort the scan.
            continue
        if name in proc_name:
            return proc.pid

    return -1
from ray._private.test_utils import wait_for_condition, get_pid


def check_result(filename, num_signal, check_key):
ray.init(num_cpus=1)
session_dir = ray._private.worker._global_node.get_session_dir_path()
raylet_out_path = filename.format(session_dir)
pid = get_pid("raylet")
assert pid > 0
raylet_out_path = filename.format(session_dir, pid)
p = psutil.Process(pid)
p.send_signal(num_signal)
p.wait(timeout=15)
Expand All @@ -38,13 +29,13 @@ def check_file():

@pytest.mark.skipif(sys.platform == "win32", reason="Not support on Windows.")
def test_kill_raylet_signal_log(shutdown_only):
check_result("{}/logs/raylet.err", signal.SIGABRT, "SIGABRT")
check_result("{}/logs/raylet_{}.log", signal.SIGABRT, "SIGABRT")


@pytest.mark.skipif(sys.platform != "win32", reason="Only run on Windows.")
@pytest.mark.skip(reason="Flaky on Windows")
def test_kill_raylet_signal_log_win(shutdown_only):
check_result("{}/logs/raylet.out", signal.CTRL_BREAK_EVENT, "SIGTERM")
check_result("{}/logs/raylet_{}.log", signal.CTRL_BREAK_EVENT, "SIGTERM")


if __name__ == "__main__":
Expand Down
20 changes: 11 additions & 9 deletions python/ray/tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ def test_log_rotation(shutdown_only, monkeypatch):
ray_constants.PROCESS_TYPE_MONITOR,
ray_constants.PROCESS_TYPE_PYTHON_CORE_WORKER_DRIVER,
ray_constants.PROCESS_TYPE_PYTHON_CORE_WORKER,
ray_constants.PROCESS_TYPE_RAYLET,
ray_constants.PROCESS_TYPE_GCS_SERVER,
# Below components are not log rotating now.
# ray_constants.PROCESS_TYPE_RAYLET,
# ray_constants.PROCESS_TYPE_GCS_SERVER,
# ray_constants.PROCESS_TYPE_WORKER,
]

Expand Down Expand Up @@ -200,14 +200,16 @@ def is_event_loop_stats_found(path):
found = True
return found

for path in paths:
components_to_check = [
ray_constants.PROCESS_TYPE_PYTHON_CORE_WORKER_DRIVER,
ray_constants.PROCESS_TYPE_RAYLET,
ray_constants.PROCESS_TYPE_GCS_SERVER,
]

for component in components_to_check:
# Need to remove suffix to avoid reading log rotated files.
if "python-core-driver" in str(path):
wait_for_condition(lambda: is_event_loop_stats_found(path))
if "raylet.out" in str(path):
wait_for_condition(lambda: is_event_loop_stats_found(path))
if "gcs_server.out" in str(path):
wait_for_condition(lambda: is_event_loop_stats_found(path))
path = [p for p in paths if component in p.name and p.name.endswith(".log")][0]
wait_for_condition(lambda: is_event_loop_stats_found(path))


def test_worker_id_names(shutdown_only):
Expand Down
10 changes: 4 additions & 6 deletions python/ray/tests/test_raylet_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
def enable_export_loglevel(func):
    """Decorator that sets ``RAY_BACKEND_LOG_LEVEL`` to ``info``.

    The environment variable is written when the decorator is applied, so
    the log level env parameter is changed both when running with python
    and with pytest. This makes raylet emit a log to raylet_xxx.log.
    The decorated function is returned unchanged.
    """
    os.environ.update({"RAY_BACKEND_LOG_LEVEL": "info"})
    return func

Expand All @@ -20,8 +20,8 @@ def enable_export_loglevel(func):
def test_ray_log_redirected(ray_start_regular):
session_dir = ray._private.worker._global_node.get_session_dir_path()
assert os.path.exists(session_dir), "Session dir not found."
raylet_out_path = "{}/logs/raylet.out".format(session_dir)
raylet_err_path = "{}/logs/raylet.err".format(session_dir)
raylet_logs = glob.glob("{}/logs/raylet*".format(session_dir))
assert len(raylet_logs) == 3 # .out, .err, .log

@ray.remote
class Actor:
Expand All @@ -38,9 +38,7 @@ def file_exists_and_not_empty(filename):
remote_pid = ray.get(actor.get_pid.remote())
local_pid = os.getpid()

wait_for_condition(
lambda: all(map(file_exists_and_not_empty, [raylet_out_path, raylet_err_path]))
)
wait_for_condition(lambda: all(map(file_exists_and_not_empty, raylet_logs)))

core_worker_logs = glob.glob(
"{}/logs/python-core-worker*{}.log".format(session_dir, remote_pid)
Expand Down
10 changes: 7 additions & 3 deletions python/ray/tests/test_state_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
import json
import sys
import re
from dataclasses import dataclass
from typing import List, Tuple
from unittest.mock import MagicMock
Expand Down Expand Up @@ -769,7 +770,7 @@ async def test_api_manager_list_tasks(state_api_manager):
warning = result.partial_failure_warning
assert (
NODE_QUERY_FAILURE_WARNING.format(
type="raylet", total=2, network_failures=1, log_command="raylet.out"
type="raylet", total=2, network_failures=1, log_command="raylet_xxx.log"
)
in warning
)
Expand Down Expand Up @@ -879,7 +880,7 @@ async def test_api_manager_list_objects(state_api_manager):
warning = result.partial_failure_warning
assert (
NODE_QUERY_FAILURE_WARNING.format(
type="raylet", total=2, network_failures=1, log_command="raylet.out"
type="raylet", total=2, network_failures=1, log_command="raylet_xxx.log"
)
in warning
)
Expand Down Expand Up @@ -1212,7 +1213,10 @@ def get_port():
result = await client.list_logs(node_id, timeout=30, glob_filter="*")
assert isinstance(result, ListLogsReply)

stream = await client.stream_log(node_id, "raylet.out", False, 10, 1, 5)
raylet_log_filename = [
f for f in result.log_files if re.search(r"raylet_\d+\.log", f)
][0]
stream = await client.stream_log(node_id, raylet_log_filename, False, 10, 1, 5)
async for logs in stream:
log_lines = len(logs.data.decode().split("\n"))
assert isinstance(logs, StreamLogReply)
Expand Down
Loading