diff --git a/python/ray/dag/BUILD b/python/ray/dag/BUILD index 5026789febd0..c315c14a522a 100644 --- a/python/ray/dag/BUILD +++ b/python/ray/dag/BUILD @@ -108,6 +108,7 @@ py_test_module_list( "tests/experimental/test_torch_tensor_dag.py", "tests/experimental/test_collective_dag.py", "tests/experimental/test_execution_schedule.py", + "tests/experimental/test_dag_visualization.py", ], tags = [ "accelerated_dag", diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index 5c13505148fa..d6176fd57dc6 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -30,10 +30,6 @@ logger = logging.getLogger(__name__) -try: - import pydot -except Exception: - logging.info("pydot is not installed, visualization tests will be skiped") pytestmark = [ pytest.mark.skipif( @@ -2489,173 +2485,6 @@ async def main(): loop.run_until_complete(main()) -class TestVisualization: - - """Tests for the visualize method of compiled DAGs.""" - - # TODO(zhilong): "pip intsall pydot" - # and "sudo apt-get install graphviz " to run test. - @pytest.fixture(autouse=True) - def skip_if_pydot_graphviz_not_available(self): - # Skip the test if pydot or graphviz is not available - pytest.importorskip("pydot") - pytest.importorskip("graphviz") - - def test_visualize_basic(self, ray_start_regular): - """ - Expect output or dot_source: - MultiOutputNode" fillcolor=yellow shape=rectangle style=filled] - 0 -> 1 [label=SharedMemoryType] - 1 -> 2 [label=SharedMemoryType] - """ - - @ray.remote - class Actor: - def echo(self, x): - return x - - actor = Actor.remote() - - with InputNode() as i: - dag = actor.echo.bind(i) - - compiled_dag = dag.experimental_compile() - - # Call the visualize method - dot_source = compiled_dag.visualize(return_dot=True) - - graphs = pydot.graph_from_dot_data(dot_source) - graph = graphs[0] - - node_names = {node.get_name() for node in graph.get_nodes()} - edge_pairs = { - (edge.get_source(), edge.get_destination()) for edge in graph.get_edges() - } - - expected_nodes = {"0", "1", "2"} - assert expected_nodes.issubset( - node_names - ), f"Expected nodes {expected_nodes} not found." - - expected_edges = {("0", "1"), ("1", "2")} - assert expected_edges.issubset( - edge_pairs - ), f"Expected edges {expected_edges} not found." - - compiled_dag.teardown() - - def test_visualize_multi_return(self, ray_start_regular): - """ - Expect output or dot_source: - MultiOutputNode" fillcolor=yellow shape=rectangle style=filled] - 0 -> 1 [label=SharedMemoryType] - 1 -> 2 [label=SharedMemoryType] - 1 -> 3 [label=SharedMemoryType] - 2 -> 4 [label=SharedMemoryType] - 3 -> 4 [label=SharedMemoryType] - """ - - @ray.remote - class Actor: - @ray.method(num_returns=2) - def return_two(self, x): - return x, x + 1 - - actor = Actor.remote() - - with InputNode() as i: - o1, o2 = actor.return_two.bind(i) - dag = MultiOutputNode([o1, o2]) - - compiled_dag = dag.experimental_compile() - - # Get the DOT source - dot_source = compiled_dag.visualize(return_dot=True) - - graphs = pydot.graph_from_dot_data(dot_source) - graph = graphs[0] - - node_names = {node.get_name() for node in graph.get_nodes()} - edge_pairs = { - (edge.get_source(), edge.get_destination()) for edge in graph.get_edges() - } - - expected_nodes = {"0", "1", "2", "3", "4"} - assert expected_nodes.issubset( - node_names - ), f"Expected nodes {expected_nodes} not found." - - expected_edges = {("0", "1"), ("1", "2"), ("1", "3"), ("2", "4"), ("3", "4")} - assert expected_edges.issubset( - edge_pairs - ), f"Expected edges {expected_edges} not found." - - compiled_dag.teardown() - - def test_visualize_multi_return2(self, ray_start_regular): - """ - Expect output or dot_source: - MultiOutputNode" fillcolor=yellow shape=rectangle style=filled] - 0 -> 1 [label=SharedMemoryType] - 1 -> 2 [label=SharedMemoryType] - 1 -> 3 [label=SharedMemoryType] - 2 -> 4 [label=SharedMemoryType] - 3 -> 5 [label=SharedMemoryType] - 4 -> 6 [label=SharedMemoryType] - 5 -> 6 [label=SharedMemoryType] - """ - - @ray.remote - class Actor: - @ray.method(num_returns=2) - def return_two(self, x): - return x, x + 1 - - def echo(self, x): - return x - - a = Actor.remote() - b = Actor.remote() - with InputNode() as i: - o1, o2 = a.return_two.bind(i) - o3 = b.echo.bind(o1) - o4 = b.echo.bind(o2) - dag = MultiOutputNode([o3, o4]) - - compiled_dag = dag.experimental_compile() - - # Get the DOT source - dot_source = compiled_dag.visualize(return_dot=True) - - graphs = pydot.graph_from_dot_data(dot_source) - graph = graphs[0] - - node_names = {node.get_name() for node in graph.get_nodes()} - edge_pairs = { - (edge.get_source(), edge.get_destination()) for edge in graph.get_edges() - } - - expected_nodes = {"0", "1", "2", "3", "4", "5", "6"} - assert expected_nodes.issubset( - node_names - ), f"Expected nodes {expected_nodes} not found." - - expected_edges = { - ("0", "1"), - ("1", "2"), - ("1", "3"), - ("2", "4"), - ("3", "5"), - ("4", "6"), - ("5", "6"), - } - assert expected_edges.issubset( - edge_pairs - ), f"Expected edges {expected_edges} not found." - - compiled_dag.teardown() - - if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/python/ray/dag/tests/experimental/test_dag_visualization.py b/python/ray/dag/tests/experimental/test_dag_visualization.py new file mode 100644 index 000000000000..dee5096c92b3 --- /dev/null +++ b/python/ray/dag/tests/experimental/test_dag_visualization.py @@ -0,0 +1,172 @@ +import sys +import ray +import pydot +import os +from ray.dag import InputNode, MultiOutputNode +from ray.tests.conftest import * # noqa + +import pytest + + +def test_visualize_basic(ray_start_regular): + """ + Expect output or dot_source: + MultiOutputNode" fillcolor=yellow shape=rectangle style=filled] + 0 -> 1 [label=SharedMemoryType] + 1 -> 2 [label=SharedMemoryType] + """ + + @ray.remote + class Actor: + def echo(self, x): + return x + + actor = Actor.remote() + + with InputNode() as i: + dag = actor.echo.bind(i) + + compiled_dag = dag.experimental_compile() + + # Call the visualize method + dot_source = compiled_dag.visualize(return_dot=True) + + graphs = pydot.graph_from_dot_data(dot_source) + graph = graphs[0] + + node_names = {node.get_name() for node in graph.get_nodes()} + edge_pairs = { + (edge.get_source(), edge.get_destination()) for edge in graph.get_edges() + } + + expected_nodes = {"0", "1", "2"} + assert expected_nodes.issubset( + node_names + ), f"Expected nodes {expected_nodes} not found." + + expected_edges = {("0", "1"), ("1", "2")} + assert expected_edges.issubset( + edge_pairs + ), f"Expected edges {expected_edges} not found." + + compiled_dag.teardown() + + +def test_visualize_multi_return(ray_start_regular): + """ + Expect output or dot_source: + MultiOutputNode" fillcolor=yellow shape=rectangle style=filled] + 0 -> 1 [label=SharedMemoryType] + 1 -> 2 [label=SharedMemoryType] + 1 -> 3 [label=SharedMemoryType] + 2 -> 4 [label=SharedMemoryType] + 3 -> 4 [label=SharedMemoryType] + """ + + @ray.remote + class Actor: + @ray.method(num_returns=2) + def return_two(self, x): + return x, x + 1 + + actor = Actor.remote() + + with InputNode() as i: + o1, o2 = actor.return_two.bind(i) + dag = MultiOutputNode([o1, o2]) + + compiled_dag = dag.experimental_compile() + + # Get the DOT source + dot_source = compiled_dag.visualize(return_dot=True) + + graphs = pydot.graph_from_dot_data(dot_source) + graph = graphs[0] + + node_names = {node.get_name() for node in graph.get_nodes()} + edge_pairs = { + (edge.get_source(), edge.get_destination()) for edge in graph.get_edges() + } + + expected_nodes = {"0", "1", "2", "3", "4"} + assert expected_nodes.issubset( + node_names + ), f"Expected nodes {expected_nodes} not found." + + expected_edges = {("0", "1"), ("1", "2"), ("1", "3"), ("2", "4"), ("3", "4")} + assert expected_edges.issubset( + edge_pairs + ), f"Expected edges {expected_edges} not found." + + compiled_dag.teardown() + + +def test_visualize_multi_return2(ray_start_regular): + """ + Expect output or dot_source: + MultiOutputNode" fillcolor=yellow shape=rectangle style=filled] + 0 -> 1 [label=SharedMemoryType] + 1 -> 2 [label=SharedMemoryType] + 1 -> 3 [label=SharedMemoryType] + 2 -> 4 [label=SharedMemoryType] + 3 -> 5 [label=SharedMemoryType] + 4 -> 6 [label=SharedMemoryType] + 5 -> 6 [label=SharedMemoryType] + """ + + @ray.remote + class Actor: + @ray.method(num_returns=2) + def return_two(self, x): + return x, x + 1 + + def echo(self, x): + return x + + a = Actor.remote() + b = Actor.remote() + with InputNode() as i: + o1, o2 = a.return_two.bind(i) + o3 = b.echo.bind(o1) + o4 = b.echo.bind(o2) + dag = MultiOutputNode([o3, o4]) + + compiled_dag = dag.experimental_compile() + + # Get the DOT source + dot_source = compiled_dag.visualize(return_dot=True) + + graphs = pydot.graph_from_dot_data(dot_source) + graph = graphs[0] + + node_names = {node.get_name() for node in graph.get_nodes()} + edge_pairs = { + (edge.get_source(), edge.get_destination()) for edge in graph.get_edges() + } + + expected_nodes = {"0", "1", "2", "3", "4", "5", "6"} + assert expected_nodes.issubset( + node_names + ), f"Expected nodes {expected_nodes} not found." + + expected_edges = { + ("0", "1"), + ("1", "2"), + ("1", "3"), + ("2", "4"), + ("3", "5"), + ("4", "6"), + ("5", "6"), + } + assert expected_edges.issubset( + edge_pairs + ), f"Expected edges {expected_edges} not found." + + compiled_dag.teardown() + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/requirements/test-requirements.txt b/python/requirements/test-requirements.txt index 8b29a6215ffe..b73f554ec524 100644 --- a/python/requirements/test-requirements.txt +++ b/python/requirements/test-requirements.txt @@ -25,6 +25,7 @@ freezegun==1.1.0 google-api-python-client==2.111.0 google-cloud-storage==2.14.0 gradio==3.50.2; platform_system != "Windows" +graphviz==0.20.3 websockets==11.0.3 joblib==1.2.0 jsonpatch==1.32