Skip to content

Commit

Permalink
[core][compiled graphs] Enable visualization tests (#48627)
Browse files Browse the repository at this point in the history
In #47958 , visualization unit tests were introduced but would not run if CI environment does not have graphvis installed (which was the case).

This PR adds the dependency in test-requirements.txt and always run the visualization tests. It also moves visualization tests to a separate file since test_accelerated_dag.py is too large.
  • Loading branch information
ruisearch42 authored Nov 8, 2024
1 parent cae5910 commit f4a7c70
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 171 deletions.
1 change: 1 addition & 0 deletions python/ray/dag/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ py_test_module_list(
"tests/experimental/test_torch_tensor_dag.py",
"tests/experimental/test_collective_dag.py",
"tests/experimental/test_execution_schedule.py",
"tests/experimental/test_dag_visualization.py",
],
tags = [
"accelerated_dag",
Expand Down
171 changes: 0 additions & 171 deletions python/ray/dag/tests/experimental/test_accelerated_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@

logger = logging.getLogger(__name__)

try:
import pydot
except Exception:
logging.info("pydot is not installed, visualization tests will be skiped")

pytestmark = [
pytest.mark.skipif(
Expand Down Expand Up @@ -2489,173 +2485,6 @@ async def main():
loop.run_until_complete(main())


class TestVisualization:

"""Tests for the visualize method of compiled DAGs."""

# TODO(zhilong): "pip intsall pydot"
# and "sudo apt-get install graphviz " to run test.
@pytest.fixture(autouse=True)
def skip_if_pydot_graphviz_not_available(self):
# Skip the test if pydot or graphviz is not available
pytest.importorskip("pydot")
pytest.importorskip("graphviz")

def test_visualize_basic(self, ray_start_regular):
"""
Expect output or dot_source:
MultiOutputNode" fillcolor=yellow shape=rectangle style=filled]
0 -> 1 [label=SharedMemoryType]
1 -> 2 [label=SharedMemoryType]
"""

@ray.remote
class Actor:
def echo(self, x):
return x

actor = Actor.remote()

with InputNode() as i:
dag = actor.echo.bind(i)

compiled_dag = dag.experimental_compile()

# Call the visualize method
dot_source = compiled_dag.visualize(return_dot=True)

graphs = pydot.graph_from_dot_data(dot_source)
graph = graphs[0]

node_names = {node.get_name() for node in graph.get_nodes()}
edge_pairs = {
(edge.get_source(), edge.get_destination()) for edge in graph.get_edges()
}

expected_nodes = {"0", "1", "2"}
assert expected_nodes.issubset(
node_names
), f"Expected nodes {expected_nodes} not found."

expected_edges = {("0", "1"), ("1", "2")}
assert expected_edges.issubset(
edge_pairs
), f"Expected edges {expected_edges} not found."

compiled_dag.teardown()

def test_visualize_multi_return(self, ray_start_regular):
"""
Expect output or dot_source:
MultiOutputNode" fillcolor=yellow shape=rectangle style=filled]
0 -> 1 [label=SharedMemoryType]
1 -> 2 [label=SharedMemoryType]
1 -> 3 [label=SharedMemoryType]
2 -> 4 [label=SharedMemoryType]
3 -> 4 [label=SharedMemoryType]
"""

@ray.remote
class Actor:
@ray.method(num_returns=2)
def return_two(self, x):
return x, x + 1

actor = Actor.remote()

with InputNode() as i:
o1, o2 = actor.return_two.bind(i)
dag = MultiOutputNode([o1, o2])

compiled_dag = dag.experimental_compile()

# Get the DOT source
dot_source = compiled_dag.visualize(return_dot=True)

graphs = pydot.graph_from_dot_data(dot_source)
graph = graphs[0]

node_names = {node.get_name() for node in graph.get_nodes()}
edge_pairs = {
(edge.get_source(), edge.get_destination()) for edge in graph.get_edges()
}

expected_nodes = {"0", "1", "2", "3", "4"}
assert expected_nodes.issubset(
node_names
), f"Expected nodes {expected_nodes} not found."

expected_edges = {("0", "1"), ("1", "2"), ("1", "3"), ("2", "4"), ("3", "4")}
assert expected_edges.issubset(
edge_pairs
), f"Expected edges {expected_edges} not found."

compiled_dag.teardown()

def test_visualize_multi_return2(self, ray_start_regular):
"""
Expect output or dot_source:
MultiOutputNode" fillcolor=yellow shape=rectangle style=filled]
0 -> 1 [label=SharedMemoryType]
1 -> 2 [label=SharedMemoryType]
1 -> 3 [label=SharedMemoryType]
2 -> 4 [label=SharedMemoryType]
3 -> 5 [label=SharedMemoryType]
4 -> 6 [label=SharedMemoryType]
5 -> 6 [label=SharedMemoryType]
"""

@ray.remote
class Actor:
@ray.method(num_returns=2)
def return_two(self, x):
return x, x + 1

def echo(self, x):
return x

a = Actor.remote()
b = Actor.remote()
with InputNode() as i:
o1, o2 = a.return_two.bind(i)
o3 = b.echo.bind(o1)
o4 = b.echo.bind(o2)
dag = MultiOutputNode([o3, o4])

compiled_dag = dag.experimental_compile()

# Get the DOT source
dot_source = compiled_dag.visualize(return_dot=True)

graphs = pydot.graph_from_dot_data(dot_source)
graph = graphs[0]

node_names = {node.get_name() for node in graph.get_nodes()}
edge_pairs = {
(edge.get_source(), edge.get_destination()) for edge in graph.get_edges()
}

expected_nodes = {"0", "1", "2", "3", "4", "5", "6"}
assert expected_nodes.issubset(
node_names
), f"Expected nodes {expected_nodes} not found."

expected_edges = {
("0", "1"),
("1", "2"),
("1", "3"),
("2", "4"),
("3", "5"),
("4", "6"),
("5", "6"),
}
assert expected_edges.issubset(
edge_pairs
), f"Expected edges {expected_edges} not found."

compiled_dag.teardown()


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
Expand Down
172 changes: 172 additions & 0 deletions python/ray/dag/tests/experimental/test_dag_visualization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import sys
import ray
import pydot
import os
from ray.dag import InputNode, MultiOutputNode
from ray.tests.conftest import * # noqa

import pytest


def test_visualize_basic(ray_start_regular):
"""
Expect output or dot_source:
MultiOutputNode" fillcolor=yellow shape=rectangle style=filled]
0 -> 1 [label=SharedMemoryType]
1 -> 2 [label=SharedMemoryType]
"""

@ray.remote
class Actor:
def echo(self, x):
return x

actor = Actor.remote()

with InputNode() as i:
dag = actor.echo.bind(i)

compiled_dag = dag.experimental_compile()

# Call the visualize method
dot_source = compiled_dag.visualize(return_dot=True)

graphs = pydot.graph_from_dot_data(dot_source)
graph = graphs[0]

node_names = {node.get_name() for node in graph.get_nodes()}
edge_pairs = {
(edge.get_source(), edge.get_destination()) for edge in graph.get_edges()
}

expected_nodes = {"0", "1", "2"}
assert expected_nodes.issubset(
node_names
), f"Expected nodes {expected_nodes} not found."

expected_edges = {("0", "1"), ("1", "2")}
assert expected_edges.issubset(
edge_pairs
), f"Expected edges {expected_edges} not found."

compiled_dag.teardown()


def test_visualize_multi_return(ray_start_regular):
"""
Expect output or dot_source:
MultiOutputNode" fillcolor=yellow shape=rectangle style=filled]
0 -> 1 [label=SharedMemoryType]
1 -> 2 [label=SharedMemoryType]
1 -> 3 [label=SharedMemoryType]
2 -> 4 [label=SharedMemoryType]
3 -> 4 [label=SharedMemoryType]
"""

@ray.remote
class Actor:
@ray.method(num_returns=2)
def return_two(self, x):
return x, x + 1

actor = Actor.remote()

with InputNode() as i:
o1, o2 = actor.return_two.bind(i)
dag = MultiOutputNode([o1, o2])

compiled_dag = dag.experimental_compile()

# Get the DOT source
dot_source = compiled_dag.visualize(return_dot=True)

graphs = pydot.graph_from_dot_data(dot_source)
graph = graphs[0]

node_names = {node.get_name() for node in graph.get_nodes()}
edge_pairs = {
(edge.get_source(), edge.get_destination()) for edge in graph.get_edges()
}

expected_nodes = {"0", "1", "2", "3", "4"}
assert expected_nodes.issubset(
node_names
), f"Expected nodes {expected_nodes} not found."

expected_edges = {("0", "1"), ("1", "2"), ("1", "3"), ("2", "4"), ("3", "4")}
assert expected_edges.issubset(
edge_pairs
), f"Expected edges {expected_edges} not found."

compiled_dag.teardown()


def test_visualize_multi_return2(ray_start_regular):
"""
Expect output or dot_source:
MultiOutputNode" fillcolor=yellow shape=rectangle style=filled]
0 -> 1 [label=SharedMemoryType]
1 -> 2 [label=SharedMemoryType]
1 -> 3 [label=SharedMemoryType]
2 -> 4 [label=SharedMemoryType]
3 -> 5 [label=SharedMemoryType]
4 -> 6 [label=SharedMemoryType]
5 -> 6 [label=SharedMemoryType]
"""

@ray.remote
class Actor:
@ray.method(num_returns=2)
def return_two(self, x):
return x, x + 1

def echo(self, x):
return x

a = Actor.remote()
b = Actor.remote()
with InputNode() as i:
o1, o2 = a.return_two.bind(i)
o3 = b.echo.bind(o1)
o4 = b.echo.bind(o2)
dag = MultiOutputNode([o3, o4])

compiled_dag = dag.experimental_compile()

# Get the DOT source
dot_source = compiled_dag.visualize(return_dot=True)

graphs = pydot.graph_from_dot_data(dot_source)
graph = graphs[0]

node_names = {node.get_name() for node in graph.get_nodes()}
edge_pairs = {
(edge.get_source(), edge.get_destination()) for edge in graph.get_edges()
}

expected_nodes = {"0", "1", "2", "3", "4", "5", "6"}
assert expected_nodes.issubset(
node_names
), f"Expected nodes {expected_nodes} not found."

expected_edges = {
("0", "1"),
("1", "2"),
("1", "3"),
("2", "4"),
("3", "5"),
("4", "6"),
("5", "6"),
}
assert expected_edges.issubset(
edge_pairs
), f"Expected edges {expected_edges} not found."

compiled_dag.teardown()


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
else:
sys.exit(pytest.main(["-sv", __file__]))
1 change: 1 addition & 0 deletions python/requirements/test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ freezegun==1.1.0
google-api-python-client==2.111.0
google-cloud-storage==2.14.0
gradio==3.50.2; platform_system != "Windows"
graphviz==0.20.3
websockets==11.0.3
joblib==1.2.0
jsonpatch==1.32
Expand Down

0 comments on commit f4a7c70

Please sign in to comment.