Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][compiled graphs] Enable visualization tests #48627

Merged
merged 6 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/ray/dag/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ py_test_module_list(
"tests/experimental/test_torch_tensor_dag.py",
"tests/experimental/test_collective_dag.py",
"tests/experimental/test_execution_schedule.py",
"tests/experimental/test_dag_visualization.py",
],
tags = [
"accelerated_dag",
Expand Down
171 changes: 0 additions & 171 deletions python/ray/dag/tests/experimental/test_accelerated_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@

logger = logging.getLogger(__name__)

try:
import pydot
except Exception:
logging.info("pydot is not installed, visualization tests will be skiped")

pytestmark = [
pytest.mark.skipif(
Expand Down Expand Up @@ -2439,173 +2435,6 @@ async def main():
loop.run_until_complete(main())


class TestVisualization:

"""Tests for the visualize method of compiled DAGs."""

# TODO(zhilong): "pip intsall pydot"
# and "sudo apt-get install graphviz " to run test.
@pytest.fixture(autouse=True)
def skip_if_pydot_graphviz_not_available(self):
# Skip the test if pydot or graphviz is not available
pytest.importorskip("pydot")
pytest.importorskip("graphviz")

def test_visualize_basic(self, ray_start_regular):
"""
Expect output or dot_source:
MultiOutputNode" fillcolor=yellow shape=rectangle style=filled]
0 -> 1 [label=SharedMemoryType]
1 -> 2 [label=SharedMemoryType]
"""

@ray.remote
class Actor:
def echo(self, x):
return x

actor = Actor.remote()

with InputNode() as i:
dag = actor.echo.bind(i)

compiled_dag = dag.experimental_compile()

# Call the visualize method
dot_source = compiled_dag.visualize(return_dot=True)

graphs = pydot.graph_from_dot_data(dot_source)
graph = graphs[0]

node_names = {node.get_name() for node in graph.get_nodes()}
edge_pairs = {
(edge.get_source(), edge.get_destination()) for edge in graph.get_edges()
}

expected_nodes = {"0", "1", "2"}
assert expected_nodes.issubset(
node_names
), f"Expected nodes {expected_nodes} not found."

expected_edges = {("0", "1"), ("1", "2")}
assert expected_edges.issubset(
edge_pairs
), f"Expected edges {expected_edges} not found."

compiled_dag.teardown()

def test_visualize_multi_return(self, ray_start_regular):
"""
Expect output or dot_source:
MultiOutputNode" fillcolor=yellow shape=rectangle style=filled]
0 -> 1 [label=SharedMemoryType]
1 -> 2 [label=SharedMemoryType]
1 -> 3 [label=SharedMemoryType]
2 -> 4 [label=SharedMemoryType]
3 -> 4 [label=SharedMemoryType]
"""

@ray.remote
class Actor:
@ray.method(num_returns=2)
def return_two(self, x):
return x, x + 1

actor = Actor.remote()

with InputNode() as i:
o1, o2 = actor.return_two.bind(i)
dag = MultiOutputNode([o1, o2])

compiled_dag = dag.experimental_compile()

# Get the DOT source
dot_source = compiled_dag.visualize(return_dot=True)

graphs = pydot.graph_from_dot_data(dot_source)
graph = graphs[0]

node_names = {node.get_name() for node in graph.get_nodes()}
edge_pairs = {
(edge.get_source(), edge.get_destination()) for edge in graph.get_edges()
}

expected_nodes = {"0", "1", "2", "3", "4"}
assert expected_nodes.issubset(
node_names
), f"Expected nodes {expected_nodes} not found."

expected_edges = {("0", "1"), ("1", "2"), ("1", "3"), ("2", "4"), ("3", "4")}
assert expected_edges.issubset(
edge_pairs
), f"Expected edges {expected_edges} not found."

compiled_dag.teardown()

def test_visualize_multi_return2(self, ray_start_regular):
"""
Expect output or dot_source:
MultiOutputNode" fillcolor=yellow shape=rectangle style=filled]
0 -> 1 [label=SharedMemoryType]
1 -> 2 [label=SharedMemoryType]
1 -> 3 [label=SharedMemoryType]
2 -> 4 [label=SharedMemoryType]
3 -> 5 [label=SharedMemoryType]
4 -> 6 [label=SharedMemoryType]
5 -> 6 [label=SharedMemoryType]
"""

@ray.remote
class Actor:
@ray.method(num_returns=2)
def return_two(self, x):
return x, x + 1

def echo(self, x):
return x

a = Actor.remote()
b = Actor.remote()
with InputNode() as i:
o1, o2 = a.return_two.bind(i)
o3 = b.echo.bind(o1)
o4 = b.echo.bind(o2)
dag = MultiOutputNode([o3, o4])

compiled_dag = dag.experimental_compile()

# Get the DOT source
dot_source = compiled_dag.visualize(return_dot=True)

graphs = pydot.graph_from_dot_data(dot_source)
graph = graphs[0]

node_names = {node.get_name() for node in graph.get_nodes()}
edge_pairs = {
(edge.get_source(), edge.get_destination()) for edge in graph.get_edges()
}

expected_nodes = {"0", "1", "2", "3", "4", "5", "6"}
assert expected_nodes.issubset(
node_names
), f"Expected nodes {expected_nodes} not found."

expected_edges = {
("0", "1"),
("1", "2"),
("1", "3"),
("2", "4"),
("3", "5"),
("4", "6"),
("5", "6"),
}
assert expected_edges.issubset(
edge_pairs
), f"Expected edges {expected_edges} not found."

compiled_dag.teardown()


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
Expand Down
172 changes: 172 additions & 0 deletions python/ray/dag/tests/experimental/test_dag_visualization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import sys
import ray
import pydot
import os
from ray.dag import InputNode, MultiOutputNode
from ray.tests.conftest import * # noqa

import pytest


def test_visualize_basic(ray_start_regular):
"""
Expect output or dot_source:
MultiOutputNode" fillcolor=yellow shape=rectangle style=filled]
0 -> 1 [label=SharedMemoryType]
1 -> 2 [label=SharedMemoryType]
"""

@ray.remote
class Actor:
def echo(self, x):
return x

actor = Actor.remote()

with InputNode() as i:
dag = actor.echo.bind(i)

compiled_dag = dag.experimental_compile()

# Call the visualize method
dot_source = compiled_dag.visualize(return_dot=True)

graphs = pydot.graph_from_dot_data(dot_source)
graph = graphs[0]

node_names = {node.get_name() for node in graph.get_nodes()}
edge_pairs = {
(edge.get_source(), edge.get_destination()) for edge in graph.get_edges()
}

expected_nodes = {"0", "1", "2"}
assert expected_nodes.issubset(
node_names
), f"Expected nodes {expected_nodes} not found."

expected_edges = {("0", "1"), ("1", "2")}
assert expected_edges.issubset(
edge_pairs
), f"Expected edges {expected_edges} not found."

compiled_dag.teardown()


def test_visualize_multi_return(ray_start_regular):
"""
Expect output or dot_source:
MultiOutputNode" fillcolor=yellow shape=rectangle style=filled]
0 -> 1 [label=SharedMemoryType]
1 -> 2 [label=SharedMemoryType]
1 -> 3 [label=SharedMemoryType]
2 -> 4 [label=SharedMemoryType]
3 -> 4 [label=SharedMemoryType]
"""

@ray.remote
class Actor:
@ray.method(num_returns=2)
def return_two(self, x):
return x, x + 1

actor = Actor.remote()

with InputNode() as i:
o1, o2 = actor.return_two.bind(i)
dag = MultiOutputNode([o1, o2])

compiled_dag = dag.experimental_compile()

# Get the DOT source
dot_source = compiled_dag.visualize(return_dot=True)

graphs = pydot.graph_from_dot_data(dot_source)
graph = graphs[0]

node_names = {node.get_name() for node in graph.get_nodes()}
edge_pairs = {
(edge.get_source(), edge.get_destination()) for edge in graph.get_edges()
}

expected_nodes = {"0", "1", "2", "3", "4"}
assert expected_nodes.issubset(
node_names
), f"Expected nodes {expected_nodes} not found."

expected_edges = {("0", "1"), ("1", "2"), ("1", "3"), ("2", "4"), ("3", "4")}
assert expected_edges.issubset(
edge_pairs
), f"Expected edges {expected_edges} not found."

compiled_dag.teardown()


def test_visualize_multi_return2(ray_start_regular):
"""
Expect output or dot_source:
MultiOutputNode" fillcolor=yellow shape=rectangle style=filled]
0 -> 1 [label=SharedMemoryType]
1 -> 2 [label=SharedMemoryType]
1 -> 3 [label=SharedMemoryType]
2 -> 4 [label=SharedMemoryType]
3 -> 5 [label=SharedMemoryType]
4 -> 6 [label=SharedMemoryType]
5 -> 6 [label=SharedMemoryType]
"""

@ray.remote
class Actor:
@ray.method(num_returns=2)
def return_two(self, x):
return x, x + 1

def echo(self, x):
return x

a = Actor.remote()
b = Actor.remote()
with InputNode() as i:
o1, o2 = a.return_two.bind(i)
o3 = b.echo.bind(o1)
o4 = b.echo.bind(o2)
dag = MultiOutputNode([o3, o4])

compiled_dag = dag.experimental_compile()

# Get the DOT source
dot_source = compiled_dag.visualize(return_dot=True)

graphs = pydot.graph_from_dot_data(dot_source)
graph = graphs[0]

node_names = {node.get_name() for node in graph.get_nodes()}
edge_pairs = {
(edge.get_source(), edge.get_destination()) for edge in graph.get_edges()
}

expected_nodes = {"0", "1", "2", "3", "4", "5", "6"}
assert expected_nodes.issubset(
node_names
), f"Expected nodes {expected_nodes} not found."

expected_edges = {
("0", "1"),
("1", "2"),
("1", "3"),
("2", "4"),
("3", "5"),
("4", "6"),
("5", "6"),
}
assert expected_edges.issubset(
edge_pairs
), f"Expected edges {expected_edges} not found."

compiled_dag.teardown()


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
else:
sys.exit(pytest.main(["-sv", __file__]))
1 change: 1 addition & 0 deletions python/requirements/test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ freezegun==1.1.0
google-api-python-client==2.111.0
google-cloud-storage==2.14.0
gradio==3.50.2; platform_system != "Windows"
graphviz==0.20.3
websockets==11.0.3
joblib==1.2.0
jsonpatch==1.32
Expand Down