
[core][experimental] Build an operation-based execution schedule for each actor to avoid deadlocks caused by NCCL operations #46911

Merged: 112 commits into ray-project:master on Aug 14, 2024

Conversation

@kevin85421 (Member) commented Aug 1, 2024

Why are these changes needed?

  • Generate an execution schedule for each actor. The schedule is a list of DAGNodeOperation.
    • Step 1: Generate a graph based on the following rules:

      • Divide a DAG node into three GraphNodes: READ, COMPUTE, and WRITE. Each GraphNode has a DAGNodeOperation.
      • Add edges from READ to COMPUTE and from COMPUTE to WRITE within the same task.
      • Add an edge from the COMPUTE with bind_index i to the COMPUTE with bind_index i+1 if they belong to the same actor.
      • Add an edge from the WRITE of the writer task to the READ of the reader task.
    • Step 2: Topological sort. If there are multiple GraphNodes with zero in-degree, select one based on the following rules:

      • (1) If the candidate nodes are not NCCL write nodes, select the one with the smallest bind_index. If several candidates are each the smallest-bind_index node of their own actor, any one of them is acceptable. For the implementation, we maintain a priority queue for each actor, whose head is that actor's node with the smallest bind_index.
      • (2) If a candidate is an NCCL write node, select it only if all of its downstream nodes are also at the heads of their priority queues.
      • (3) If neither (1) nor (2) can be satisfied, all candidate nodes are NCCL write nodes. In that case, select the node at the head of its priority queue together with its downstream nodes, regardless of whether those downstream nodes are at the heads of their own priority queues.
    • Then, put the selected nodes into the corresponding actors' schedules (a minimal sketch of this procedure follows below).
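
As a rough, self-contained sketch of the two steps above (illustrative only: `OpType`, `GraphNode`, `build_schedules`, and the `requires_nccl` flag are invented for this summary and simplify the actual logic in python/ray/dag/dag_node_operation.py):

```python
import heapq
from collections import defaultdict
from enum import Enum
from typing import Dict, List


class OpType(Enum):
    READ = 0
    COMPUTE = 1
    WRITE = 2


class GraphNode:
    """One READ/COMPUTE/WRITE operation of a task, bound to an actor."""

    def __init__(self, actor: str, bind_index: int, op: OpType, requires_nccl: bool = False):
        self.actor, self.bind_index, self.op = actor, bind_index, op
        self.requires_nccl = requires_nccl  # True only for NCCL WRITE nodes
        self.in_degree = 0
        self.out_edges: List["GraphNode"] = []

    def __lt__(self, other: "GraphNode") -> bool:
        # Within one actor, order by bind_index, then READ < COMPUTE < WRITE.
        return (self.bind_index, self.op.value) < (other.bind_index, other.op.value)


def add_edge(src: GraphNode, dst: GraphNode) -> None:
    src.out_edges.append(dst)
    dst.in_degree += 1


def build_schedules(nodes: List[GraphNode]) -> Dict[str, List[GraphNode]]:
    """Topological sort using the tie-breaking rules (1)-(3) described above."""
    ready: Dict[str, List[GraphNode]] = defaultdict(list)  # per-actor priority queues
    for node in nodes:
        if node.in_degree == 0:
            heapq.heappush(ready[node.actor], node)
    schedules: Dict[str, List[GraphNode]] = defaultdict(list)
    remaining = len(nodes)

    def schedule(node: GraphNode) -> None:
        nonlocal remaining
        ready[node.actor].remove(node)
        heapq.heapify(ready[node.actor])
        schedules[node.actor].append(node)
        remaining -= 1
        for nxt in node.out_edges:
            nxt.in_degree -= 1
            if nxt.in_degree == 0:
                heapq.heappush(ready[nxt.actor], nxt)

    def downstream_at_head(write: GraphNode) -> bool:
        # Rule (2): every downstream READ becomes ready as soon as this WRITE
        # runs, and would then sit at the head of its actor's queue.
        return all(
            r.in_degree == 1 and (not ready[r.actor] or r < ready[r.actor][0])
            for r in write.out_edges
        )

    while remaining:
        heads = [ready[a][0] for a in sorted(ready) if ready[a]]
        assert heads, "the operation graph should be acyclic"
        non_nccl = [h for h in heads if not h.requires_nccl]
        if non_nccl:
            schedule(non_nccl[0])  # rule (1)
            continue
        # Rules (2)/(3): every head is an NCCL write. Prefer one whose readers
        # can run immediately; otherwise fall back to any head. Schedule the
        # write and its downstream READs back-to-back so the paired NCCL
        # send/recv land next to each other in the actors' schedules.
        picked = next((h for h in heads if downstream_at_head(h)), heads[0])
        readers = list(picked.out_edges)
        schedule(picked)
        for r in readers:
            if r.in_degree == 0 and r in ready[r.actor]:
                schedule(r)
    return schedules
```

The per-actor heap makes rule (1) cheap to apply: the head of each heap is always that actor's ready operation with the smallest bind_index, and scheduling an NCCL write together with its downstream reads keeps the paired send/recv adjacent in the two actors' schedules.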

Example: 1F1B pipeline parallelism for training

  • New dependency graph

  • A 'happen-before' graph for deadlock detection: Without this PR, the DAG will have a deadlock because the graph contains a cycle.

  • The schedule built by this PR.

Next steps

  • Update deadlock detection. Even when a deadlock exists, the graph introduced by this PR will not contain a cycle, so it cannot be used for cycle-based deadlock detection on its own. Hence, I am still keeping the existing deadlock detection mechanism, even though it produces some false positives.
  • Generate channel-based execution schedules. For example, actor1.t1 and actor1.t2 send data via NCCL channels to actor2.t1, and actor1.t1 and actor1.t2 have a control dependency. In this case, actor2.t1 needs to read the channel between actor1.t1 and actor2.t1 first, and then read the channel between actor1.t2 and actor2.t1, to avoid deadlocks (a hypothetical ordering is sketched below).
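
To make that ordering concrete, here is a hypothetical sketch of what a channel-based schedule for actor2.t1 could look like (the tuple format and channel labels are invented for illustration; this is not an existing API):

```python
# Hypothetical illustration only: actor1 runs t1 before t2, so actor2.t1 must
# read the two NCCL channels in that same order, or both actors block forever.
schedule_for_actor2_t1 = [
    ("READ", "channel: actor1.t1 -> actor2.t1"),   # matches actor1's first NCCL write
    ("READ", "channel: actor1.t2 -> actor2.t1"),   # matches actor1's second NCCL write
    ("COMPUTE", "actor2.t1"),
    ("WRITE", "actor2.t1 output"),
]
```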

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@kevin85421 kevin85421 changed the title [WIP] Build an operation-based execution schedule for each actor [core][experimental] Build an operation-based execution schedule for each actor Aug 1, 2024
@@ -788,7 +788,7 @@ def test_compiled_dag_ref_del(ray_start_regular):
    compiled_dag.teardown()


-def test_dag_fault_tolerance_chain(ray_start_regular_shared):
+def test_dag_fault_tolerance_chain(ray_start_regular):
kevin85421 (Member Author):

When using ray_start_regular_shared, the Raylet prints a warning message saying "XX PYTHON worker processes have been started on node: ...", and XX becomes larger and larger as more tests run against the shared cluster.


Contributor:

hmm this usually indicates the workers are not properly cleaned up.

@kevin85421 kevin85421 changed the title [core][experimental] Build an operation-based execution schedule for each actor [core][experimental] Build an operation-based execution schedule for each actor to avoid deadlocks caused by NCCL operations Aug 1, 2024
@kevin85421 kevin85421 marked this pull request as ready for review August 1, 2024 07:11
The communication between workers is done using NCCL. The communication
within the worker actor is done using IntraProcessChannel.
"""
monkeypatch.setattr(ray.dag.constants, "RAY_ADAG_ENABLE_DETECT_DEADLOCK", False)
kevin85421 (Member Author):

I still need time to think about the deadlock detection in this PR. The existing deadlock detection generates some false alarms after this PR.

@kevin85421 (Member Author) commented:

I moved a test from test_torch_tensor_dag.py to test_execution_schedule_gpu.py, and it passed. I'm not sure why the test can successfully run experimental_compile without the error mentioned in #46911 (comment). I am still troubleshooting.

5b7c318

@kevin85421 (Member Author) commented:

Created an issue to track the progress of adding the GPU tests back to CI:

#47093

@stephanie-wang (Contributor) left a comment:

Looks good!

python/ray/dag/dag_node_operation.py:
return False
next_nodes: List[DAGOperationGraphNode] = []
first_nccl_node: Optional[DAGOperationGraphNode] = None
for _, candidates in actor_to_candidates.items():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I was using bind_index to mean the local_idx that you're using here. I didn't realize before that bind_index could be non-contiguous for a single DAG.

What's the motivation here? I think the execution schedule should be the same if there is no NCCL channel in the graph.

Yes, the motivation for this suggestion is the same. Right now the execution schedule will favor scheduling tasks on actors that appear first in the dictionary, which can change depending on the order in which the actors were inserted into the dictionary.
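
One way to make the selection independent of dictionary insertion order (a hypothetical sketch; the sort key and the requires_nccl flag are assumptions standing in for however the real node marks NCCL writes):

```python
# Hypothetical sketch: visit the per-actor candidate heaps in a stable order
# (here, sorted by the actor ID's string form) so the chosen node does not
# depend on the order actors were inserted into the dictionary.
for actor_id, candidates in sorted(actor_to_candidates.items(), key=lambda kv: str(kv[0])):
    if candidates and not candidates[0].requires_nccl:
        next_nodes.append(candidates[0])
        break
```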

@rkooo567 (Contributor) left a comment:

Can you run the microbenchmark? If we address the rest of @stephanie-wang's review comments, I am okay with merging this without splitting it, to unblock @woshiyyya.

@@ -386,6 +324,79 @@ def __init__(
assert not isinstance(val, ChannelInterface)
assert not isinstance(val, DAGInputAdapter)

self.input_reader: ReaderInterface = SynchronousReader(self.input_channels)
Contributor:

We can handle it later. This just assumes the reader/writer is synchronous, and I wondered if it should be passed in as an input (so that we can support different implementations).
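
A rough sketch of that suggestion, with a hypothetical class name and constructor parameter (the import path is assumed from the snippet's type annotations; this is not the actual Ray class):

```python
# Hypothetical sketch: inject the reader type instead of hard-coding
# SynchronousReader, so a different ReaderInterface implementation (e.g., an
# asynchronous one) could be swapped in without changing this class.
from ray.experimental.channel.common import ReaderInterface, SynchronousReader


class ExecutableTaskSketch:
    def __init__(self, input_channels, reader_cls=SynchronousReader):
        # Default keeps today's synchronous behavior.
        self.input_reader: ReaderInterface = reader_cls(input_channels)
```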

kevin85421 and others added 2 commits August 13, 2024 09:29
@rkooo567 (Contributor) commented:

lint failures!

@kevin85421 (Member Author) commented:

I ran the microbenchmark locally. The results are in the same range as the numbers on CD. Some metrics are a bit higher (e.g., local put:local get, single channel), and some are a bit lower (e.g., compiled single-actor DAG calls per second). I didn't use the same setup as CD, so I'm not sure if it's fair to compare them. Additionally, there are some bugs in tearing down the DAG in the microbenchmark, which I worked around.


@kevin85421 kevin85421 added the "go (add ONLY when ready to merge, run all tests)" label Aug 14, 2024
@rkooo567 rkooo567 enabled auto-merge (squash) August 14, 2024 16:07
@github-actions github-actions bot disabled auto-merge August 14, 2024 20:02
@rkooo567 rkooo567 merged commit 3ba2e9f into ray-project:master Aug 14, 2024
5 checks passed
simonsays1980 pushed a commit to simonsays1980/ray that referenced this pull request Aug 15, 2024
…each actor to avoid deadlocks caused by NCCL operations (ray-project#46911)
