forked from ray-project/ray
Commit
[core][experimental] Avoid serialization for data passed between two tasks on the same actor (ray-project#45591)

* `LocalChannel` is a channel for communication between two tasks in the same worker process. It writes data directly to the worker's serialization context and reads data from the serialization context to avoid the serialization overhead and the need for reading/writing from shared memory.
* `MultiChannel` can be used to send data to different readers via different channels. For example, if the reader is in the same worker process as the writer, the data can be sent via LocalChannel. If the reader is in a different worker process, the data can be sent via shared memory channel.

## Simple benchmark

* I used [this Python script](https://gist.github.com/kevin85421/0ebffad403d158ab140c4d4dc879e214) to conduct a simple benchmark with the commit ray-project@97292b1 for an early signal.
* Experiment results
  * Case 1: with this PR => `Execution time: 1.246751070022583 seconds`
  * Case 2: without this PR => `Execution time: 2.6125080585479736 seconds`

Closes ray-project#45230

---------

Signed-off-by: kaihsun <kaihsun@anyscale.com>
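
The routing idea in the `MultiChannel` bullet can be sketched in plain Python. This is only an illustration under stated assumptions, not Ray's implementation: every class name here (`SketchLocalChannel`, `SketchSerializedChannel`, `SketchMultiChannel`) is hypothetical, processes are identified by plain strings, and a `pickle` round trip stands in for the shared memory channel.

```python
import pickle
from typing import Any, Dict, List


class SketchLocalChannel:
    """Hypothetical same-process channel: hands the object over by reference."""

    def __init__(self) -> None:
        self._slot: Any = None

    def write(self, value: Any) -> None:
        self._slot = value  # no copy, no serialization

    def read(self) -> Any:
        return self._slot


class SketchSerializedChannel:
    """Hypothetical cross-process channel: pickle stands in for shared memory."""

    def __init__(self) -> None:
        self._buf: bytes = b""

    def write(self, value: Any) -> None:
        self._buf = pickle.dumps(value)

    def read(self) -> Any:
        return pickle.loads(self._buf)


class SketchMultiChannel:
    """Hypothetical router: one underlying channel per distinct reader process."""

    def __init__(self, writer_proc: str, reader_procs: List[str]) -> None:
        self._channels: Dict[str, Any] = {}
        for reader in reader_procs:
            if reader in self._channels:
                continue
            if reader == writer_proc:
                self._channels[reader] = SketchLocalChannel()
            else:
                self._channels[reader] = SketchSerializedChannel()

    def write(self, value: Any) -> None:
        # Fan the value out to every underlying channel.
        for channel in self._channels.values():
            channel.write(value)

    def read(self, reader_proc: str) -> Any:
        return self._channels[reader_proc].read()


payload = {"weights": [1, 2, 3]}
multi = SketchMultiChannel("worker-a", ["worker-a", "worker-b"])
multi.write(payload)
same_process_view = multi.read("worker-a")   # the very same object
other_process_view = multi.read("worker-b")  # an equal but separate copy
```

In this sketch the same-process reader receives the original object, while the other reader pays for a serialize/deserialize round trip. That difference is the overhead the commit message says the local path avoids.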
1 parent af3e54e, commit f27f50b
Showing 8 changed files with 523 additions and 14 deletions.
@@ -0,0 +1,65 @@
import uuid
from typing import Any, Optional

import ray
from ray.experimental.channel import ChannelContext
from ray.experimental.channel.common import ChannelInterface
from ray.util.annotations import PublicAPI


@PublicAPI(stability="alpha")
class IntraProcessChannel(ChannelInterface):
    """
    IntraProcessChannel is a channel for communication between two tasks in the same
    worker process. It writes data directly to the worker's _SerializationContext
    and reads data from the _SerializationContext to avoid the serialization
    overhead and the need for reading/writing from shared memory.

    Args:
        actor_handle: The actor handle of the worker process.
    """

    def __init__(
        self,
        actor_handle: ray.actor.ActorHandle,
        _channel_id: Optional[str] = None,
    ):
        # TODO (kevin85421): Currently, if we don't pass `actor_handle` to
        # `IntraProcessChannel`, the actor will die because the reference count of
        # `actor_handle` drops to 0. We should fix this issue in the future.
        self._actor_handle = actor_handle
        # Generate a unique ID for the channel. The writer and reader will use
        # this ID to store and retrieve data from the _SerializationContext.
        self._channel_id = _channel_id
        if self._channel_id is None:
            self._channel_id = str(uuid.uuid4())

    def ensure_registered_as_writer(self) -> None:
        pass

    def ensure_registered_as_reader(self) -> None:
        pass

    def __reduce__(self):
        return IntraProcessChannel, (
            self._actor_handle,
            self._channel_id,
        )

    def write(self, value: Any):
        # Because both the reader and writer are in the same worker process,
        # we can directly store the data in the context instead of storing
        # it in the channel object. This removes the serialization overhead of `value`.
        ctx = ChannelContext.get_current().serialization_context
        ctx.set_data(self._channel_id, value)

    def begin_read(self) -> Any:
        ctx = ChannelContext.get_current().serialization_context
        return ctx.get_data(self._channel_id)

    def end_read(self):
        pass

    def close(self) -> None:
        ctx = ChannelContext.get_current().serialization_context
        ctx.reset_data(self._channel_id)
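
The write, read, and close lifecycle in the file above can be tried without Ray using a small stand-in. This is a minimal sketch under assumptions: `FakeContext` and `SketchChannel` are hypothetical names, only the method names `set_data`, `get_data`, and `reset_data` come from the diff, and their bodies here (a plain dict) are a guess at the behavior rather than Ray's implementation. The sketch also drops `actor_handle` and calls `__reduce__` by hand to mimic what pickling the channel object would do.

```python
import uuid
from typing import Any, Dict, Optional


class FakeContext:
    """Hypothetical per-process store keyed by channel ID."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def set_data(self, channel_id: str, value: Any) -> None:
        self._data[channel_id] = value

    def get_data(self, channel_id: str) -> Any:
        # Assumption of this sketch: reading a missing ID raises KeyError.
        return self._data[channel_id]

    def reset_data(self, channel_id: str) -> None:
        self._data.pop(channel_id, None)


# Stand-in for the worker-wide context that the real code looks up
# via ChannelContext.get_current().serialization_context.
_CTX = FakeContext()


class SketchChannel:
    """Hypothetical channel that mirrors the structure of the class above."""

    def __init__(self, _channel_id: Optional[str] = None) -> None:
        self._channel_id = _channel_id or str(uuid.uuid4())

    def __reduce__(self):
        # Only the ID travels with the channel object, never the payload.
        return SketchChannel, (self._channel_id,)

    def write(self, value: Any) -> None:
        _CTX.set_data(self._channel_id, value)

    def begin_read(self) -> Any:
        return _CTX.get_data(self._channel_id)

    def close(self) -> None:
        _CTX.reset_data(self._channel_id)


writer = SketchChannel()
# Rebuild the channel the way unpickling would: call the class with the
# arguments returned by __reduce__. The copy shares the writer's ID.
cls, args = writer.__reduce__()
reader = cls(*args)

value = [1, 2, 3]
writer.write(value)
received = reader.begin_read()  # same object, found via the shared ID
writer.close()                  # clears the slot for this channel ID
```

Because the reader and writer copies agree on the channel ID, the reader finds the exact object the writer stored, and `close()` on either copy clears the slot for both.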