Skip to content

Commit

Permalink
Revert "Parse ray runtime parameters from GlobalStateAccessor (#214)" (
Browse files Browse the repository at this point in the history
…#228)

This reverts commit caddc0e.

Co-authored-by: Dave Kleinschmidt <dave.f.kleinschmidt@gmail.com>
  • Loading branch information
glennmoy and kleinschmidt authored Nov 3, 2023
1 parent 6e11dec commit 0b75c77
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 118 deletions.
12 changes: 1 addition & 11 deletions build/wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ namespace jlcxx
template<> struct SuperType<rpc::Address> { typedef google::protobuf::Message type; };
template<> struct SuperType<rpc::JobConfig> { typedef google::protobuf::Message type; };
template<> struct SuperType<rpc::ObjectReference> { typedef google::protobuf::Message type; };
template<> struct SuperType<rpc::GcsNodeInfo> { typedef google::protobuf::Message type; };
template<> struct SuperType<TaskArgByReference> { typedef TaskArg type; };
template<> struct SuperType<TaskArgByValue> { typedef TaskArg type; };

Expand Down Expand Up @@ -700,14 +699,6 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
mod.add_type<rpc::ObjectReference>("ObjectReference", jlcxx::julia_base_type<google::protobuf::Message>());
jlcxx::stl::apply_stl<rpc::ObjectReference>(mod);

// message GcsNodeInfo
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/protobuf/gcs.proto#L286
mod.add_type<rpc::GcsNodeInfo>("GcsNodeInfo", jlcxx::julia_base_type<google::protobuf::Message>())
.constructor<>()
.method("raylet_socket_name", &rpc::GcsNodeInfo::raylet_socket_name)
.method("object_store_socket_name", &rpc::GcsNodeInfo::object_store_socket_name)
.method("node_manager_port", &rpc::GcsNodeInfo::node_manager_port);

// class RayObject
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/common/ray_object.h#L28
mod.add_type<RayObject>("RayObject")
Expand Down Expand Up @@ -759,8 +750,7 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod)
.constructor<const gcs::GcsClientOptions&>()
.method("GetNextJobID", &ray::gcs::GlobalStateAccessor::GetNextJobID)
.method("Connect", &ray::gcs::GlobalStateAccessor::Connect)
.method("Disconnect", &ray::gcs::GlobalStateAccessor::Disconnect)
.method("GetNodeToConnectForDriver", &ray::gcs::GlobalStateAccessor::GetNodeToConnectForDriver);
.method("Disconnect", &ray::gcs::GlobalStateAccessor::Disconnect);

mod.method("report_error", &report_error);

Expand Down
1 change: 0 additions & 1 deletion build/wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include "ray/core_worker/common.h"
#include "ray/core_worker/core_worker.h"
#include "src/ray/protobuf/common.pb.h"
#include "src/ray/protobuf/gcs.pb.h"
#include "ray/gcs/gcs_client/gcs_client.h"
#include "ray/gcs/gcs_client/global_state_accessor.h"
#include "ray/common/asio/instrumented_io_context.h"
Expand Down
1 change: 0 additions & 1 deletion src/Ray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ export RayError, RaySystemError, RayTaskError
include(joinpath("ray_julia_jll", "ray_julia_jll.jl"))
using .ray_julia_jll: ray_julia_jll, ray_julia_jll as ray_jll

include("constants.jl")
include("exceptions.jl")
include("function_manager.jl")
include("runtime_env.jl")
Expand Down
46 changes: 0 additions & 46 deletions src/constants.jl

This file was deleted.

35 changes: 34 additions & 1 deletion src/function_manager.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
_mib_string(num_bytes) = string(div(num_bytes, 1024 * 1024), " MiB")
# NOTES:
#
# python function manager maintains a local table of "execution info" with a
# "function_id" key and values that are named tuples of name/function/max_calls.
#
# python remote function sets a UUID4 at construction time:
# https://github.com/beacon-biosignals/ray/blob/7ad1f47a9c849abf00ca3e8afc7c3c6ee54cda43/python/ray/remote_function.py#L128
#
# ...that's used to set the function_hash (???)...
# https://github.com/beacon-biosignals/ray/blob/7ad1f47a9c849abf00ca3e8afc7c3c6ee54cda43/python/ray/remote_function.py#L263-L265
#
# later comment suggests that "ideally" they'd use the hash of the pickled
# function:
# https://github.com/beacon-biosignals/ray/blob/7ad1f47a9c849abf00ca3e8afc7c3c6ee54cda43/python/ray/includes/function_descriptor.pxi#L183-L186
#
# ...but that it's not stable for some reason. but.....neither is a random
# UUID?????
#
# the function table key is built like
# <key type>:<jobid>:key

# function manager holds:
# local cache of functions (keyed by function id/hash from descriptor)
# gcs client
# ~~maybe job id?~~ this is managed by the core worker process

# https://github.com/beacon-biosignals/ray/blob/1c0cddc478fa33d4c244d3c30aba861a77b0def9/python/ray/_private/ray_constants.py#L122-L123
const FUNCTION_SIZE_WARN_THRESHOLD = 10_000_000 # in bytes
const FUNCTION_SIZE_ERROR_THRESHOLD = 100_000_000 # in bytes

