From 16a76ffd085a30065d6befbc84097583b1656b16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 10 Sep 2025 07:50:57 +0200 Subject: [PATCH 01/28] Implement splitting of future objects --- executorlib/standalone/split.py | 43 +++++++++++++++++++++++++++++++++ tests/test_standalone_split.py | 31 ++++++++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 executorlib/standalone/split.py create mode 100644 tests/test_standalone_split.py diff --git a/executorlib/standalone/split.py b/executorlib/standalone/split.py new file mode 100644 index 00000000..9e34d057 --- /dev/null +++ b/executorlib/standalone/split.py @@ -0,0 +1,43 @@ +from concurrent.futures import Future +from typing import Any, Optional + + +class SplitFuture(Future): + def __init__(self, future: Future, selector: int): + super().__init__() + self._future = future + self._selector = selector + + def cancel(self) -> bool: + return self._future.cancel() + + def cancelled(self) -> bool: + return self._future.cancelled() + + def running(self) -> bool: + return self._future.running() + + def done(self) -> bool: + return self._future.done() + + def add_done_callback(self, fn) -> None: + return self._future.add_done_callback(fn=fn) + + def result(self, timeout: Optional[int]=None) -> Any: + return self._future.result(timeout=timeout)[self._selector] + + def exception(self, timeout: Optional[int]=None) -> BaseException: + return self._future.exception(timeout=timeout) + + def set_running_or_notify_cancel(self) -> bool: + return self._future.set_running_or_notify_cancel() + + def set_result(self, result) -> None: + return self._future.set_result(result=result) + + def set_exception(self, exception) -> None: + return self._future.set_exception(exception=exception) + + +def split(future: Future, n: int): + return [SplitFuture(future=future, selector=i) for i in range(n)] diff --git a/tests/test_standalone_split.py b/tests/test_standalone_split.py new file mode 100644 index 00000000..bee75a0c --- /dev/null +++ b/tests/test_standalone_split.py @@ -0,0 +1,31 @@ +import unittest +from executorlib import SingleNodeExecutor +from executorlib.standalone.split import split + + +def function_with_multiple_outputs(i): + return "a", "b", i + + +def function_with_exception(i): + raise RuntimeError() + + +class TestSplitFuture(unittest.TestCase): + def test_integration_base(self): + with SingleNodeExecutor() as exe: + future = exe.submit(function_with_multiple_outputs, 15) + f1, f2, f3 = split(future=future, n=3) + self.assertEqual(f1.result(), "a") + self.assertEqual(f2.result(), "b") + self.assertEqual(f3.result(), 15) + self.assertTrue(f1.done()) + self.assertTrue(f2.done()) + self.assertTrue(f3.done()) + + def test_integration_exception(self): + with SingleNodeExecutor() as exe: + future = exe.submit(function_with_exception, 15) + f1, f2, f3 = split(future=future, n=3) + with self.assertRaises(RuntimeError): + f3.result() \ No newline at end of file From 806330c25b538c64bfee88bb01523160f09f8409 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 10 Sep 2025 05:51:29 +0000 Subject: [PATCH 02/28] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/standalone/split.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executorlib/standalone/split.py b/executorlib/standalone/split.py index 9e34d057..623a05bd 100644 --- a/executorlib/standalone/split.py +++ b/executorlib/standalone/split.py @@ -23,10 +23,10 @@ def done(self) -> bool: def add_done_callback(self, fn) -> None: return self._future.add_done_callback(fn=fn) - def result(self, timeout: Optional[int]=None) -> Any: + def result(self, timeout: Optional[int] = None) -> Any: return self._future.result(timeout=timeout)[self._selector] - def exception(self, timeout: Optional[int]=None) -> BaseException: + def exception(self, timeout: Optional[int] = None) -> BaseException: return self._future.exception(timeout=timeout) def set_running_or_notify_cancel(self) -> bool: From bd0aac70ebe7c690350017a23e8a13514fb95382 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 10 Sep 2025 07:53:30 +0200 Subject: [PATCH 03/28] Update split.py --- executorlib/standalone/split.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/executorlib/standalone/split.py b/executorlib/standalone/split.py index 623a05bd..936ecb1b 100644 --- a/executorlib/standalone/split.py +++ b/executorlib/standalone/split.py @@ -23,19 +23,19 @@ def done(self) -> bool: def add_done_callback(self, fn) -> None: return self._future.add_done_callback(fn=fn) - def result(self, timeout: Optional[int] = None) -> Any: + def result(self, timeout: Optional[float] = None) -> Any: return self._future.result(timeout=timeout)[self._selector] - def exception(self, timeout: Optional[int] = None) -> BaseException: + def exception(self, timeout: Optional[float] = None) -> BaseException: return self._future.exception(timeout=timeout) def set_running_or_notify_cancel(self) -> bool: return self._future.set_running_or_notify_cancel() - def set_result(self, result) -> None: + def set_result(self, result: Any) -> None: return self._future.set_result(result=result) - def set_exception(self, exception) -> None: + def set_exception(self, exception: BaseException) -> None: return self._future.set_exception(exception=exception) From 68f836f9901c27fb36c00c89572a998f4eb00560 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 10 Sep 2025 07:57:42 +0200 Subject: [PATCH 04/28] Update split.py --- executorlib/standalone/split.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executorlib/standalone/split.py b/executorlib/standalone/split.py index 936ecb1b..2241b81e 100644 --- a/executorlib/standalone/split.py +++ b/executorlib/standalone/split.py @@ -26,7 +26,7 @@ def add_done_callback(self, fn) -> None: def result(self, timeout: Optional[float] = None) -> Any: return self._future.result(timeout=timeout)[self._selector] - def exception(self, timeout: Optional[float] = None) -> BaseException: + def exception(self, timeout: Optional[float] = None) -> Optional[BaseException]: return self._future.exception(timeout=timeout) def set_running_or_notify_cancel(self) -> bool: @@ -35,7 +35,7 @@ def set_running_or_notify_cancel(self) -> bool: def set_result(self, result: Any) -> None: return self._future.set_result(result=result) - def set_exception(self, exception: BaseException) -> None: + def set_exception(self, exception: Optional[BaseException]) -> None: return self._future.set_exception(exception=exception) From c7c30204436afe285c15ee055fa4646e05d01425 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 10 Sep 2025 08:02:28 +0200 Subject: [PATCH 05/28] fix tests --- tests/test_standalone_split.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_standalone_split.py b/tests/test_standalone_split.py index bee75a0c..91ae1487 100644 --- a/tests/test_standalone_split.py +++ b/tests/test_standalone_split.py @@ -1,5 +1,6 @@ import unittest from executorlib import SingleNodeExecutor +from executorlib.api import cloudpickle_register from executorlib.standalone.split import split @@ -14,6 +15,7 @@ def function_with_exception(i): class TestSplitFuture(unittest.TestCase): def test_integration_base(self): with SingleNodeExecutor() as exe: + cloudpickle_register(ind=1) future = exe.submit(function_with_multiple_outputs, 15) f1, f2, f3 = split(future=future, n=3) self.assertEqual(f1.result(), "a") @@ -25,6 +27,7 @@ def test_integration_base(self): def test_integration_exception(self): with SingleNodeExecutor() as exe: + cloudpickle_register(ind=1) future = exe.submit(function_with_exception, 15) f1, f2, f3 = split(future=future, n=3) with self.assertRaises(RuntimeError): From b0d783eaaa1f010562f9921e0e6a83501f5ca6e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 10 Sep 2025 08:15:02 +0200 Subject: [PATCH 06/28] extend tests --- tests/test_standalone_split.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/tests/test_standalone_split.py b/tests/test_standalone_split.py index 91ae1487..0afe1761 100644 --- a/tests/test_standalone_split.py +++ b/tests/test_standalone_split.py @@ -1,7 +1,8 @@ import unittest +from concurrent.futures import Future from executorlib import SingleNodeExecutor from executorlib.api import cloudpickle_register -from executorlib.standalone.split import split +from executorlib.standalone.split import split, SplitFuture def function_with_multiple_outputs(i): @@ -31,4 +32,21 @@ def test_integration_exception(self): future = exe.submit(function_with_exception, 15) f1, f2, f3 = split(future=future, n=3) with self.assertRaises(RuntimeError): - f3.result() \ No newline at end of file + f3.result() + + def test_split_future_object(self): + f1 = Future() + fs1 = SplitFuture(future=f1, selector=1) + fs1.set_running_or_notify_cancel() + self.assertTrue(fs1.running()) + fs1.set_result([1, 2]) + self.assertEqual(fs1.result(), 2) + f2 = Future() + fs2 = SplitFuture(future=f2, selector=1) + fs2.cancel() + self.assertTrue(fs2.cancelled()) + f3 = Future() + fs3 = SplitFuture(future=f3, selector=1) + fs3.set_exception(RuntimeError()) + with self.assertRaises(RuntimeError): + fs3.result() From f89757617c107d697c6678664029cc199ce9afcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 10 Sep 2025 08:25:17 +0200 Subject: [PATCH 07/28] cover all lines --- tests/test_standalone_split.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_standalone_split.py b/tests/test_standalone_split.py index 0afe1761..4a1711d1 100644 --- a/tests/test_standalone_split.py +++ b/tests/test_standalone_split.py @@ -13,6 +13,10 @@ def function_with_exception(i): raise RuntimeError() +def callback(future): + print("callback:", future.result()) + + class TestSplitFuture(unittest.TestCase): def test_integration_base(self): with SingleNodeExecutor() as exe: @@ -37,6 +41,7 @@ def test_integration_exception(self): def test_split_future_object(self): f1 = Future() fs1 = SplitFuture(future=f1, selector=1) + fs1.add_done_callback(callback) fs1.set_running_or_notify_cancel() self.assertTrue(fs1.running()) fs1.set_result([1, 2]) @@ -48,5 +53,6 @@ def test_split_future_object(self): f3 = Future() fs3 = SplitFuture(future=f3, selector=1) fs3.set_exception(RuntimeError()) + self.assertEqual(type(fs3.exception()), RuntimeError) with self.assertRaises(RuntimeError): fs3.result() From 517a8187a0ae36b6028115cf64c7ee6a771e700e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 10 Sep 2025 08:26:22 +0200 Subject: [PATCH 08/28] Import from API --- executorlib/api.py | 2 ++ tests/test_standalone_split.py | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/executorlib/api.py b/executorlib/api.py index 9e9b0941..800e8e64 100644 --- a/executorlib/api.py +++ b/executorlib/api.py @@ -18,6 +18,7 @@ from executorlib.standalone.interactive.spawner import MpiExecSpawner, SubprocessSpawner from executorlib.standalone.queue import cancel_items_in_queue from executorlib.standalone.serialize import cloudpickle_register +from executorlib.standalone.split import split __all__: list[str] = [ "TestClusterExecutor", @@ -29,6 +30,7 @@ "interface_receive", "interface_send", "interface_shutdown", + "split", "MpiExecSpawner", "SocketInterface", "SubprocessSpawner", diff --git a/tests/test_standalone_split.py b/tests/test_standalone_split.py index 4a1711d1..9ee289f1 100644 --- a/tests/test_standalone_split.py +++ b/tests/test_standalone_split.py @@ -1,8 +1,8 @@ import unittest from concurrent.futures import Future from executorlib import SingleNodeExecutor -from executorlib.api import cloudpickle_register -from executorlib.standalone.split import split, SplitFuture +from executorlib.api import cloudpickle_register, split +from executorlib.standalone.split import SplitFuture def function_with_multiple_outputs(i): From 7eee92728669fc938ca8413d247bdc8e8b1f787e Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 10 Sep 2025 09:01:00 +0200 Subject: [PATCH 09/28] fixes --- executorlib/standalone/split.py | 2 +- tests/test_standalone_split.py | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/executorlib/standalone/split.py b/executorlib/standalone/split.py index 2241b81e..dcfa648f 100644 --- a/executorlib/standalone/split.py +++ b/executorlib/standalone/split.py @@ -3,7 +3,7 @@ class SplitFuture(Future): - def __init__(self, future: Future, selector: int): + def __init__(self, future: Future, selector: int | str): super().__init__() self._future = future self._selector = selector diff --git a/tests/test_standalone_split.py b/tests/test_standalone_split.py index 9ee289f1..aa42caf1 100644 --- a/tests/test_standalone_split.py +++ b/tests/test_standalone_split.py @@ -5,10 +5,14 @@ from executorlib.standalone.split import SplitFuture -def function_with_multiple_outputs(i): +def function_returns_tuple(i): return "a", "b", i +def function_returns_dict(i): + return {"a": 1, "b": 2, "c": i} + + def function_with_exception(i): raise RuntimeError() @@ -18,10 +22,10 @@ def callback(future): class TestSplitFuture(unittest.TestCase): - def test_integration_base(self): + def test_integration_return_tuple(self): with SingleNodeExecutor() as exe: cloudpickle_register(ind=1) - future = exe.submit(function_with_multiple_outputs, 15) + future = exe.submit(function_returns_tuple, 15) f1, f2, f3 = split(future=future, n=3) self.assertEqual(f1.result(), "a") self.assertEqual(f2.result(), "b") From c99499dc36b66ed200682fec710aea3b48d4f76e Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 10 Sep 2025 10:09:36 +0200 Subject: [PATCH 10/28] Add support for dictionaries --- executorlib/api.py | 5 +++-- executorlib/standalone/split.py | 6 +++++- tests/test_standalone_split.py | 20 +++++++++++++++++--- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/executorlib/api.py b/executorlib/api.py index 800e8e64..d2f37aca 100644 --- a/executorlib/api.py +++ b/executorlib/api.py @@ -18,19 +18,20 @@ from executorlib.standalone.interactive.spawner import MpiExecSpawner, SubprocessSpawner from executorlib.standalone.queue import cancel_items_in_queue from executorlib.standalone.serialize import cloudpickle_register -from executorlib.standalone.split import split +from executorlib.standalone.split import split_tuple, get_item_from_future __all__: list[str] = [ "TestClusterExecutor", "cancel_items_in_queue", "cloudpickle_register", "get_command_path", + "get_item_from_future", "interface_bootup", "interface_connect", "interface_receive", "interface_send", "interface_shutdown", - "split", + "split_tuple", "MpiExecSpawner", "SocketInterface", "SubprocessSpawner", diff --git a/executorlib/standalone/split.py b/executorlib/standalone/split.py index dcfa648f..6d31c60f 100644 --- a/executorlib/standalone/split.py +++ b/executorlib/standalone/split.py @@ -39,5 +39,9 @@ def set_exception(self, exception: Optional[BaseException]) -> None: return self._future.set_exception(exception=exception) -def split(future: Future, n: int): +def split_tuple(future: Future, n: int) -> list[SplitFuture]: return [SplitFuture(future=future, selector=i) for i in range(n)] + + +def get_item_from_future(future: Future, key: str) -> SplitFuture: + return SplitFuture(future=future, selector=key) diff --git a/tests/test_standalone_split.py b/tests/test_standalone_split.py index aa42caf1..b7a982a6 100644 --- a/tests/test_standalone_split.py +++ b/tests/test_standalone_split.py @@ -1,7 +1,7 @@ import unittest from concurrent.futures import Future from executorlib import SingleNodeExecutor -from executorlib.api import cloudpickle_register, split +from executorlib.api import cloudpickle_register, split_tuple, get_item_from_future from executorlib.standalone.split import SplitFuture @@ -26,7 +26,7 @@ def test_integration_return_tuple(self): with SingleNodeExecutor() as exe: cloudpickle_register(ind=1) future = exe.submit(function_returns_tuple, 15) - f1, f2, f3 = split(future=future, n=3) + f1, f2, f3 = split_tuple(future=future, n=3) self.assertEqual(f1.result(), "a") self.assertEqual(f2.result(), "b") self.assertEqual(f3.result(), 15) @@ -34,11 +34,25 @@ def test_integration_return_tuple(self): self.assertTrue(f2.done()) self.assertTrue(f3.done()) + def test_integration_return_dict(self): + with SingleNodeExecutor() as exe: + cloudpickle_register(ind=1) + future = exe.submit(function_returns_tuple, 15) + f1 = get_item_from_future(future=future, key="a") + f2 = get_item_from_future(future=future, key="b") + f3 = get_item_from_future(future=future, key="c") + self.assertEqual(f1.result(), 1) + self.assertEqual(f2.result(), 2) + self.assertEqual(f3.result(), 15) + self.assertTrue(f1.done()) + self.assertTrue(f2.done()) + self.assertTrue(f3.done()) + def test_integration_exception(self): with SingleNodeExecutor() as exe: cloudpickle_register(ind=1) future = exe.submit(function_with_exception, 15) - f1, f2, f3 = split(future=future, n=3) + f1, f2, f3 = split_tuple(future=future, n=3) with self.assertRaises(RuntimeError): f3.result() From e248cef49c9d02c8f021dc965cedabe7f1bb8a85 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 10 Sep 2025 08:09:44 +0000 Subject: [PATCH 11/28] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/api.py b/executorlib/api.py index d2f37aca..e8d6a0f9 100644 --- a/executorlib/api.py +++ b/executorlib/api.py @@ -18,7 +18,7 @@ from executorlib.standalone.interactive.spawner import MpiExecSpawner, SubprocessSpawner from executorlib.standalone.queue import cancel_items_in_queue from executorlib.standalone.serialize import cloudpickle_register -from executorlib.standalone.split import split_tuple, get_item_from_future +from executorlib.standalone.split import get_item_from_future, split_tuple __all__: list[str] = [ "TestClusterExecutor", From 90bf573f6c16bf0ce47d36a51e8419e97de3581a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 10 Sep 2025 10:14:50 +0200 Subject: [PATCH 12/28] use dict function --- tests/test_standalone_split.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_standalone_split.py b/tests/test_standalone_split.py index b7a982a6..3935abd7 100644 --- a/tests/test_standalone_split.py +++ b/tests/test_standalone_split.py @@ -37,7 +37,7 @@ def test_integration_return_tuple(self): def test_integration_return_dict(self): with SingleNodeExecutor() as exe: cloudpickle_register(ind=1) - future = exe.submit(function_returns_tuple, 15) + future = exe.submit(function_returns_dict, 15) f1 = get_item_from_future(future=future, key="a") f2 = get_item_from_future(future=future, key="b") f3 = get_item_from_future(future=future, key="c") From be142a4ae409d2f3609af926e97418c58b66ef26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 10 Sep 2025 12:01:25 +0200 Subject: [PATCH 13/28] move split to main --- executorlib/__init__.py | 3 +++ executorlib/api.py | 3 --- executorlib/standalone/split.py | 2 +- tests/test_standalone_split.py | 8 ++++---- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index b04f12d7..60761d21 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -25,6 +25,7 @@ SlurmClusterExecutor, SlurmJobExecutor, ) +from executorlib.standalone.split import split_future, get_item_from_future def get_cache_data(cache_directory: str) -> list[dict]: @@ -66,6 +67,8 @@ def terminate_tasks_in_cache( __all__: list[str] = [ "get_cache_data", + "get_item_from_future", + "split_future", "terminate_tasks_in_cache", "BaseExecutor", "FluxJobExecutor", diff --git a/executorlib/api.py b/executorlib/api.py index e8d6a0f9..9e9b0941 100644 --- a/executorlib/api.py +++ b/executorlib/api.py @@ -18,20 +18,17 @@ from executorlib.standalone.interactive.spawner import MpiExecSpawner, SubprocessSpawner from executorlib.standalone.queue import cancel_items_in_queue from executorlib.standalone.serialize import cloudpickle_register -from executorlib.standalone.split import get_item_from_future, split_tuple __all__: list[str] = [ "TestClusterExecutor", "cancel_items_in_queue", "cloudpickle_register", "get_command_path", - "get_item_from_future", "interface_bootup", "interface_connect", "interface_receive", "interface_send", "interface_shutdown", - "split_tuple", "MpiExecSpawner", "SocketInterface", "SubprocessSpawner", diff --git a/executorlib/standalone/split.py b/executorlib/standalone/split.py index 6d31c60f..4477927e 100644 --- a/executorlib/standalone/split.py +++ b/executorlib/standalone/split.py @@ -39,7 +39,7 @@ def set_exception(self, exception: Optional[BaseException]) -> None: return self._future.set_exception(exception=exception) -def split_tuple(future: Future, n: int) -> list[SplitFuture]: +def split_future(future: Future, n: int) -> list[SplitFuture]: return [SplitFuture(future=future, selector=i) for i in range(n)] diff --git a/tests/test_standalone_split.py b/tests/test_standalone_split.py index 3935abd7..772c30f4 100644 --- a/tests/test_standalone_split.py +++ b/tests/test_standalone_split.py @@ -1,7 +1,7 @@ import unittest from concurrent.futures import Future -from executorlib import SingleNodeExecutor -from executorlib.api import cloudpickle_register, split_tuple, get_item_from_future +from executorlib import SingleNodeExecutor, split_future, get_item_from_future +from executorlib.api import cloudpickle_register from executorlib.standalone.split import SplitFuture @@ -26,7 +26,7 @@ def test_integration_return_tuple(self): with SingleNodeExecutor() as exe: cloudpickle_register(ind=1) future = exe.submit(function_returns_tuple, 15) - f1, f2, f3 = split_tuple(future=future, n=3) + f1, f2, f3 = split_future(future=future, n=3) self.assertEqual(f1.result(), "a") self.assertEqual(f2.result(), "b") self.assertEqual(f3.result(), 15) @@ -52,7 +52,7 @@ def test_integration_exception(self): with SingleNodeExecutor() as exe: cloudpickle_register(ind=1) future = exe.submit(function_with_exception, 15) - f1, f2, f3 = split_tuple(future=future, n=3) + f1, f2, f3 = split_future(future=future, n=3) with self.assertRaises(RuntimeError): f3.result() From 1f0b3c4689ea3ef77bc9c31612f9b76faae70f74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 10 Sep 2025 12:05:28 +0200 Subject: [PATCH 14/28] Add docstrings --- executorlib/standalone/split.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/executorlib/standalone/split.py b/executorlib/standalone/split.py index 4477927e..23924fae 100644 --- a/executorlib/standalone/split.py +++ b/executorlib/standalone/split.py @@ -40,8 +40,29 @@ def set_exception(self, exception: Optional[BaseException]) -> None: def split_future(future: Future, n: int) -> list[SplitFuture]: + """ + Split a concurrent.futures.Future object which returns a tuple or list as result into individual future objects + + Args: + future (Future): future object which returns a tuple or list as result + n: number of elements expected in the future object + + Returns: + list: List of future objects + """ return [SplitFuture(future=future, selector=i) for i in range(n)] def get_item_from_future(future: Future, key: str) -> SplitFuture: + """ + Get item from concurrent.futures.Future object which returns a dictionary as result by the corresponding dictionary + key. + + Args: + future (Future): future object which returns a dictionary as result + key (str): dictionary key to get item from dictionary + + Returns: + SplitFuture: Future object which returns the value corresponding to the key + """ return SplitFuture(future=future, selector=key) From 347fbbb2d40d5889ebcc565cc95e3abc562819b6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 10 Sep 2025 10:25:55 +0000 Subject: [PATCH 15/28] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 60761d21..f7be143c 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -25,7 +25,7 @@ SlurmClusterExecutor, SlurmJobExecutor, ) -from executorlib.standalone.split import split_future, get_item_from_future +from executorlib.standalone.split import get_item_from_future, split_future def get_cache_data(cache_directory: str) -> list[dict]: From 7a3b20dac5427082441b8a24fce543c452f2a08c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 10 Sep 2025 13:33:36 +0200 Subject: [PATCH 16/28] Test response is None --- executorlib/standalone/split.py | 6 +++++- tests/test_standalone_split.py | 12 +++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/executorlib/standalone/split.py b/executorlib/standalone/split.py index 23924fae..73d17329 100644 --- a/executorlib/standalone/split.py +++ b/executorlib/standalone/split.py @@ -24,7 +24,11 @@ def add_done_callback(self, fn) -> None: return self._future.add_done_callback(fn=fn) def result(self, timeout: Optional[float] = None) -> Any: - return self._future.result(timeout=timeout)[self._selector] + result = self._future.result(timeout=timeout) + if result is not None: + return result[self._selector] + else: + return None def exception(self, timeout: Optional[float] = None) -> Optional[BaseException]: return self._future.exception(timeout=timeout) diff --git a/tests/test_standalone_split.py b/tests/test_standalone_split.py index 772c30f4..f41f39fa 100644 --- a/tests/test_standalone_split.py +++ b/tests/test_standalone_split.py @@ -70,7 +70,13 @@ def test_split_future_object(self): self.assertTrue(fs2.cancelled()) f3 = Future() fs3 = SplitFuture(future=f3, selector=1) - fs3.set_exception(RuntimeError()) - self.assertEqual(type(fs3.exception()), RuntimeError) + fs3.set_running_or_notify_cancel() + self.assertTrue(fs3.running()) + fs3.set_result(None) + self.assertEqual(fs3.result(), None) + f4 = Future() + fs4 = SplitFuture(future=f4, selector=1) + fs4.set_exception(RuntimeError()) + self.assertEqual(type(fs4.exception()), RuntimeError) with self.assertRaises(RuntimeError): - fs3.result() + fs4.result() From c63acdbb7bfd8c1d671f32aad25770dc61e579a9 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 10 Sep 2025 13:54:43 +0200 Subject: [PATCH 17/28] Extend plotting function to support splitfuture objects --- executorlib/standalone/plot.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/executorlib/standalone/plot.py b/executorlib/standalone/plot.py index 610bfcff..21620b0c 100644 --- a/executorlib/standalone/plot.py +++ b/executorlib/standalone/plot.py @@ -4,6 +4,8 @@ import cloudpickle +from executorlib.standalone.split import SplitFuture + def generate_nodes_and_edges_for_plotting( task_hash_dict: dict, future_hash_inverse_dict: dict @@ -31,7 +33,15 @@ def add_element(arg, link_to, label=""): link_to: ID of the node to link the element to. label (str, optional): Label for the edge. Defaults to "". """ - if isinstance(arg, Future): + if isinstance(arg, SplitFuture): + edge_lst.append( + { + "start": hash_id_dict[future_hash_inverse_dict[arg._future]], + "end": link_to, + "label": label + str(arg._selector), + } + ) + elif isinstance(arg, Future): edge_lst.append( { "start": hash_id_dict[future_hash_inverse_dict[arg]], @@ -106,7 +116,9 @@ def convert_arg(arg, future_hash_inverse_dict): Returns: The hash representation of the argument. """ - if isinstance(arg, Future): + if isinstance(arg, SplitFuture): + return future_hash_inverse_dict[arg._future] + elif isinstance(arg, Future): return future_hash_inverse_dict[arg] elif isinstance(arg, list): return [ From 9858b701c2cbcf49439620461c7d003c59e7856a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 10 Sep 2025 13:55:55 +0200 Subject: [PATCH 18/28] Rename splitfuture to futureselector --- executorlib/standalone/split.py | 12 ++++++------ tests/test_standalone_split.py | 10 +++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/executorlib/standalone/split.py b/executorlib/standalone/split.py index 73d17329..b66c3d70 100644 --- a/executorlib/standalone/split.py +++ b/executorlib/standalone/split.py @@ -2,7 +2,7 @@ from typing import Any, Optional -class SplitFuture(Future): +class FutureSelector(Future): def __init__(self, future: Future, selector: int | str): super().__init__() self._future = future @@ -43,7 +43,7 @@ def set_exception(self, exception: Optional[BaseException]) -> None: return self._future.set_exception(exception=exception) -def split_future(future: Future, n: int) -> list[SplitFuture]: +def split_future(future: Future, n: int) -> list[FutureSelector]: """ Split a concurrent.futures.Future object which returns a tuple or list as result into individual future objects @@ -54,10 +54,10 @@ def split_future(future: Future, n: int) -> list[SplitFuture]: Returns: list: List of future objects """ - return [SplitFuture(future=future, selector=i) for i in range(n)] + return [FutureSelector(future=future, selector=i) for i in range(n)] -def get_item_from_future(future: Future, key: str) -> SplitFuture: +def get_item_from_future(future: Future, key: str) -> FutureSelector: """ Get item from concurrent.futures.Future object which returns a dictionary as result by the corresponding dictionary key. @@ -67,6 +67,6 @@ def get_item_from_future(future: Future, key: str) -> SplitFuture: key (str): dictionary key to get item from dictionary Returns: - SplitFuture: Future object which returns the value corresponding to the key + FutureSelector: Future object which returns the value corresponding to the key """ - return SplitFuture(future=future, selector=key) + return FutureSelector(future=future, selector=key) diff --git a/tests/test_standalone_split.py b/tests/test_standalone_split.py index f41f39fa..0ef6ddd0 100644 --- a/tests/test_standalone_split.py +++ b/tests/test_standalone_split.py @@ -2,7 +2,7 @@ from concurrent.futures import Future from executorlib import SingleNodeExecutor, split_future, get_item_from_future from executorlib.api import cloudpickle_register -from executorlib.standalone.split import SplitFuture +from executorlib.standalone.split import FutureSelector def function_returns_tuple(i): @@ -58,24 +58,24 @@ def test_integration_exception(self): def test_split_future_object(self): f1 = Future() - fs1 = SplitFuture(future=f1, selector=1) + fs1 = FutureSelector(future=f1, selector=1) fs1.add_done_callback(callback) fs1.set_running_or_notify_cancel() self.assertTrue(fs1.running()) fs1.set_result([1, 2]) self.assertEqual(fs1.result(), 2) f2 = Future() - fs2 = SplitFuture(future=f2, selector=1) + fs2 = FutureSelector(future=f2, selector=1) fs2.cancel() self.assertTrue(fs2.cancelled()) f3 = Future() - fs3 = SplitFuture(future=f3, selector=1) + fs3 = FutureSelector(future=f3, selector=1) fs3.set_running_or_notify_cancel() self.assertTrue(fs3.running()) fs3.set_result(None) self.assertEqual(fs3.result(), None) f4 = Future() - fs4 = SplitFuture(future=f4, selector=1) + fs4 = FutureSelector(future=f4, selector=1) fs4.set_exception(RuntimeError()) self.assertEqual(type(fs4.exception()), RuntimeError) with self.assertRaises(RuntimeError): From f513ede84011c9c2aace858e1d6b169713280126 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 10 Sep 2025 13:57:05 +0200 Subject: [PATCH 19/28] sync with latest changes --- executorlib/standalone/plot.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/executorlib/standalone/plot.py b/executorlib/standalone/plot.py index 21620b0c..6e0b430d 100644 --- a/executorlib/standalone/plot.py +++ b/executorlib/standalone/plot.py @@ -4,7 +4,7 @@ import cloudpickle -from executorlib.standalone.split import SplitFuture +from executorlib.standalone.split import FutureSelector def generate_nodes_and_edges_for_plotting( @@ -33,7 +33,7 @@ def add_element(arg, link_to, label=""): link_to: ID of the node to link the element to. label (str, optional): Label for the edge. Defaults to "". """ - if isinstance(arg, SplitFuture): + if isinstance(arg, FutureSelector): edge_lst.append( { "start": hash_id_dict[future_hash_inverse_dict[arg._future]], @@ -116,7 +116,7 @@ def convert_arg(arg, future_hash_inverse_dict): Returns: The hash representation of the argument. """ - if isinstance(arg, SplitFuture): + if isinstance(arg, FutureSelector): return future_hash_inverse_dict[arg._future] elif isinstance(arg, Future): return future_hash_inverse_dict[arg] From 1efb966ee4f7d9b21a35f312870f993231bbb157 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 10 Sep 2025 13:58:38 +0200 Subject: [PATCH 20/28] Rename module to select rather than split --- executorlib/__init__.py | 2 +- executorlib/standalone/{split.py => select.py} | 0 tests/{test_standalone_split.py => test_standalone_select.py} | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename executorlib/standalone/{split.py => select.py} (100%) rename tests/{test_standalone_split.py => test_standalone_select.py} (98%) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index f7be143c..337b6d80 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -25,7 +25,7 @@ SlurmClusterExecutor, SlurmJobExecutor, ) -from executorlib.standalone.split import get_item_from_future, split_future +from executorlib.standalone.select import get_item_from_future, split_future def get_cache_data(cache_directory: str) -> list[dict]: diff --git a/executorlib/standalone/split.py b/executorlib/standalone/select.py similarity index 100% rename from executorlib/standalone/split.py rename to executorlib/standalone/select.py diff --git a/tests/test_standalone_split.py b/tests/test_standalone_select.py similarity index 98% rename from tests/test_standalone_split.py rename to tests/test_standalone_select.py index 0ef6ddd0..f6cc365b 100644 --- a/tests/test_standalone_split.py +++ b/tests/test_standalone_select.py @@ -2,7 +2,7 @@ from concurrent.futures import Future from executorlib import SingleNodeExecutor, split_future, get_item_from_future from executorlib.api import cloudpickle_register -from executorlib.standalone.split import FutureSelector +from executorlib.standalone.select import FutureSelector def function_returns_tuple(i): From 596731aefb7d6257b945b0e98d459ec2aab28dfa Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 10 Sep 2025 13:59:41 +0200 Subject: [PATCH 21/28] rename module --- executorlib/standalone/plot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/standalone/plot.py b/executorlib/standalone/plot.py index 6e0b430d..1402fe4d 100644 --- a/executorlib/standalone/plot.py +++ b/executorlib/standalone/plot.py @@ -4,7 +4,7 @@ import cloudpickle -from executorlib.standalone.split import FutureSelector +from executorlib.standalone.select import FutureSelector def generate_nodes_and_edges_for_plotting( From b6b0a318a829b4f45b6b72ce9f998a68d9f147e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 10 Sep 2025 20:41:44 +0200 Subject: [PATCH 22/28] fix plotting --- executorlib/standalone/plot.py | 32 +++++++++++++++++-- .../task_scheduler/interactive/dependency.py | 5 ++- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/executorlib/standalone/plot.py b/executorlib/standalone/plot.py index 1402fe4d..c76ea1f6 100644 --- a/executorlib/standalone/plot.py +++ b/executorlib/standalone/plot.py @@ -92,14 +92,14 @@ def add_element(arg, link_to, label=""): def generate_task_hash_for_plotting( - task_dict: dict, future_hash_inverse_dict: dict + task_dict: dict, future_hash_dict: dict ) -> bytes: """ Generate a hash for a task dictionary. Args: task_dict (dict): Dictionary containing task information. - future_hash_inverse_dict (dict): Dictionary mapping future hash to future object. + future_hash_dict (dict): Dictionary mapping future hash to future object. Returns: bytes: Hash generated for the task dictionary. @@ -117,7 +117,32 @@ def convert_arg(arg, future_hash_inverse_dict): The hash representation of the argument. """ if isinstance(arg, FutureSelector): - return future_hash_inverse_dict[arg._future] + if arg not in future_hash_inverse_dict: + if isinstance(arg._selector, str): + hash = cloudpickle.dumps( + { + "fn": "get_item_from_future", + "args": (), + "kwargs": { + "future": future_hash_inverse_dict[arg._future], + "selector": arg._selector, + }, + } + ) + else: + hash = cloudpickle.dumps( + { + "fn": "split_future", + "args": (), + "kwargs": { + "future": future_hash_inverse_dict[arg._future], + "selector": arg._selector, + }, + } + ) + future_hash_dict[hash] = arg + future_hash_inverse_dict[arg] = hash + return future_hash_inverse_dict[arg] elif isinstance(arg, Future): return future_hash_inverse_dict[arg] elif isinstance(arg, list): @@ -133,6 +158,7 @@ def convert_arg(arg, future_hash_inverse_dict): else: return arg + future_hash_inverse_dict = {v:k for k, v in future_hash_dict.items()} args_for_hash = [ convert_arg(arg=arg, future_hash_inverse_dict=future_hash_inverse_dict) for arg in task_dict["args"] diff --git a/executorlib/task_scheduler/interactive/dependency.py b/executorlib/task_scheduler/interactive/dependency.py index cfb9e9f3..6712a03c 100644 --- a/executorlib/task_scheduler/interactive/dependency.py +++ b/executorlib/task_scheduler/interactive/dependency.py @@ -5,6 +5,7 @@ from typing import Any, Callable, Optional from executorlib.standalone.batched import batched_futures +from executorlib.standalone.select import FutureSelector from executorlib.standalone.interactive.arguments import ( check_exception_was_raised, get_exception_lst, @@ -146,9 +147,7 @@ def submit( # type: ignore } task_hash = generate_task_hash_for_plotting( task_dict=task_dict, - future_hash_inverse_dict={ - v: k for k, v in self._future_hash_dict.items() - }, + future_hash_dict=self._future_hash_dict, ) self._future_hash_dict[task_hash] = f self._task_hash_dict[task_hash] = task_dict From 4911356eb53d22b575280d5a832667d7e199b9a2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 10 Sep 2025 18:41:57 +0000 Subject: [PATCH 23/28] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/standalone/plot.py | 6 ++---- executorlib/task_scheduler/interactive/dependency.py | 1 - 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/executorlib/standalone/plot.py b/executorlib/standalone/plot.py index c76ea1f6..960960f3 100644 --- a/executorlib/standalone/plot.py +++ b/executorlib/standalone/plot.py @@ -91,9 +91,7 @@ def add_element(arg, link_to, label=""): return node_lst, edge_lst -def generate_task_hash_for_plotting( - task_dict: dict, future_hash_dict: dict -) -> bytes: +def generate_task_hash_for_plotting(task_dict: dict, future_hash_dict: dict) -> bytes: """ Generate a hash for a task dictionary. @@ -158,7 +156,7 @@ def convert_arg(arg, future_hash_inverse_dict): else: return arg - future_hash_inverse_dict = {v:k for k, v in future_hash_dict.items()} + future_hash_inverse_dict = {v: k for k, v in future_hash_dict.items()} args_for_hash = [ convert_arg(arg=arg, future_hash_inverse_dict=future_hash_inverse_dict) for arg in task_dict["args"] diff --git a/executorlib/task_scheduler/interactive/dependency.py b/executorlib/task_scheduler/interactive/dependency.py index 6712a03c..3444b484 100644 --- a/executorlib/task_scheduler/interactive/dependency.py +++ b/executorlib/task_scheduler/interactive/dependency.py @@ -5,7 +5,6 @@ from typing import Any, Callable, Optional from executorlib.standalone.batched import batched_futures -from executorlib.standalone.select import FutureSelector from executorlib.standalone.interactive.arguments import ( check_exception_was_raised, get_exception_lst, From 11c341702b5ac724454e486e97d4f3b870023aed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 10 Sep 2025 21:29:43 +0200 Subject: [PATCH 24/28] Add tests --- ...test_singlenodeexecutor_plot_dependency.py | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/test_singlenodeexecutor_plot_dependency.py b/tests/test_singlenodeexecutor_plot_dependency.py index a3426ed4..ab645a42 100644 --- a/tests/test_singlenodeexecutor_plot_dependency.py +++ b/tests/test_singlenodeexecutor_plot_dependency.py @@ -6,6 +6,8 @@ SingleNodeExecutor, SlurmJobExecutor, SlurmClusterExecutor, + split_future, + get_item_from_future, ) from executorlib.standalone.plot import generate_nodes_and_edges_for_plotting from executorlib.standalone.serialize import cloudpickle_register @@ -43,6 +45,17 @@ def return_input_dict(input_dict): return input_dict +def get_tuple(i): + return 1, 2, i + + +def get_dict(i): + return {"a": 1, "b": i} + + +def echo(i): + return i + @unittest.skipIf( skip_graphviz_test, "graphviz is not installed, so the plot_dependency_graph tests are skipped.", @@ -289,3 +302,24 @@ def test_many_to_one_plot(self): ) self.assertEqual(len(nodes), 19) self.assertEqual(len(edges), 22) + + +class TestSelectExecutorPlot(unittest.TestCase): + def test_split_future(self): + with SingleNodeExecutor(plot_dependency_graph=True) as exe: + f = exe.submit(get_tuple, 5) + f1, f2, f3 = split_future(future=f, n=3) + f12 = exe.submit(echo, f1) + f22 = exe.submit(echo, f2) + f32 = exe.submit(echo, f3) + self.assertIsNone(f12.result()) + self.assertIsNone(f22.result()) + self.assertIsNone(f32.result()) + + def test_get_item_from_future(self): + with SingleNodeExecutor(plot_dependency_graph=True) as exe: + f = exe.submit(get_dict, 5) + f1 = exe.submit(echo, get_item_from_future(future=f, key="a")) + f2 = exe.submit(echo, get_item_from_future(future=f, key="b")) + self.assertIsNone(f1.result()) + self.assertIsNone(f2.result()) From 54d46c8ad7f3d3f8ce83ba069c7f89d2fa42fe8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 10 Sep 2025 21:32:05 +0200 Subject: [PATCH 25/28] skip test --- tests/test_singlenodeexecutor_plot_dependency.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_singlenodeexecutor_plot_dependency.py b/tests/test_singlenodeexecutor_plot_dependency.py index ab645a42..abae9a28 100644 --- a/tests/test_singlenodeexecutor_plot_dependency.py +++ b/tests/test_singlenodeexecutor_plot_dependency.py @@ -304,6 +304,10 @@ def test_many_to_one_plot(self): self.assertEqual(len(edges), 22) +@unittest.skipIf( + skip_graphviz_test, + "graphviz is not installed, so the plot_dependency_graph tests are skipped.", +) class TestSelectExecutorPlot(unittest.TestCase): def test_split_future(self): with SingleNodeExecutor(plot_dependency_graph=True) as exe: From 5788382e28116308409cbb234802d340ba739157 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 10 Sep 2025 21:46:36 +0200 Subject: [PATCH 26/28] remove redundant lines --- executorlib/standalone/plot.py | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/executorlib/standalone/plot.py b/executorlib/standalone/plot.py index a7af6280..6db3e701 100644 --- a/executorlib/standalone/plot.py +++ b/executorlib/standalone/plot.py @@ -116,30 +116,20 @@ def convert_arg(arg, future_hash_inverse_dict): """ if isinstance(arg, FutureSelector): if arg not in future_hash_inverse_dict: + obj_dict = { + "args": (), + "kwargs": { + "future": future_hash_inverse_dict[arg._future], + "selector": arg._selector, + }, + } if isinstance(arg._selector, str): - hash = cloudpickle.dumps( - { - "fn": "get_item_from_future", - "args": (), - "kwargs": { - "future": future_hash_inverse_dict[arg._future], - "selector": arg._selector, - }, - } - ) + obj_dict["fn"] = "get_item_from_future" else: - hash = cloudpickle.dumps( - { - "fn": "split_future", - "args": (), - "kwargs": { - "future": future_hash_inverse_dict[arg._future], - "selector": arg._selector, - }, - } - ) - future_hash_dict[hash] = arg - future_hash_inverse_dict[arg] = hash + obj_dict["fn"] = "split_future" + arg_hash = cloudpickle.dumps(obj_dict) + future_hash_dict[arg_hash] = arg + future_hash_inverse_dict[arg] = arg_hash return future_hash_inverse_dict[arg] elif isinstance(arg, Future): return future_hash_inverse_dict[arg] From 497bc16174708c3f6ae249aaa9e1d920868d678a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 10 Sep 2025 21:51:13 +0200 Subject: [PATCH 27/28] move plot module --- executorlib/task_scheduler/interactive/dependency.py | 2 +- .../plot.py => task_scheduler/interactive/dependency_plot.py} | 0 tests/test_fluxjobexecutor_plot.py | 2 +- tests/test_singlenodeexecutor_plot_dependency.py | 2 +- tests/test_testclusterexecutor.py | 2 +- 5 files changed, 4 insertions(+), 4 deletions(-) rename executorlib/{standalone/plot.py => task_scheduler/interactive/dependency_plot.py} (100%) diff --git a/executorlib/task_scheduler/interactive/dependency.py b/executorlib/task_scheduler/interactive/dependency.py index 3444b484..20972bee 100644 --- a/executorlib/task_scheduler/interactive/dependency.py +++ b/executorlib/task_scheduler/interactive/dependency.py @@ -11,7 +11,7 @@ get_future_objects_from_input, update_futures_in_input, ) -from executorlib.standalone.plot import ( +from executorlib.task_scheduler.interactive.dependency_plot import ( generate_nodes_and_edges_for_plotting, generate_task_hash_for_plotting, plot_dependency_graph_function, diff --git a/executorlib/standalone/plot.py b/executorlib/task_scheduler/interactive/dependency_plot.py similarity index 100% rename from executorlib/standalone/plot.py rename to executorlib/task_scheduler/interactive/dependency_plot.py diff --git a/tests/test_fluxjobexecutor_plot.py b/tests/test_fluxjobexecutor_plot.py index 43404f13..2f6d64dd 100644 --- a/tests/test_fluxjobexecutor_plot.py +++ b/tests/test_fluxjobexecutor_plot.py @@ -3,7 +3,7 @@ from time import sleep from executorlib import FluxJobExecutor, FluxClusterExecutor -from executorlib.standalone.plot import generate_nodes_and_edges_for_plotting +from executorlib.task_scheduler.interactive.dependency_plot import generate_nodes_and_edges_for_plotting from executorlib.standalone.serialize import cloudpickle_register diff --git a/tests/test_singlenodeexecutor_plot_dependency.py b/tests/test_singlenodeexecutor_plot_dependency.py index abae9a28..60c974e1 100644 --- a/tests/test_singlenodeexecutor_plot_dependency.py +++ b/tests/test_singlenodeexecutor_plot_dependency.py @@ -9,7 +9,7 @@ split_future, get_item_from_future, ) -from executorlib.standalone.plot import generate_nodes_and_edges_for_plotting +from executorlib.task_scheduler.interactive.dependency_plot import generate_nodes_and_edges_for_plotting from executorlib.standalone.serialize import cloudpickle_register diff --git a/tests/test_testclusterexecutor.py b/tests/test_testclusterexecutor.py index 73e1d361..d0615eb3 100644 --- a/tests/test_testclusterexecutor.py +++ b/tests/test_testclusterexecutor.py @@ -4,7 +4,7 @@ from executorlib import get_cache_data from executorlib.api import TestClusterExecutor -from executorlib.standalone.plot import generate_nodes_and_edges_for_plotting +from executorlib.task_scheduler.interactive.dependency_plot import generate_nodes_and_edges_for_plotting from executorlib.standalone.serialize import cloudpickle_register try: From 4a41b75d20b2012a677bb6555b81275b7e7d4991 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 10 Sep 2025 19:51:25 +0000 Subject: [PATCH 28/28] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/task_scheduler/interactive/dependency.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/dependency.py b/executorlib/task_scheduler/interactive/dependency.py index 20972bee..a3a43cb8 100644 --- a/executorlib/task_scheduler/interactive/dependency.py +++ b/executorlib/task_scheduler/interactive/dependency.py @@ -11,12 +11,12 @@ get_future_objects_from_input, update_futures_in_input, ) +from executorlib.task_scheduler.base import TaskSchedulerBase from executorlib.task_scheduler.interactive.dependency_plot import ( generate_nodes_and_edges_for_plotting, generate_task_hash_for_plotting, plot_dependency_graph_function, ) -from executorlib.task_scheduler.base import TaskSchedulerBase class DependencyTaskScheduler(TaskSchedulerBase):