Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC][core] GcsClient async binding, aka remove PythonGcsClient. #45289

Draft
wants to merge 28 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
435d87d
POC Gcs Client async binding
rynewang May 13, 2024
d853fa6
add
rynewang May 13, 2024
52c0bb2
internal_kv_get and internal_kv_put and type converters
rynewang May 14, 2024
4a82a95
add test
rynewang May 14, 2024
2ff561a
split out to a .pxi file
rynewang May 16, 2024
c2fd255
cosmetic
rynewang May 20, 2024
9958ef4
backport postprocess callbacks
rynewang May 20, 2024
900c984
add internal kv APIs sync and async to MyGcsClient
rynewang May 20, 2024
022ce3a
update
rynewang May 21, 2024
6792366
internal kv ok
rynewang May 23, 2024
b9433e5
add @property address
rynewang May 23, 2024
d2ed905
Create a singleton io context and thread, and place standalone gcs cl…
rynewang May 23, 2024
349642f
cluster_id and ctors
rynewang May 23, 2024
1f79626
check alive and replace gcs aio client
rynewang May 23, 2024
9438b09
Initial POC to replace PythonGcsClient (on internal kv)
rynewang May 23, 2024
ff06b24
add timeout to get_all_job_info
rynewang May 28, 2024
17f5e7d
lint
rynewang May 28, 2024
1ecf342
fix proto parsing
rynewang May 28, 2024
c72f409
fix address format error
rynewang May 29, 2024
3841558
format
rynewang May 29, 2024
14df2b4
more tag
rynewang May 30, 2024
49df2b3
Merge branch 'master' into poc-gcs-client-binding
rynewang Jun 8, 2024
a2f2b4d
Merge remote-tracking branch 'origin/master' into poc-gcs-client-binding
rynewang Jun 17, 2024
b30debd
mark bad test
rynewang Jun 17, 2024
2819d11
Add autoscaler stub methods to GcsClient
rynewang Jun 15, 2024
8b36485
add autoscaler bindings
rynewang Jun 17, 2024
b4ad225
remove "Sync" in method names
rynewang Jun 17, 2024
56473da
more methods
rynewang Jun 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2361,6 +2361,11 @@ flatbuffer_cc_library(
out_prefix = "ray/raylet/format/",
)

ray_cc_library(
name = "python_callbacks",
hdrs = ["src/ray/gcs/gcs_client/python_callbacks.h"],
)