_mib_string(num_bytes) = string(div(num_bytes, 1024 * 1024), " MiB")
# https://github.com/beacon-biosignals/ray/blob/1c0cddc478fa33d4c244d3c30aba861a77b0def9/python/ray/_private/utils.py#L744-L746
const _check_msg = "Check that its definition is not implicitly capturing a large " *
"array or other object in scope. Tip: use `Ray.put()` to put large " *
Expand All @@ -22,6 +51,10 @@ function check_oversized_function(serialized, function_descriptor)
return nothing
end

# python uses "fun" for the namespace: https://github.com/beacon-biosignals/ray/blob/7ad1f47a9c849abf00ca3e8afc7c3c6ee54cda43/python/ray/_private/ray_constants.py#L380
# so "jlfun" seems reasonable
const FUNCTION_MANAGER_NAMESPACE = "jlfun"

Base.@kwdef struct FunctionManager
gcs_client::ray_jll.JuliaGcsClient
functions::Dict{String,Any}
Expand Down
131 changes: 98 additions & 33 deletions src/runtime.jl
Original file line number Diff line number Diff line change
@@ -1,12 +1,3 @@
"""
const GLOBAL_STATE_ACCESSOR::Ref{ray_jll.GlobalStateAccessor}

Global binding for GCS client interface to access global state information.
This is set during [`Ray.init`](@ref) and used there to get the the raylet name, object
store name, node manager port, and the next job IDJob ID for the driver.
"""
const GLOBAL_STATE_ACCESSOR = Ref{ray_jll.GlobalStateAccessor}()

const JOB_RUNTIME_ENV = Ref{RuntimeEnv}()

macro ray_import(ex)
Expand All @@ -29,6 +20,21 @@ function _ray_import(runtime_env::RuntimeEnv)
return nothing
end

"""
const GLOBAL_STATE_ACCESSOR::Ref{ray_jll.GlobalStateAccessor}

Global binding for GCS client interface to access global state information.
Currently only used to get the next job ID.

This is set during `init` and used there to get the Job ID for the driver.
"""
const GLOBAL_STATE_ACCESSOR = Ref{ray_jll.GlobalStateAccessor}()

# env var to control whether logs are sent do stderr or to file. if "1", sent
# to stderr; otherwise, will be sent to files in `/tmp/ray/session_latest/logs/`
# https://github.com/beacon-biosignals/ray/blob/4ceb62daaad05124713ff9d94ffbdad35ee19f86/python/ray/_private/ray_constants.py#L198
const LOGGING_REDIRECT_STDERR_ENVIRONMENT_VARIABLE = "RAY_LOG_TO_STDERR"

function default_log_dir(session_dir)
redirect_logs = Base.get(ENV, LOGGING_REDIRECT_STDERR_ENVIRONMENT_VARIABLE, "0") == "1"
# realpath() resolves relative paths and symlinks, including the default
Expand All @@ -39,7 +45,7 @@ function default_log_dir(session_dir)
end

