Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
16a76ff
Implement splitting of future objects
jan-janssen Sep 10, 2025
806330c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 10, 2025
bd0aac7
Update split.py
jan-janssen Sep 10, 2025
68f836f
Update split.py
jan-janssen Sep 10, 2025
c7c3020
fix tests
jan-janssen Sep 10, 2025
b0d783e
extend tests
jan-janssen Sep 10, 2025
f897576
cover all lines
jan-janssen Sep 10, 2025
517a818
Import from API
jan-janssen Sep 10, 2025
7eee927
fixes
jan-janssen Sep 10, 2025
c99499d
Add support for dictionaries
jan-janssen Sep 10, 2025
e248cef
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 10, 2025
90bf573
use dict function
jan-janssen Sep 10, 2025
be142a4
move split to main
jan-janssen Sep 10, 2025
1f0b3c4
Add docstrings
jan-janssen Sep 10, 2025
347fbbb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 10, 2025
7a3b20d
Test response is None
jan-janssen Sep 10, 2025
c63acdb
Extend plotting function to support splitfuture objects
jan-janssen Sep 10, 2025
9858b70
Rename splitfuture to futureselector
jan-janssen Sep 10, 2025
afa5e73
Merge remote-tracking branch 'origin/split' into plot_split
jan-janssen Sep 10, 2025
f513ede
sync with latest changes
jan-janssen Sep 10, 2025
1efb966
Rename module to select rather than split
jan-janssen Sep 10, 2025
7513ad9
Merge remote-tracking branch 'origin/split' into plot_split
jan-janssen Sep 10, 2025
596731a
rename module
jan-janssen Sep 10, 2025
b6b0a31
fix plotting
jan-janssen Sep 10, 2025
4911356
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 10, 2025
bdf0dd5
Merge remote-tracking branch 'origin/main' into plot_split
jan-janssen Sep 10, 2025
e97a50e
Merge remote-tracking branch 'origin/main' into split
jan-janssen Sep 10, 2025
99316ee
Merge remote-tracking branch 'origin/split' into plot_split
jan-janssen Sep 10, 2025
11c3417
Add tests
jan-janssen Sep 10, 2025
54d46c8
skip test
jan-janssen Sep 10, 2025
5788382
remove redundant lines
jan-janssen Sep 10, 2025
497bc16
move plot module
jan-janssen Sep 10, 2025
4a41b75
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions executorlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
SlurmClusterExecutor,
SlurmJobExecutor,
)
from executorlib.standalone.select import get_item_from_future, split_future


