Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/executorlib/executor/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class FluxJobExecutor(BaseExecutor):
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

Examples:
Expand Down Expand Up @@ -105,6 +106,7 @@ def __init__(
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
export_workflow_filename: Optional[str] = None,
log_obj_size: bool = False,
):
"""
Expand Down Expand Up @@ -152,6 +154,7 @@ def __init__(
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

"""
Expand Down Expand Up @@ -189,6 +192,7 @@ def __init__(
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
export_workflow_filename=export_workflow_filename,
)
)
else:
Expand Down Expand Up @@ -255,6 +259,7 @@ class FluxClusterExecutor(BaseExecutor):
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

Examples:
Expand Down Expand Up @@ -293,6 +298,7 @@ def __init__(
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
export_workflow_filename: Optional[str] = None,
log_obj_size: bool = False,
):
"""
Expand Down Expand Up @@ -338,6 +344,7 @@ def __init__(
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

"""
Expand Down Expand Up @@ -420,6 +427,7 @@ def __init__(
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
export_workflow_filename=export_workflow_filename,
)
)

Expand Down
8 changes: 8 additions & 0 deletions src/executorlib/executor/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class SingleNodeExecutor(BaseExecutor):
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

Examples:
Expand Down Expand Up @@ -94,6 +95,7 @@ def __init__(
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
export_workflow_filename: Optional[str] = None,
log_obj_size: bool = False,
):
"""
Expand Down Expand Up @@ -138,6 +140,7 @@ def __init__(
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

"""
Expand Down Expand Up @@ -171,6 +174,7 @@ def __init__(
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
export_workflow_filename=export_workflow_filename,
)
)
else:
Expand Down Expand Up @@ -226,6 +230,7 @@ class TestClusterExecutor(BaseExecutor):
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

Examples:
Expand Down Expand Up @@ -262,6 +267,7 @@ def __init__(
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
export_workflow_filename: Optional[str] = None,
log_obj_size: bool = False,
):
"""
Expand Down Expand Up @@ -299,6 +305,7 @@ def __init__(
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

"""
Expand Down Expand Up @@ -358,6 +365,7 @@ def __init__(
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
export_workflow_filename=export_workflow_filename,
)
)

Expand Down
8 changes: 8 additions & 0 deletions src/executorlib/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class SlurmClusterExecutor(BaseExecutor):
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

Examples:
Expand Down Expand Up @@ -101,6 +102,7 @@ def __init__(
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
export_workflow_filename: Optional[str] = None,
log_obj_size: bool = False,
):
"""
Expand Down Expand Up @@ -146,6 +148,7 @@ def __init__(
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

"""
Expand Down Expand Up @@ -225,6 +228,7 @@ def __init__(
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
export_workflow_filename=export_workflow_filename,
)
)

Expand Down Expand Up @@ -275,6 +279,7 @@ class SlurmJobExecutor(BaseExecutor):
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