pyx_library(
name = "_raylet",
srcs = glob([
Expand Down Expand Up @@ -2392,6 +2397,7 @@ pyx_library(
"//:core_worker_lib",
"//:exported_internal",
"//:gcs_server_lib",
"//:python_callbacks",
"//:global_state_accessor_lib",
"//:raylet_lib",
"//:redis_client",
Expand Down
160 changes: 21 additions & 139 deletions python/ray/_private/gcs_aio_client.py
Original file line number Diff line number Diff line change
@@ -1,153 +1,35 @@
import logging
from typing import Dict, List, Optional
from concurrent.futures import ThreadPoolExecutor
from ray._raylet import GcsClient
from ray.core.generated import (
gcs_pb2,
)
import ray._private.utils
from ray._private.ray_constants import env_integer

# Number of executor threads. No more than this number of concurrent GcsAioClient calls
# can happen. Extra requests will need to wait for the existing requests to finish.
# Executor rules:
# If the arg `executor` in GcsAioClient constructor is set, use it.
# Otherwise if env var `GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT` is set, use it.
# Otherwise, use 5.
GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT = env_integer(
"GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT", 5
)

from ray._raylet import MyGcsClient

logger = logging.getLogger(__name__)


class AsyncProxy:
def __init__(self, inner, loop, executor):
self.inner = inner
self.loop = loop
self.executor = executor

def _function_to_async(self, func):
async def wrapper(*args, **kwargs):
return await self.loop.run_in_executor(self.executor, func, *args, **kwargs)

return wrapper

def __getattr__(self, name):
"""
If attr is callable, wrap it into an async function.
"""
attr = getattr(self.inner, name)
if callable(attr):
return self._function_to_async(attr)
else:
return attr


class GcsAioClient:
def __init__(
self,
address: str = None,
loop=None,
executor=None,
address: Optional[str] = None,
nums_reconnect_retry: int = 5,
):
if loop is None:
loop = ray._private.utils.get_or_create_event_loop()
if executor is None:
executor = ThreadPoolExecutor(
max_workers=GCS_AIO_CLIENT_DEFAULT_THREAD_COUNT,
thread_name_prefix="gcs_aio_client",
)

self._gcs_client = GcsClient(
address,
nums_reconnect_retry,
)
self._async_proxy = AsyncProxy(self._gcs_client, loop, executor)
self._nums_reconnect_retry = nums_reconnect_retry

@property
def address(self):
return self._gcs_client.address

async def check_alive(
self, node_ips: List[bytes], timeout: Optional[float] = None
) -> List[bool]:
logger.debug(f"check_alive {node_ips!r}")
return await self._async_proxy.check_alive(node_ips, timeout)

async def internal_kv_get(
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> Optional[bytes]:
logger.debug(f"internal_kv_get {key!r} {namespace!r}")
return await self._async_proxy.internal_kv_get(key, namespace, timeout)
self.my_gcs_client = MyGcsClient.standalone(str(address))

async def internal_kv_multi_get(
self,
keys: List[bytes],
namespace: Optional[bytes],
timeout: Optional[float] = None,
) -> Dict[bytes, bytes]:
logger.debug(f"internal_kv_multi_get {keys!r} {namespace!r}")
return await self._async_proxy.internal_kv_multi_get(keys, namespace, timeout)

async def internal_kv_put(
self,
key: bytes,
value: bytes,
overwrite: bool,
namespace: Optional[bytes],
timeout: Optional[float] = None,
) -> int:
"""Put a key-value pair into the GCS.

Args:
key: The key to put.
value: The value to put.
overwrite: Whether to overwrite the value if the key already exists.
namespace: The namespace to put the key-value pair into.
timeout: The timeout in seconds.

Returns:
The number of keys added. If overwrite is True, this will be 1 if the
key was added and 0 if the key was updated. If overwrite is False,
this will be 1 if the key was added and 0 if the key already exists.
"""
logger.debug(f"internal_kv_put {key!r} {value!r} {overwrite} {namespace!r}")
return await self._async_proxy.internal_kv_put(
key, value, overwrite, namespace, timeout
)

async def internal_kv_del(
self,
key: bytes,
del_by_prefix: bool,
namespace: Optional[bytes],
timeout: Optional[float] = None,
) -> int:
logger.debug(f"internal_kv_del {key!r} {del_by_prefix} {namespace!r}")
return await self._async_proxy.internal_kv_del(
key, del_by_prefix, namespace, timeout
def __getattr__(self, name):
async_names = [
"internal_kv_get",
"internal_kv_multi_get",
"internal_kv_put",
"internal_kv_del",
"internal_kv_exists",
"internal_kv_keys",
"check_alive",
"get_all_job_info",
]
if name in async_names:
return getattr(self.my_gcs_client, "async_" + name)
property_names = ["address", "cluster_id"]
if name in property_names:
return getattr(self.my_gcs_client, name)
raise AttributeError(
f"'{self.__class__.__name__}' object has no attribute '{name}'"
)

async def internal_kv_exists(
self, key: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> bool:
logger.debug(f"internal_kv_exists {key!r} {namespace!r}")
return await self._async_proxy.internal_kv_exists(key, namespace, timeout)

async def internal_kv_keys(
self, prefix: bytes, namespace: Optional[bytes], timeout: Optional[float] = None
) -> List[bytes]:
logger.debug(f"internal_kv_keys {prefix!r} {namespace!r}")
return await self._async_proxy.internal_kv_keys(prefix, namespace, timeout)

async def get_all_job_info(
self, timeout: Optional[float] = None
) -> Dict[bytes, gcs_pb2.JobTableData]:
"""
Return dict key: bytes of job_id; value: JobTableData pb message.
"""
return await self._async_proxy.get_all_job_info(timeout)
Loading
Loading