def get_cache_data(cache_directory: str) -> list[dict]:
Expand Down Expand Up @@ -66,6 +67,8 @@ def terminate_tasks_in_cache(

__all__: list[str] = [
"get_cache_data",
"get_item_from_future",
"split_future",
"terminate_tasks_in_cache",
"BaseExecutor",
"FluxJobExecutor",
Expand Down
72 changes: 72 additions & 0 deletions executorlib/standalone/select.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from concurrent.futures import Future
from typing import Any, Optional


class FutureSelector(Future):
def __init__(self, future: Future, selector: int | str):
super().__init__()
self._future = future
self._selector = selector

def cancel(self) -> bool:
return self._future.cancel()

def cancelled(self) -> bool:
return self._future.cancelled()

def running(self) -> bool:
return self._future.running()

def done(self) -> bool:
return self._future.done()

def add_done_callback(self, fn) -> None:
return self._future.add_done_callback(fn=fn)

def result(self, timeout: Optional[float] = None) -> Any:
result = self._future.result(timeout=timeout)
if result is not None:
return result[self._selector]
else:
return None

def exception(self, timeout: Optional[float] = None) -> Optional[BaseException]:
return self._future.exception(timeout=timeout)

def set_running_or_notify_cancel(self) -> bool:
return self._future.set_running_or_notify_cancel()

def set_result(self, result: Any) -> None:
return self._future.set_result(result=result)

def set_exception(self, exception: Optional[BaseException]) -> None:
return self._future.set_exception(exception=exception)


def split_future(future: Future, n: int) -> list[FutureSelector]:
"""
Split a concurrent.futures.Future object which returns a tuple or list as result into individual future objects

Args:
future (Future): future object which returns a tuple or list as result
n: number of elements expected in the future object

Returns:
list: List of future objects
"""
return [FutureSelector(future=future, selector=i) for i in range(n)]


def get_item_from_future(future: Future, key: str) -> FutureSelector:
"""
Get item from concurrent.futures.Future object which returns a dictionary as result by the corresponding dictionary
key.

Args:
future (Future): future object which returns a dictionary as result
key (str): dictionary key to get item from dictionary

Returns:
FutureSelector: Future object which returns the value corresponding to the key
"""
return FutureSelector(future=future, selector=key)
4 changes: 2 additions & 2 deletions executorlib/task_scheduler/interactive/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
get_future_objects_from_input,
update_futures_in_input,
)
from executorlib.standalone.plot import (
from executorlib.task_scheduler.base import TaskSchedulerBase
from executorlib.task_scheduler.interactive.dependency_plot import (
generate_nodes_and_edges_for_plotting,
generate_task_hash_for_plotting,
plot_dependency_graph_function,
)
from executorlib.task_scheduler.base import TaskSchedulerBase


class DependencyTaskScheduler(TaskSchedulerBase):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import cloudpickle

from executorlib.standalone.select import FutureSelector


def generate_nodes_and_edges_for_plotting(
task_hash_dict: dict, future_hash_inverse_dict: dict
Expand Down Expand Up @@ -31,7 +33,15 @@ def add_element(arg, link_to, label=""):
link_to: ID of the node to link the element to.
label (str, optional): Label for the edge. Defaults to "".
"""
if isinstance(arg, Future):
if isinstance(arg, FutureSelector):
edge_lst.append(
{
"start": hash_id_dict[future_hash_inverse_dict[arg._future]],
"end": link_to,
"label": label + str(arg._selector),
}
)
elif isinstance(arg, Future):
edge_lst.append(
{
"start": hash_id_dict[future_hash_inverse_dict[arg]],
Expand Down Expand Up @@ -104,7 +114,24 @@ def convert_arg(arg, future_hash_inverse_dict):
Returns:
The hash representation of the argument.
"""
if isinstance(arg, Future):
if isinstance(arg, FutureSelector):
if arg not in future_hash_inverse_dict:
obj_dict = {
"args": (),
"kwargs": {
"future": future_hash_inverse_dict[arg._future],
"selector": arg._selector,
},
}
if isinstance(arg._selector, str):
obj_dict["fn"] = "get_item_from_future"
else:
obj_dict["fn"] = "split_future"
arg_hash = cloudpickle.dumps(obj_dict)
future_hash_dict[arg_hash] = arg
future_hash_inverse_dict[arg] = arg_hash
return future_hash_inverse_dict[arg]
elif isinstance(arg, Future):
return future_hash_inverse_dict[arg]
elif isinstance(arg, list):
return [
Expand Down
2 changes: 1 addition & 1 deletion tests/test_fluxjobexecutor_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from time import sleep

from executorlib import FluxJobExecutor, FluxClusterExecutor
from executorlib.standalone.plot import generate_nodes_and_edges_for_plotting
from executorlib.task_scheduler.interactive.dependency_plot import generate_nodes_and_edges_for_plotting
from executorlib.standalone.serialize import cloudpickle_register


Expand Down
40 changes: 39 additions & 1 deletion tests/test_singlenodeexecutor_plot_dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
SingleNodeExecutor,
SlurmJobExecutor,
SlurmClusterExecutor,
split_future,
get_item_from_future,
)
from executorlib.standalone.plot import generate_nodes_and_edges_for_plotting
from executorlib.task_scheduler.interactive.dependency_plot import generate_nodes_and_edges_for_plotting
from executorlib.standalone.serialize import cloudpickle_register


Expand Down Expand Up @@ -43,6 +45,17 @@ def return_input_dict(input_dict):
return input_dict


def get_tuple(i):
return 1, 2, i


def get_dict(i):
return {"a": 1, "b": i}


def echo(i):
return i

@unittest.skipIf(
skip_graphviz_test,
"graphviz is not installed, so the plot_dependency_graph tests are skipped.",
Expand Down Expand Up @@ -289,3 +302,28 @@ def test_many_to_one_plot(self):
)
self.assertEqual(len(nodes), 19)
self.assertEqual(len(edges), 22)


@unittest.skipIf(
skip_graphviz_test,
"graphviz is not installed, so the plot_dependency_graph tests are skipped.",
)
class TestSelectExecutorPlot(unittest.TestCase):
def test_split_future(self):
with SingleNodeExecutor(plot_dependency_graph=True) as exe:
f = exe.submit(get_tuple, 5)
f1, f2, f3 = split_future(future=f, n=3)
f12 = exe.submit(echo, f1)
f22 = exe.submit(echo, f2)
f32 = exe.submit(echo, f3)
self.assertIsNone(f12.result())
self.assertIsNone(f22.result())
self.assertIsNone(f32.result())

def test_get_item_from_future(self):
with SingleNodeExecutor(plot_dependency_graph=True) as exe:
f = exe.submit(get_dict, 5)
f1 = exe.submit(echo, get_item_from_future(future=f, key="a"))
f2 = exe.submit(echo, get_item_from_future(future=f, key="b"))
self.assertIsNone(f1.result())
self.assertIsNone(f2.result())
82 changes: 82 additions & 0 deletions tests/test_standalone_select.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import unittest
from concurrent.futures import Future
from executorlib import SingleNodeExecutor, split_future, get_item_from_future
from executorlib.api import cloudpickle_register
from executorlib.standalone.select import FutureSelector


def function_returns_tuple(i):
return "a", "b", i


def function_returns_dict(i):
return {"a": 1, "b": 2, "c": i}


def function_with_exception(i):
raise RuntimeError()


def callback(future):
print("callback:", future.result())


class TestSplitFuture(unittest.TestCase):
def test_integration_return_tuple(self):
with SingleNodeExecutor() as exe:
cloudpickle_register(ind=1)
future = exe.submit(function_returns_tuple, 15)
f1, f2, f3 = split_future(future=future, n=3)
self.assertEqual(f1.result(), "a")
self.assertEqual(f2.result(), "b")
self.assertEqual(f3.result(), 15)
self.assertTrue(f1.done())
self.assertTrue(f2.done())
self.assertTrue(f3.done())

def test_integration_return_dict(self):
with SingleNodeExecutor() as exe:
cloudpickle_register(ind=1)
future = exe.submit(function_returns_dict, 15)
f1 = get_item_from_future(future=future, key="a")
f2 = get_item_from_future(future=future, key="b")
f3 = get_item_from_future(future=future, key="c")
self.assertEqual(f1.result(), 1)
self.assertEqual(f2.result(), 2)
self.assertEqual(f3.result(), 15)
self.assertTrue(f1.done())
self.assertTrue(f2.done())
self.assertTrue(f3.done())

def test_integration_exception(self):
with SingleNodeExecutor() as exe:
cloudpickle_register(ind=1)
future = exe.submit(function_with_exception, 15)
f1, f2, f3 = split_future(future=future, n=3)
with self.assertRaises(RuntimeError):
f3.result()

def test_split_future_object(self):
f1 = Future()
fs1 = FutureSelector(future=f1, selector=1)
fs1.add_done_callback(callback)
fs1.set_running_or_notify_cancel()
self.assertTrue(fs1.running())
fs1.set_result([1, 2])
self.assertEqual(fs1.result(), 2)
f2 = Future()
fs2 = FutureSelector(future=f2, selector=1)
fs2.cancel()
self.assertTrue(fs2.cancelled())
f3 = Future()
fs3 = FutureSelector(future=f3, selector=1)
fs3.set_running_or_notify_cancel()
self.assertTrue(fs3.running())
fs3.set_result(None)
self.assertEqual(fs3.result(), None)
f4 = Future()
fs4 = FutureSelector(future=f4, selector=1)
fs4.set_exception(RuntimeError())
self.assertEqual(type(fs4.exception()), RuntimeError)
with self.assertRaises(RuntimeError):
fs4.result()
2 changes: 1 addition & 1 deletion tests/test_testclusterexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from executorlib import get_cache_data
from executorlib.api import TestClusterExecutor
from executorlib.standalone.plot import generate_nodes_and_edges_for_plotting
from executorlib.task_scheduler.interactive.dependency_plot import generate_nodes_and_edges_for_plotting
from executorlib.standalone.serialize import cloudpickle_register

try:
Expand Down
Loading