Skip to content

Commit e0c65e7

Browse files
Clean up random name generation.
1 parent 1f0582d commit e0c65e7

File tree

2 files changed

+16
-10
lines changed

2 files changed

+16
-10
lines changed

lib/python/ray/services.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ def address(host, port):
2828
def new_port():
2929
return random.randint(10000, 65535)
3030

31+
def random_name():
32+
return str(random.randint(0, 99999999))
33+
3134
def cleanup():
3235
"""When running in local mode, shutdown the Ray processes.
3336
@@ -65,7 +68,7 @@ def start_redis(port):
6568

6669
def start_local_scheduler(redis_address, plasma_store_name):
6770
local_scheduler_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../photon/build/photon_scheduler")
68-
local_scheduler_name = "/tmp/scheduler{}".format(random.randint(0, 10000))
71+
local_scheduler_name = "/tmp/scheduler{}".format(random_name())
6972
p = subprocess.Popen([local_scheduler_filepath, "-s", local_scheduler_name, "-r", redis_address, "-p", plasma_store_name])
7073
if cleanup:
7174
all_processes.append(p)
@@ -81,12 +84,12 @@ def start_objstore(node_ip_address, redis_address, cleanup):
8184
that imported services exits.
8285
"""
8386
plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_store")
84-
store_name = "/tmp/ray_plasma_store{}".format(random.randint(0, 10000))
87+
store_name = "/tmp/ray_plasma_store{}".format(random_name())
8588
p1 = subprocess.Popen([plasma_store_executable, "-s", store_name])
8689

8790
plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_manager")
88-
manager_name = "/tmp/ray_plasma_manager{}".format(random.randint(0, 10000))
89-
manager_port = random.randint(10000, 50000)
91+
manager_name = "/tmp/ray_plasma_manager{}".format(random_name())
92+
manager_port = new_port()
9093
p2 = subprocess.Popen([plasma_manager_executable,
9194
"-s", store_name,
9295
"-m", manager_name,

lib/python/ray/worker.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
def random_object_id():
3333
return photon.ObjectID("".join([chr(random.randint(0, 255)) for _ in range(20)]))
3434

35+
def random_string():
36+
return "".join([chr(random.randint(0, 255)) for _ in range(20)])
37+
3538
class FunctionID(object):
3639
def __init__(self, function_id):
3740
self.function_id = function_id
@@ -516,7 +519,7 @@ def run_function_on_all_workers(self, function):
516519
if self.mode is None:
517520
self.cached_functions_to_run.append(function)
518521
else:
519-
function_to_run_id = np.random.randint(0, 1000)
522+
function_to_run_id = random_string()
520523
key = "FunctionsToRun:{}".format(function_to_run_id)
521524
self.redis_client.hmset(key, {"function_id": function_to_run_id,
522525
"function": pickling.dumps(function)})
@@ -747,7 +750,7 @@ def fetch_and_process_reusable_variable(key, worker=global_worker):
747750
# record the traceback and notify the scheduler of the failure.
748751
traceback_str = format_error_message(traceback.format_exc())
749752
# Log the error message.
750-
error_key = "ReusableVariableImportError:{}".format("".join([chr(random.randint(0, 255)) for _ in range(20)]))
753+
error_key = "ReusableVariableImportError:{}".format(random_string())
751754
worker.redis_client.hmset(error_key, {"name": reusable_variable_name,
752755
"message": traceback_str})
753756
worker.redis_client.rpush("ErrorKeys", error_key)
@@ -766,7 +769,7 @@ def fetch_and_process_function_to_run(key, worker=global_worker):
766769
traceback_str = traceback.format_exc()
767770
# Log the error message.
768771
name = function.__name__ if "function" in locals() and hasattr(function, "__name__") else ""
769-
error_key = "FunctionToRunError:{}".format("".join([chr(random.randint(0, 255)) for _ in range(20)]))
772+
error_key = "FunctionToRunError:{}".format(random_string())
770773
worker.redis_client.hmset(error_key, {"name": name,
771774
"message": traceback_str})
772775
worker.redis_client.rpush("ErrorKeys", error_key)
@@ -832,7 +835,7 @@ def connect(address_info, mode=WORKER_MODE, worker=global_worker):
832835
mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE,
833836
and SILENT_MODE.
834837
"""
835-
worker.worker_id = np.random.randint(0, 1000000)
838+
worker.worker_id = random_string()
836839
worker.connected = True
837840
worker.set_mode(mode)
838841
# If running Ray in PYTHON_MODE, there is no need to create call create_worker
@@ -1069,7 +1072,7 @@ def process_task(task): # wrapping these lines in a function should cause the lo
10691072
failure_objects = [failure_object for _ in range(len(return_object_ids))]
10701073
store_outputs_in_objstore(return_object_ids, failure_objects, worker)
10711074
# Log the error message.
1072-
error_key = "TaskError:{}".format("".join([chr(random.randint(0, 255)) for _ in range(20)]))
1075+
error_key = "TaskError:{}".format(random_string())
10731076
worker.redis_client.hmset(error_key, {"function_id": function_id.id(),
10741077
"function_name": function_name,
10751078
"message": traceback_str})
@@ -1082,7 +1085,7 @@ def process_task(task): # wrapping these lines in a function should cause the lo
10821085
# The attempt to reinitialize the reusable variables threw an exception.
10831086
# We record the traceback and notify the scheduler.
10841087
traceback_str = format_error_message(traceback.format_exc())
1085-
error_key = "ReusableVariableReinitializeError:{}".format("".join([chr(random.randint(0, 255)) for _ in range(20)]))
1088+
error_key = "ReusableVariableReinitializeError:{}".format(random_string())
10861089
worker.redis_client.hmset(error_key, {"task_instance_id": "NOTIMPLEMENTED",
10871090
"task_id": "NOTIMPLEMENTED",
10881091
"function_id": function_id.id(),

0 commit comments

Comments
 (0)