Examples:
Expand Down Expand Up @@ -312,6 +317,7 @@ def __init__(
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
export_workflow_filename: Optional[str] = None,
log_obj_size: bool = False,
):
"""
Expand Down Expand Up @@ -360,6 +366,7 @@ def __init__(
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

"""
Expand Down Expand Up @@ -394,6 +401,7 @@ def __init__(
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
export_workflow_filename=export_workflow_filename,
)
)
else:
Expand Down
23 changes: 17 additions & 6 deletions src/executorlib/task_scheduler/interactive/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from executorlib.task_scheduler.base import TaskSchedulerBase
from executorlib.task_scheduler.interactive.dependency_plot import (
export_dependency_graph_function,
generate_nodes_and_edges_for_plotting,
generate_task_hash_for_plotting,
plot_dependency_graph_function,
Expand All @@ -28,6 +29,7 @@ class DependencyTaskScheduler(TaskSchedulerBase):
refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.

Attributes:
_future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object.
Expand All @@ -44,6 +46,7 @@ def __init__(
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
export_workflow_filename: Optional[str] = None,
) -> None:
super().__init__(max_cores=max_cores)
self._process_kwargs = {
Expand All @@ -61,7 +64,8 @@ def __init__(
self._future_hash_dict: dict = {}
self._task_hash_dict: dict = {}
self._plot_dependency_graph_filename = plot_dependency_graph_filename
if plot_dependency_graph_filename is None:
self._export_workflow_filename = export_workflow_filename
if plot_dependency_graph_filename is None and export_workflow_filename is None:
self._generate_dependency_graph = plot_dependency_graph
else:
self._generate_dependency_graph = True
Expand Down Expand Up @@ -209,11 +213,18 @@ def __exit__(
v: k for k, v in self._future_hash_dict.items()
},
)
return plot_dependency_graph_function(
node_lst=node_lst,
edge_lst=edge_lst,
filename=self._plot_dependency_graph_filename,
)
if self._export_workflow_filename is not None:
return export_dependency_graph_function(
node_lst=node_lst,
edge_lst=edge_lst,
file_name=self._export_workflow_filename,
)
else:
return plot_dependency_graph_function(
node_lst=node_lst,
edge_lst=edge_lst,
filename=self._plot_dependency_graph_filename,
)
else:
return None

Expand Down
75 changes: 75 additions & 0 deletions src/executorlib/task_scheduler/interactive/dependency_plot.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import inspect
import json
import os.path
from concurrent.futures import Future
from typing import Optional

import cloudpickle
import numpy as np

from executorlib.standalone.select import FutureSelector

Expand Down Expand Up @@ -230,3 +232,76 @@ def plot_dependency_graph_function(
from IPython.display import SVG, display # noqa

display(SVG(nx.nx_agraph.to_agraph(graph).draw(prog="dot", format="svg")))


def export_dependency_graph_function(
    node_lst: list, edge_lst: list, file_name: str = "workflow.json"
):
    """
    Export the dependency graph of nodes and edges as a JSON file.

    The written dictionary has the keys "version", "nodes" and "edges". In addition to the nodes of node_lst a
    final node of type "output" named "result" is appended. When at least one edge exists, the final node is
    connected to the node with the largest edge target id.

    Args:
        node_lst (list): List of node dictionaries with the keys "id", "type" and "value". Every node which is not
                         of type "function" additionally requires the key "name".
        edge_lst (list): List of edge dictionaries with the keys "start" and "end" and either the key "label" or
                         both keys "start_label" and "end_label".
        file_name (str): Name of the file to store the exported graph in.

    Raises:
        KeyError: If a node or edge dictionary lacks one of the required keys.
    """
    pwd_nodes_lst = []
    for n in node_lst:
        if n["type"] == "function":
            pwd_nodes_lst.append(
                {"id": n["id"], "type": n["type"], "value": n["value"]}
            )
        else:
            value = n["value"]
            # numpy arrays and numpy scalars are not JSON serializable, tolist() returns built-in python types
            if isinstance(value, (np.ndarray, np.generic)):
                value = value.tolist()
            pwd_nodes_lst.append(
                {
                    "id": n["id"],
                    "type": n["type"],
                    "value": value,
                    "name": n["name"],
                }
            )

    final_node = {"id": len(pwd_nodes_lst), "type": "output", "name": "result"}
    pwd_nodes_lst.append(final_node)
    pwd_edges_lst = [
        (
            {
                "target": e["end"],
                "targetPort": e["label"],
                "source": e["start"],
                "sourcePort": None,
            }
            if "start_label" not in e
            else {
                "target": e["end"],
                "targetPort": e["end_label"],
                "source": e["start"],
                "sourcePort": e["start_label"],
            }
        )
        for e in edge_lst
    ]
    # A graph without edges has no edge target to connect the result node to and max() raises a ValueError for
    # an empty sequence, so the closing edge is only added when at least one edge exists.
    if pwd_edges_lst:
        pwd_edges_lst.append(
            {
                "target": final_node["id"],
                "targetPort": None,
                "source": max(e["target"] for e in pwd_edges_lst),
                "sourcePort": None,
            }
        )

    pwd_dict = {
        "version": "0.1.0",
        "nodes": pwd_nodes_lst,
        "edges": pwd_edges_lst,
    }
    with open(file_name, "w", encoding="utf-8") as f:
        json.dump(pwd_dict, f, indent=4)
47 changes: 47 additions & 0 deletions tests/test_singlenodeexecutor_pwd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import json
import os
import unittest
import numpy as np
from executorlib import SingleNodeExecutor, get_item_from_future


def get_sum(x, y):
    """Add the two arguments and return the result."""
    total = x + y
    return total

def get_prod_and_div(x, y):
    """Return a dictionary with the product ("prod") and the quotient ("div") of x and y."""
    product = x * y
    quotient = x / y
    return {"prod": product, "div": quotient}

def get_square(x):
    """Raise x to the power of two and return the result."""
    return pow(x, 2)


class TestPythonWorkflowDefinition(unittest.TestCase):
    """Check the JSON file written when SingleNodeExecutor is created with export_workflow_filename."""

    def tearDown(self):
        # Remove the exported file after each test so a stale "workflow.json" cannot satisfy a later test.
        if os.path.exists("workflow.json"):
            os.remove("workflow.json")

    def test_arithmetic(self):
        # Chain three functions: get_prod_and_div -> get_sum (fed by the "prod" and "div" items) -> get_square.
        with SingleNodeExecutor(export_workflow_filename="workflow.json") as exe:
            future_prod_and_div = exe.submit(get_prod_and_div, x=1, y=2)
            future_prod = get_item_from_future(future_prod_and_div, key="prod")
            future_div = get_item_from_future(future_prod_and_div, key="div")
            future_sum = exe.submit(get_sum, x=future_prod, y=future_div)
            future_result = exe.submit(get_square, x=future_sum)
            # The future is expected to hold None instead of a computed value.
            # NOTE(review): presumably the functions are not executed in export mode - confirm.
            self.assertIsNone(future_result.result())

        # The file is read after the executor context has been left.
        with open("workflow.json", "r") as f:
            content = json.load(f)

        # NOTE(review): presumably 3 functions + 2 inputs + 1 output node - confirm against the graph generator.
        self.assertEqual(len(content["nodes"]), 6)
        self.assertEqual(len(content["edges"]), 6)

    def test_numpy_array(self):
        # Submit numpy arrays as inputs; the export has to produce a file json.load() can read.
        with SingleNodeExecutor(export_workflow_filename="workflow.json") as exe:
            future_sum = exe.submit(get_sum, x=np.array([1,2]), y=np.array([3,4]))
            self.assertIsNone(future_sum.result())

        with open("workflow.json", "r") as f:
            content = json.load(f)

        # NOTE(review): presumably 1 function + 2 inputs + 1 output node - confirm against the graph generator.
        self.assertEqual(len(content["nodes"]), 4)
        self.assertEqual(len(content["edges"]), 3)