[core][experimental] Avoid serialization for data passed between two tasks on the same actor #45591

Merged: 24 commits, Jun 13, 2024

@kevin85421 kevin85421 commented May 28, 2024

Why are these changes needed?

This PR includes two parts:

  • LocalChannel is a channel for communication between two tasks in the same worker process. It writes data directly to the worker's serialization context and reads data from the serialization context to avoid the serialization overhead and the need for reading/writing from shared memory.

  • MultiChannel can be used to send data to different readers via different channels. For example, if the reader is in the same worker process as the writer, the data can be sent via LocalChannel. If the reader is in a different worker process, the data can be sent via shared memory channel.
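To make the first bullet concrete, here is a minimal sketch of the idea, not the PR's implementation. `SerializationContextSketch` and `LocalChannelSketch` are hypothetical stand-ins; only the `set_data(channel_id, value)` call shape appears in this PR's diff, while `get_data` and its pop-on-read behavior are assumptions made for illustration.

```python
import pickle
from typing import Any, Dict


class SerializationContextSketch:
    """Hypothetical stand-in for the worker's serialization context."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def set_data(self, channel_id: str, value: Any) -> None:
        self._data[channel_id] = value

    def get_data(self, channel_id: str) -> Any:
        # Assumption: each written value is consumed by exactly one read.
        return self._data.pop(channel_id)


class LocalChannelSketch:
    """Passes values between two tasks in the same process by reference."""

    def __init__(self, channel_id: str, ctx: SerializationContextSketch) -> None:
        self._channel_id = channel_id
        self._ctx = ctx

    def write(self, value: Any) -> None:
        self._ctx.set_data(self._channel_id, value)

    def read(self) -> Any:
        return self._ctx.get_data(self._channel_id)


ctx = SerializationContextSketch()
channel = LocalChannelSketch("ch-0", ctx)

payload = {"weights": list(range(5))}
channel.write(payload)
received = channel.read()

# The reader gets the very same object: nothing was serialized or copied.
same_object = received is payload

# For contrast, a pickle round trip yields an equal but distinct object.
copied = pickle.loads(pickle.dumps(payload))
distinct_copy = copied == payload and copied is not payload
```

Because the reader receives the same object the writer produced, any in-place mutation by one task is visible to the other, which is the trade-off for skipping serialization.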

Simple benchmark

  • I used this Python script (https://gist.github.com/kevin85421/0ebffad403d158ab140c4d4dc879e214) to run a simple benchmark at commit 97292b1 for an early signal.
  • Experiment results
    • Case 1: with this PR => Execution time: 1.246751070022583 seconds
    • Case 2: without this PR => Execution time: 2.6125080585479736 seconds

Related issue number

Closes #45230

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@kevin85421 kevin85421 marked this pull request as draft May 28, 2024 16:20
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
self,
writer: Optional[ray.actor.ActorHandle],
readers: List[Optional[ray.actor.ActorHandle]],
local_channel: Optional[LocalChannel] = None,
Member Author

@stephanie-wang I used local_channel and remote_channel instead of channels (List[ChannelInterface]). Although we can easily implement write with self.channels, readers still need to retrieve the corresponding channel from self.channels to read the data.

def write(self, value):
  for ch in self.channels:
    ch.write(value)

Contributor

@stephanie-wang stephanie-wang May 31, 2024

Hmm, what if we support an interface like channel_dict, keyed by reader actor ID? Then each reader can retrieve its own channel in ensure_registered_as_reader.

A nice thing about this interface is that it also supports the case where readers are on different nodes (cc @jackhumphries).

channel_dict: Dict[ray.ActorID, ChannelInterface]
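A rough sketch of how such a dictionary-backed interface could behave, under assumptions: `RecordingChannel` and `CompositeSketch` are hypothetical names, plain strings stand in for `ray.ActorID`, and the reader ID is passed to `read` explicitly rather than being resolved in `ensure_registered_as_reader`.

```python
from typing import Any, Dict, List, Set


class RecordingChannel:
    """Hypothetical stand-in for a ChannelInterface implementation."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.written: List[Any] = []

    def write(self, value: Any) -> None:
        self.written.append(value)

    def read(self) -> Any:
        return self.written[-1]


class CompositeSketch:
    """Routes each reader to its own channel via a dict keyed by reader ID."""

    def __init__(self, channel_dict: Dict[str, RecordingChannel]) -> None:
        self._channel_dict = channel_dict
        # Several readers may share one channel, so deduplicate before writing.
        self._channels: Set[RecordingChannel] = set(channel_dict.values())

    def write(self, value: Any) -> None:
        for ch in self._channels:
            ch.write(value)

    def read(self, reader_id: str) -> Any:
        return self._channel_dict[reader_id].read()


local = RecordingChannel("local")
remote = RecordingChannel("remote")
composite = CompositeSketch({"actor_a": local, "actor_b": remote, "actor_c": remote})
composite.write(42)
```

The write path stays a simple loop, while each reader looks up only the channel it needs; the deduplicated set keeps a channel shared by several readers from being written twice.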

Member Author

Updated in af458bf.

# TODO (kevin85421): Currently, if we don't pass `actor_handle` to
# `LocalChannel`, the actor will die because the reference count of
# `actor_handle` drops to 0. We should fix this issue in the future.
self._actor_handle = actor_handle
Member Author

If the actor handle isn't passed in, the actor is killed because its reference count drops to 0. This is a Ray Core bug; I will fix it in a follow-up PR.

Contributor

Nice, thanks. I think it's also fine if you want to just file an issue for this and we can address it later.

Contributor

By the way, I saw what I think is the same issue and should have a fix for it soon.

Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
@@ -43,7 +43,7 @@ def do_allocate_channel(

Args:
readers: The actor handles of the readers.
buffer_size_bytes: The maximum size of messages in the channel.
Member Author

This function doesn't have an argument buffer_size_bytes.

assert hasattr(self, "_local_channel") or hasattr(self, "_remote_channel")

def ensure_registered_as_writer(self) -> None:
    if self._local_channel:
Member Author

LocalChannel's ensure_registered_as_writer() is currently a no-op. I call it here anyway so that we don't forget to update this code if LocalChannel's ensure_registered_as_writer() changes in the future. The same reasoning applies to several of the functions below.

@kevin85421 kevin85421 changed the title [WIP] ADAG local channel [core][experimental] Avoid serialization for data passed between two tasks on the same actor May 31, 2024
@kevin85421 kevin85421 marked this pull request as ready for review May 31, 2024 05:22
Contributor

@jackhumphries jackhumphries left a comment

Can you please add tests to python/ray/tests/test_channel.py as well for LocalChannel and MultiChannel?

# we can directly store the data in the context instead of storing
# it in the channel object. This reduces the serialization overhead of `value`.
ctx = ChannelContext.get_current().serialization_context
ctx.set_data(self.channel_id, value)
Contributor

I assume this just writes an object reference to the serialization context? (or in the case of a primitive value, the entire primitive value is written?)

Member Author

I'm not sure I understand the question correctly. We don't serialize the data because the reader and writer are in the same actor process.

Contributor

@stephanie-wang stephanie-wang left a comment

Thanks, looking good! I think we can clean it up a bit more. Agree with Jack about adding non-DAG channel tests.

@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label May 31, 2024
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Contributor

@jackhumphries jackhumphries left a comment

It seems like this PR also adds support for having different readers for a single channel on different nodes. This is something else we've been meaning to add.

Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
@@ -649,12 +649,12 @@ def _get_or_compile(

if isinstance(task.dag_node, ClassMethodNode):
    readers = [self.idx_to_task[idx] for idx in task.downstream_node_idxs]
    assert len(readers) == 1
Member Author

This assertion makes the following case fail: actor a has two readers (actor b and itself).

a = Actor.remote(0)
b = Actor.remote(100)
with InputNode() as inp:
    dag = a.inc.bind(inp)
    dag = MultiOutputNode([a.inc.bind(dag), b.inc.bind(dag)])

compiled_dag = dag.experimental_compile()

Contributor

Yes, this is a known bug, cc @jackhumphries

I think this check got added when Jack was looking into supporting multi-node. I think you could remove it, and it should be okay as long as all of the remote readers are on the same node.

Member Author

all of the remote readers are on the same node.

Why should all remote readers be on the same node?

Contributor

Right now that is required because all of the remote readers will share one channel. But with your PR, we can create multiple channels, one for each unique reader node.
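As a hedged illustration of "one channel per unique reader node": the helper below only groups readers by node, and a channel would then be allocated per group. The function name, the `(reader_id, node_id)` pair format, and the string IDs are assumptions for this sketch, not code from the PR.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_readers_by_node(readers: List[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Map each node ID to the reader IDs located on it.

    `readers` is a list of (reader_id, node_id) pairs.
    """
    by_node: Dict[str, List[str]] = defaultdict(list)
    for reader_id, node_id in readers:
        by_node[node_id].append(reader_id)
    return dict(by_node)


readers = [("b", "node-1"), ("c", "node-1"), ("d", "node-2")]
groups = group_readers_by_node(readers)

# One channel would be allocated per key in `groups`.
num_channels = len(groups)
```

Readers `b` and `c` would share one channel because they are on the same node, while `d` would get its own.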

Member Author

Got it. I will update my PR. Currently, it only creates one Channel for all remote readers (readers which are not on the same actor as the writer).

Contributor

Yes, this is currently something I need to work on. The hope is that this PR will make it easy at this point to add support for readers across multiple nodes, though you don't need to explicitly support that in this PR--I will take care of it once this PR is merged.

Member Author

I chatted with @jackhumphries offline. He will work on this part, so I will not include multi-node support in this PR.

Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
Comment on lines +481 to +482
_channel_dict: Optional[Dict[ray.ActorID, ChannelInterface]] = None,
_channels: Optional[Set[ChannelInterface]] = None,
Contributor

Please add docs for these params.

Member Author

They are not user-facing parameters; they are only used during deserialization. I am not sure whether we should document them. The Channel class doesn't add docs for such parameters either.

Comment on lines 489 to 491
self._channel_dict = _channel_dict or {}
# A set of channel objects. The set is a deduplicated version of _channel_dict.
self._channels = _channels or set()
Contributor

There are at most two channels, one intra-process and one shared-memory? I feel the code would be cleaner if we explicitly created these two channels (None if not needed) and had each reader operate on a different channel based on its location?

Member Author

I feel the code would be cleaner if we explicitly create these two channels (None if not needed)

That is how the first version of the implementation worked. You can see the previous commits for more details.

@kevin85421
Member Author

The CI failure is related to the reference count issue.
[Image: Screenshot 2024-06-10 at 10.38.03 PM]

Signed-off-by: kaihsun <kaihsun@anyscale.com>
Signed-off-by: kaihsun <kaihsun@anyscale.com>
@kevin85421
Member Author

kevin85421 commented Jun 12, 2024

I removed the out-of-band actor from CompositeChannel in 53373ba. This fixes the reference count issue. Let's see whether it passes all CI tests.

[Update]: CI tests pass

@kevin85421 kevin85421 added the go add ONLY when ready to merge, run all tests label Jun 12, 2024
@kevin85421
Member Author

All tests pass

@stephanie-wang
Contributor

Hmm I just realized there is a bit of a gotcha here when the same actor reads its own output twice. The problem is that the first actor task can modify the input, like this:

with InputNode() as inp:
  dag = a.make_list.bind(inp)
  dag = MultiOutputNode([a.append_to_list.bind(dag, "x"), a.append_to_list.bind(dag, "y")])

Sorry for this last-minute change, but can we instead disallow passing to multiple tasks on the same actor? That would be the safest for now, and we can revisit this problem later.
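One way such a restriction could be expressed is a compile-time check like the sketch below. It is an assumption about the shape of the check, not the code that was merged: the function name is hypothetical, actors are identified by plain strings, and the request is interpreted narrowly as "at most one downstream task on the writer's own actor".

```python
from collections import Counter
from typing import List


def check_same_actor_readers(writer_actor: str, reader_actors: List[str]) -> None:
    """Raise if more than one downstream task runs on the writer's own actor.

    Values passed within one actor are shared by reference, so a second
    reader on that actor could observe mutations made by the first.
    """
    counts = Counter(reader_actors)
    if counts[writer_actor] > 1:
        raise ValueError(
            f"Actor {writer_actor!r} reads its own output "
            f"{counts[writer_actor]} times; only one same-actor reader is allowed."
        )


# Allowed: one reader on the writer's actor, one on a different actor.
check_same_actor_readers("a", ["a", "b"])

# Rejected: two readers on the writer's actor.
try:
    check_same_actor_readers("a", ["a", "a"])
    rejected = False
except ValueError:
    rejected = True
```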

this case.

To elaborate, all output channels of the actor DAG nodes will be
CompositeChannel, and the first two will have a local channel, while the last
Contributor

Why do we use CompositeChannels here? Isn't there only one reader per channel?

Member Author

Isn't there only one reader per channel?

Yes.

Why do we use CompositeChannels here?

In this PR, the SharedMemoryType's create_channel function creates a CompositeChannel instead of a Channel. The CompositeChannel will determine whether to use only IntraProcessChannel, only Channel, or both IntraProcessChannel and Channel to transfer the data under the hood.
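The selection described above can be sketched roughly as follows. This is an illustration under assumptions: `select_channel_kinds` is a hypothetical helper, actors are plain strings, and the returned labels merely name which kinds of underlying channel would be needed.

```python
from typing import List, Set


def select_channel_kinds(writer_actor: str, reader_actors: List[str]) -> Set[str]:
    """Return which kinds of underlying channel a composite would need."""
    kinds: Set[str] = set()
    for reader in reader_actors:
        if reader == writer_actor:
            # Same actor process: pass the value by reference.
            kinds.add("intra_process")
        else:
            # Different process: go through shared memory.
            kinds.add("shared_memory")
    return kinds


only_local = select_channel_kinds("a", ["a"])
only_remote = select_channel_kinds("a", ["b"])
both = select_channel_kinds("a", ["a", "b"])
```

Even with a single reader, wrapping the choice in one composite type means callers of `create_channel` don't need to know which transport was picked.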

@kevin85421
Member Author

@stephanie-wang, I want to confirm that I understand your point in this comment correctly. In the following example, the output will be [[0, 0, 0, 0, 0, 'x'], [0, 0, 0, 0, 0, 'x', 'y']], but this is not what we want. We want the output to be [[0, 0, 0, 0, 0, 'x'], [0, 0, 0, 0, 0, 'y']] instead. Is my understanding correct? Thanks!

import ray
from ray.dag import InputNode, MultiOutputNode


@ray.remote
class Actor:
    def __init__(self):
        pass

    def make_list(self, num_elements):
        return [0 for _ in range(num_elements)]

    def append_to_list(self, lst, element):
        lst.append(element)
        return lst


a = Actor.remote()
with InputNode() as inp:
    dag = a.make_list.bind(inp)
    dag = MultiOutputNode([a.append_to_list.bind(dag, "x"), a.append_to_list.bind(dag, "y")])

compiled_dag = dag.experimental_compile()
output_channel = compiled_dag.execute(5)
result = output_channel.begin_read()

print(result)

output_channel.end_read()
compiled_dag.teardown()
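The aliasing behind the output described above can be modeled without Ray. The sketch below assumes that the intermediate list is shared by reference between the two same-actor tasks, and that each task's result is snapshotted (modeled here with `copy.deepcopy`) when it is sent to the driver. The `run` helper and its `copy_input` flag are hypothetical.

```python
import copy
from typing import Any, List


def make_list(num_elements: int) -> List[Any]:
    return [0 for _ in range(num_elements)]


def append_to_list(lst: List[Any], element: Any) -> List[Any]:
    lst.append(element)
    return lst


def run(copy_input: bool) -> List[List[Any]]:
    intermediate = make_list(5)
    outputs = []
    for element in ("x", "y"):
        # Without a copy, both tasks mutate the same list object.
        arg = copy.deepcopy(intermediate) if copy_input else intermediate
        result = append_to_list(arg, element)
        # Assumption: the result is serialized when sent to the driver, so
        # later mutations don't change what the driver already received.
        outputs.append(copy.deepcopy(result))
    return outputs


shared = run(copy_input=False)
isolated = run(copy_input=True)
```

With `copy_input=False` the second task sees the `'x'` appended by the first; with `copy_input=True` each task works on its own copy, which is the behavior serialization would otherwise have provided.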

Signed-off-by: kaihsun <kaihsun@anyscale.com>
@kevin85421
Member Author

#45591 (comment)

Updated in c16d8a0.

Contributor

@stephanie-wang stephanie-wang left a comment

Great work!

@stephanie-wang stephanie-wang enabled auto-merge (squash) June 12, 2024 23:59
@stephanie-wang stephanie-wang merged commit d577652 into ray-project:master Jun 13, 2024
7 checks passed
aslonnie added a commit that referenced this pull request Jun 13, 2024
…een two tasks on the same actor" (#45925)

Reverts #45591

tests (`//python/ray/dag:test_torch_tensor_dag_gpu` and some more)
failing with segfault:

examples:

- https://buildkite.com/ray-project/microcheck/builds/1555#01900fbf-7015-4986-b0be-fec0a20caf08/175-981
- https://buildkite.com/ray-project/postmerge/builds/4886#01901018-8a42-41d5-9187-e980a86df6a9/175-915

tests against commits before this PR do not show similar failures.
kevin85421 added a commit to kevin85421/ray that referenced this pull request Jun 13, 2024
…tasks on the same actor (ray-project#45591)

* `LocalChannel` is a channel for communication between two tasks in the
same worker process. It writes data directly to the worker's
serialization context and reads data from the serialization context to
avoid the serialization overhead and the need for reading/writing from
shared memory.

* `MultiChannel` can be used to send data to different readers via
different channels. For example, if the reader is in the same worker
process as the writer, the data can be sent via LocalChannel. If the
reader is in a different worker process, the data can be sent via shared
memory channel.

## Simple benchmark

* I used [this Python
script](https://gist.github.com/kevin85421/0ebffad403d158ab140c4d4dc879e214)
to conduct a simple benchmark with the commit
ray-project@97292b1
for an early signal.
* Experiment results
  * Case 1: with this PR => `Execution time: 1.246751070022583 seconds`
  * Case 2: without this PR => `Execution time: 2.6125080585479736 seconds`

Closes ray-project#45230

---------

Signed-off-by: kaihsun <kaihsun@anyscale.com>