2 changes: 2 additions & 0 deletions python/ray/scripts/scripts.py
@@ -153,6 +153,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
object_store_memory=object_store_memory,
num_workers=num_workers,
cleanup=False,
redirect_worker_output=True,
redirect_output=True,
resources=resources,
num_redis_shards=num_redis_shards,
@@ -222,6 +223,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
num_workers=num_workers,
object_store_memory=object_store_memory,
cleanup=False,
redirect_worker_output=True,
redirect_output=True,
resources=resources,
plasma_directory=plasma_directory,
35 changes: 26 additions & 9 deletions python/ray/services.py
@@ -1050,6 +1050,7 @@ def start_ray_processes(address_info=None,
redis_max_clients=None,
worker_path=None,
cleanup=True,
redirect_worker_output=False,
redirect_output=False,
include_global_scheduler=False,
include_log_monitor=False,
@@ -1090,8 +1091,10 @@
cleanup (bool): If cleanup is true, then the processes started here
will be killed by services.cleanup() when the Python process that
called this method exits.
redirect_output (bool): True if stdout and stderr should be redirected
to a file.
redirect_worker_output: True if the stdout and stderr of worker
processes should be redirected to files.
redirect_output (bool): True if stdout and stderr for non-worker
processes should be redirected to files and false otherwise.
include_global_scheduler (bool): If include_global_scheduler is True,
then start a global scheduler process.
include_log_monitor (bool): If True, then start a log monitor to
@@ -1114,6 +1117,8 @@
A dictionary of the address information for the processes that were
started.
"""
print("Process STDOUT and STDERR is being redirected to /tmp/raylogs/.")

if resources is None:
resources = {}
if not isinstance(resources, list):
@@ -1149,7 +1154,8 @@
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
redirect_output=True,
redirect_worker_output=redirect_output, cleanup=cleanup)
redirect_worker_output=redirect_worker_output,
cleanup=cleanup)
address_info["redis_address"] = redis_address
if "RAY_USE_NEW_GCS" in os.environ:
credis_address = start_credis(
@@ -1247,9 +1253,12 @@
# If we're starting the workers from Python, the local scheduler
# should not start any workers.
num_local_scheduler_workers = 0
# Start the local scheduler.
# Start the local scheduler. Note that if we do not wish to redirect
# the worker output, then we cannot redirect the local scheduler
# output.
local_scheduler_stdout_file, local_scheduler_stderr_file = (
new_log_files("local_scheduler_{}".format(i), redirect_output))
new_log_files("local_scheduler_{}".format(i),
redirect_output=redirect_worker_output))
local_scheduler_name = start_local_scheduler(
redis_address,
node_ip_address,
@@ -1314,6 +1323,7 @@ def start_ray_node(node_ip_address,
object_store_memory=None,
worker_path=None,
cleanup=True,
redirect_worker_output=False,
redirect_output=False,
resources=None,
plasma_directory=None,
@@ -1340,8 +1350,10 @@
cleanup (bool): If cleanup is true, then the processes started here
will be killed by services.cleanup() when the Python process that
called this method exits.
redirect_output (bool): True if stdout and stderr should be redirected
to a file.
redirect_worker_output: True if the stdout and stderr of worker
processes should be redirected to files.
redirect_output (bool): True if stdout and stderr for non-worker
processes should be redirected to files and false otherwise.
resources: A dictionary mapping resource name to the available quantity
of that resource.
plasma_directory: A directory where the Plasma memory mapped files will
@@ -1363,6 +1375,7 @@
worker_path=worker_path,
include_log_monitor=True,
cleanup=cleanup,
redirect_worker_output=redirect_worker_output,
redirect_output=redirect_output,
resources=resources,
plasma_directory=plasma_directory,
@@ -1378,6 +1391,7 @@ def start_ray_head(address_info=None,
object_store_memory=None,
worker_path=None,
cleanup=True,
redirect_worker_output=False,
redirect_output=False,
start_workers_from_local_scheduler=True,
resources=None,
@@ -1414,8 +1428,10 @@
cleanup (bool): If cleanup is true, then the processes started here
will be killed by services.cleanup() when the Python process that
called this method exits.
redirect_output (bool): True if stdout and stderr should be redirected
to a file.
redirect_worker_output: True if the stdout and stderr of worker
processes should be redirected to files.
redirect_output (bool): True if stdout and stderr for non-worker
processes should be redirected to files and false otherwise.
start_workers_from_local_scheduler (bool): If this flag is True, then
start the initial workers from the local scheduler. Else, start
them from Python.
@@ -1447,6 +1463,7 @@
object_store_memory=object_store_memory,
worker_path=worker_path,
cleanup=cleanup,
redirect_worker_output=redirect_worker_output,
redirect_output=redirect_output,
include_global_scheduler=True,
include_log_monitor=True,
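A minimal sketch of exercising the new plumbing at the services level (not part of this change; every argument other than the two redirect flags is an illustrative assumption, and anything not shown keeps the defaults visible in the signatures above):

    import ray.services as services

    # Start a head node whose worker and non-worker process output both end up
    # under /tmp/raylogs/ (see the print added to start_ray_processes above).
    address_info = services.start_ray_head(
        node_ip_address="127.0.0.1",   # assumed local address, not from this diff
        num_workers=2,                 # illustrative worker count
        cleanup=True,                  # tear these processes down when this script exits
        redirect_worker_output=True,   # worker stdout/stderr -> log files
        redirect_output=True)          # non-worker process stdout/stderr -> log files
    print(address_info["redis_address"])  # returned dict includes the Redis address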
20 changes: 14 additions & 6 deletions python/ray/worker.py
@@ -1265,7 +1265,8 @@ def _init(address_info=None,
num_local_schedulers=None,
object_store_memory=None,
driver_mode=SCRIPT_MODE,
redirect_output=False,
redirect_worker_output=False,
redirect_output=True,
start_workers_from_local_scheduler=True,
num_cpus=None,
num_gpus=None,
@@ -1304,8 +1305,10 @@
object store with.
driver_mode (bool): The mode in which to start the driver. This should
be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
redirect_output (bool): True if stdout and stderr for all the processes
should be redirected to files and false otherwise.
redirect_worker_output: True if the stdout and stderr of worker
processes should be redirected to files.
redirect_output (bool): True if stdout and stderr for non-worker
processes should be redirected to files and false otherwise.
start_workers_from_local_scheduler (bool): If this flag is True, then
start the initial workers from the local scheduler. Else, start
them from Python. The latter case is for debugging purposes only.
@@ -1385,6 +1388,7 @@
num_workers=num_workers,
num_local_schedulers=num_local_schedulers,
object_store_memory=object_store_memory,
redirect_worker_output=redirect_worker_output,
redirect_output=redirect_output,
start_workers_from_local_scheduler=(
start_workers_from_local_scheduler),
@@ -1455,7 +1459,8 @@


def init(redis_address=None, node_ip_address=None, object_id_seed=None,
num_workers=None, driver_mode=SCRIPT_MODE, redirect_output=False,
num_workers=None, driver_mode=SCRIPT_MODE,
redirect_worker_output=False, redirect_output=True,
num_cpus=None, num_gpus=None, resources=None,
num_custom_resource=None, num_redis_shards=None,
redis_max_clients=None, plasma_directory=None,
@@ -1481,8 +1486,10 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
provided if redis_address is not provided.
driver_mode (bool): The mode in which to start the driver. This should
be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
redirect_output (bool): True if stdout and stderr for all the processes
should be redirected to files and false otherwise.
redirect_worker_output: True if the stdout and stderr of worker
processes should be redirected to files.
redirect_output (bool): True if stdout and stderr for non-worker
processes should be redirected to files and false otherwise.
num_cpus (int): Number of cpus the user wishes all local schedulers to
be configured with.
num_gpus (int): Number of gpus the user wishes all local schedulers to
@@ -1519,6 +1526,7 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
"redis_address": redis_address}
return _init(address_info=info, start_ray_local=(redis_address is None),
num_workers=num_workers, driver_mode=driver_mode,
redirect_worker_output=redirect_worker_output,
redirect_output=redirect_output, num_cpus=num_cpus,
num_gpus=num_gpus, resources=resources,
num_redis_shards=num_redis_shards,
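At the public API level, the net effect is that non-worker process output is now redirected by default (redirect_output=True) while worker output stays on the console unless requested. A short usage sketch mirroring the updated tests further down (the remote function here is illustrative):

    import ray

    # Opt in to redirecting worker stdout/stderr to log files as well.
    ray.init(redirect_worker_output=True)

    @ray.remote
    def f():
        # With redirect_worker_output=True this lands in a worker log file
        # rather than on the driver's console.
        print("hello from a worker")

    ray.get(f.remote())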
2 changes: 1 addition & 1 deletion src/local_scheduler/local_scheduler_client.cc
@@ -84,7 +84,7 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn,
* scheduler gives this client a task. */
read_message(conn->conn, &type, &reply_size, &reply);
if (type == DISCONNECT_CLIENT) {
RAY_LOG(WARNING) << "Exiting because local scheduler closed connection.";
RAY_LOG(DEBUG) << "Exiting because local scheduler closed connection.";
exit(1);
}
RAY_CHECK(type == MessageType_ExecuteTask);
4 changes: 2 additions & 2 deletions test/runtest.py
@@ -1929,7 +1929,7 @@ def wait_for_object_table():
ray.global_state.object_table(result_id))

def testLogFileAPI(self):
ray.init(redirect_output=True)
ray.init(redirect_worker_output=True)

message = "unique message"

@@ -1993,7 +1993,7 @@ def f():
def testWorkers(self):
num_workers = 3
ray.init(
redirect_output=True,
redirect_worker_output=True,
num_cpus=num_workers,
num_workers=num_workers)