function init(runtime_env::Union{RuntimeEnv,Nothing}=nothing;
session_dir=DEFAULT_SESSION_DIR,
session_dir="/tmp/ray/session_latest",
logs_dir=default_log_dir(session_dir))
# XXX: this is at best EXREMELY IMPERFECT check. we should do something
# more like what hte python Worker class does, getting node ID at
Expand All @@ -61,7 +67,16 @@ function init(runtime_env::Union{RuntimeEnv,Nothing}=nothing;
runtime_env = JOB_RUNTIME_ENV[]
end

gcs_address = read(GCS_ADDRESS_FILE, String) # host:port (e.g. "127.0.0.1:6379")
# TODO: use something like the java config bootstrap address (?) to get this
# https://github.com/beacon-biosignals/Ray.jl/issues/52
# information instead of parsing logs? I can't quite tell where it's coming
# from (set from a `ray.address` config option):
# https://github.com/beacon-biosignals/ray/blob/7ad1f47a9c849abf00ca3e8afc7c3c6ee54cda43/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java#L165-L171

# we use session_dir here instead of logs_dir since logs_dir can be set to
# "" to disable file logging without using env var
args = parse_ray_args_from_raylet_out(session_dir)
gcs_address = args[3]

opts = ray_jll.GcsClientOptions(gcs_address)
GLOBAL_STATE_ACCESSOR[] = ray_jll.GlobalStateAccessor(opts)
Expand All @@ -84,19 +99,7 @@ function init(runtime_env::Union{RuntimeEnv,Nothing}=nothing;
job_config = JobConfig(RuntimeEnvInfo(runtime_env), metadata)
serialized_job_config = _serialize(job_config)

raylet, store, node_port = get_node_to_connect_for_driver(GLOBAL_STATE_ACCESSOR[],
NODE_IP_ADDRESS)

# TODO: downgrade to debug
# https://github.com/beacon-biosignals/Ray.jl/issues/53
@info begin
"Raylet socket: $raylet, Object store: $store, Node IP: $NODE_IP_ADDRESS, " *
"Node port: $node_port, GCS Address: $gcs_address, JobID: $job_id"
end

ray_jll.initialize_driver(raylet, store, gcs_address, NODE_IP_ADDRESS, node_port,
job_id, logs_dir, serialized_job_config)

ray_jll.initialize_driver(args..., job_id, logs_dir, serialized_job_config)
atexit(ray_jll.shutdown_driver)

_init_global_function_manager(gcs_address)
Expand All @@ -122,17 +125,79 @@ Get the current task ID for this worker in hex format.
"""
get_task_id() = String(ray_jll.Hex(ray_jll.GetCurrentTaskId(ray_jll.GetCoreWorker())))

function get_node_to_connect_for_driver(global_state_accessor, node_ip_address)
node_to_connect = StdString()
status = ray_jll.GetNodeToConnectForDriver(global_state_accessor, node_ip_address,
CxxPtr(node_to_connect))
node_info = ray_jll.ParseFromString(ray_jll.GcsNodeInfo, node_to_connect)
function parse_ray_args_from_raylet_out(session_dir)
#=
"Starting agent process with command: ... \
--node-ip-address=127.0.0.1 --metrics-export-port=60404 --dashboard-agent-port=60493 \
--listen-port=52365 --node-manager-port=58888 \
--object-store-name=/tmp/ray/session_2023-08-14_14-54-36_055139_41385/sockets/plasma_store \
--raylet-name=/tmp/ray/session_2023-08-14_14-54-36_055139_41385/sockets/raylet \
--temp-dir=/tmp/ray --session-dir=/tmp/ray/session_2023-08-14_14-54-36_055139_41385 \
--runtime-env-dir=/tmp/ray/session_2023-08-14_14-54-36_055139_41385/runtime_resources \
--log-dir=/tmp/ray/session_2023-08-14_14-54-36_055139_41385/logs \
--logging-rotate-bytes=536870912 --logging-rotate-backup-count=5 \
--session-name=session_2023-08-14_14-54-36_055139_41385 \
--gcs-address=127.0.0.1:6379 --minimal --agent-id 470211272"
=#
line = open(joinpath(session_dir, "logs", "raylet.out")) do io
while !eof(io)
line = readline(io)
if contains(line, "Starting agent process")
return line
end
end
end

line !== nothing || error("Unable to locate agent process information")

# --raylet-name=/tmp/ray/session_2023-08-14_18-52-23_003681_54068/sockets/raylet
raylet_match = match(r"raylet-name=((\/[a-z,0-9,_,-]+)+)", line)
raylet = if raylet_match !== nothing
String(raylet_match[1])
else
error("Unable to find Raylet socket")
end

# --object-store-name=/tmp/ray/session_2023-08-14_18-52-23_003681_54068/sockets/plasma_store
store_match = match(r"object-store-name=((\/[a-z,0-9,_,-]+)+)", line)
store = if store_match !== nothing
String(store_match[1])
else
error("Unable to find Object Store socket")
end

raylet_socket_name = ray_jll.raylet_socket_name(node_info)[]::StdString
store_socket_name = ray_jll.object_store_socket_name(node_info)[]::StdString
node_manager_port = ray_jll.node_manager_port(node_info)::Integer
# --gcs-address=127.0.0.1:6379
gcs_match = match(r"gcs-address=(([0-9]{1,3}\.){3}[0-9]{1,3}:[0-9]{1,5})", line)
gcs_address = if gcs_match !== nothing
String(gcs_match[1])
else
error("Unable to find GCS address")
end

# --node-ip-address=127.0.0.1
node_ip_match = match(r"node-ip-address=(([0-9]{1,3}\.){3}[0-9]{1,3})", line)
node_ip = if node_ip_match !== nothing
String(node_ip_match[1])
else
error("Unable to find Node IP address")
end

# --node-manager-port=63639
port_match = match(r"node-manager-port=([0-9]{1,5})", line)
node_port = if port_match !== nothing
parse(Int, port_match[1])
else
error("Unable to find Node Manager port")
end

# TODO: downgrade to debug
# https://github.com/beacon-biosignals/Ray.jl/issues/53
@info begin
"Raylet socket: $raylet, Object store: $store, Node IP: $node_ip, " *
"Node port: $node_port, GCS Address: $gcs_address"
end

return (raylet_socket_name, store_socket_name, node_manager_port)
return (raylet, store, gcs_address, node_ip, node_port)
end

initialize_coreworker_driver(args...) = ray_jll.initialize_coreworker_driver(args...)
Expand Down
24 changes: 0 additions & 24 deletions test/global_state_accessor.jl

This file was deleted.

1 change: 0 additions & 1 deletion test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ include("utils.jl")

setup_ray_head_node() do
include("function_manager.jl")
include("global_state_accessor.jl")
setup_core_worker() do
include("object_ref.jl")
include("ray_serializer.jl")
Expand Down

0 comments on commit 0b75c77

Please sign in to comment.