From 4cac423a0c0c1165983b1393d168f5f2bdfd528d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 30 Aug 2024 09:38:39 +0200 Subject: [PATCH] Rename Interface to Spawner --- executorlib/interactive/__init__.py | 20 +++---- executorlib/interactive/executor.py | 10 ++-- executorlib/interactive/flux.py | 4 +- executorlib/shared/__init__.py | 6 +-- executorlib/shared/communication.py | 6 +-- executorlib/shared/executor.py | 14 ++--- .../shared/{interface.py => spawner.py} | 8 +-- tests/test_executor_backend_flux.py | 2 +- tests/test_flux_executor.py | 22 ++++---- tests/test_local_executor.py | 54 +++++++++---------- tests/test_local_executor_future.py | 10 ++-- tests/test_shared_backend.py | 8 +-- tests/test_shared_communication.py | 6 +-- 13 files changed, 85 insertions(+), 85 deletions(-) rename executorlib/shared/{interface.py => spawner.py} (98%) diff --git a/executorlib/interactive/__init__.py b/executorlib/interactive/__init__.py index 77f8d785..fca5ce6d 100644 --- a/executorlib/interactive/__init__.py +++ b/executorlib/interactive/__init__.py @@ -18,14 +18,14 @@ validate_backend, validate_number_of_cores, ) -from executorlib.shared.interface import ( +from executorlib.shared.spawner import ( SLURM_COMMAND, - MpiExecInterface, - SrunInterface, + MpiExecSpawner, + SrunSpawner, ) try: # The PyFluxExecutor requires flux-core to be installed. - from executorlib.interactive.flux import FluxPythonInterface + from executorlib.interactive.flux import FluxPythonSpawner flux_installed = "FLUX_URI" in os.environ except ImportError: @@ -122,13 +122,13 @@ def create_executor( return InteractiveExecutor( max_workers=int(max_cores / cores_per_worker), executor_kwargs=executor_kwargs, - interface_class=FluxPythonInterface, + interface_class=FluxPythonSpawner, ) else: return InteractiveStepExecutor( max_cores=max_cores, executor_kwargs=executor_kwargs, - interface_class=FluxPythonInterface, + interface_class=FluxPythonSpawner, ) elif backend == "slurm": check_executor(executor=executor) @@ -142,13 +142,13 @@ def create_executor( return InteractiveExecutor( max_workers=int(max_cores / cores_per_worker), executor_kwargs=executor_kwargs, - interface_class=SrunInterface, + interface_class=SrunSpawner, ) else: return InteractiveStepExecutor( max_cores=max_cores, executor_kwargs=executor_kwargs, - interface_class=SrunInterface, + interface_class=SrunSpawner, ) else: # backend="local" check_threads_per_core(threads_per_core=threads_per_core) @@ -164,11 +164,11 @@ def create_executor( return InteractiveExecutor( max_workers=int(max_cores / cores_per_worker), executor_kwargs=executor_kwargs, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) else: return InteractiveStepExecutor( max_cores=max_cores, executor_kwargs=executor_kwargs, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index 8e7ee1b9..2c20d29e 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -4,7 +4,7 @@ execute_parallel_tasks, execute_separate_tasks, ) -from executorlib.shared.interface import BaseInterface, MpiExecInterface +from executorlib.shared.spawner import BaseSpawner, MpiExecSpawner from executorlib.shared.thread import RaisingThread @@ -19,7 +19,7 @@ class InteractiveExecutor(ExecutorBroker): Args: max_workers (int): defines the number workers which can execute functions in parallel executor_kwargs (dict): keyword arguments for the executor - interface_class (BaseInterface): interface class to initiate python processes + interface_class (BaseSpawner): interface class to initiate python processes Examples: @@ -46,7 +46,7 @@ def __init__( self, max_workers: int = 1, executor_kwargs: dict = {}, - interface_class: BaseInterface = MpiExecInterface, + interface_class: BaseSpawner = MpiExecSpawner, ): super().__init__() executor_kwargs["future_queue"] = self._future_queue @@ -73,7 +73,7 @@ class InteractiveStepExecutor(ExecutorSteps): Args: max_cores (int): defines the number workers which can execute functions in parallel executor_kwargs (dict): keyword arguments for the executor - interface_class (BaseInterface): interface class to initiate python processes + interface_class (BaseSpawner): interface class to initiate python processes Examples: @@ -98,7 +98,7 @@ def __init__( self, max_cores: int = 1, executor_kwargs: dict = {}, - interface_class: BaseInterface = MpiExecInterface, + interface_class: BaseSpawner = MpiExecSpawner, ): super().__init__() executor_kwargs["future_queue"] = self._future_queue diff --git a/executorlib/interactive/flux.py b/executorlib/interactive/flux.py index c3f5db4d..1b29a8c7 100644 --- a/executorlib/interactive/flux.py +++ b/executorlib/interactive/flux.py @@ -3,10 +3,10 @@ import flux.job -from executorlib.shared.interface import BaseInterface +from executorlib.shared.spawner import BaseSpawner -class FluxPythonInterface(BaseInterface): +class FluxPythonSpawner(BaseSpawner): """ A class representing the FluxPythonInterface. diff --git a/executorlib/shared/__init__.py b/executorlib/shared/__init__.py index bc73d7b5..c185a355 100644 --- a/executorlib/shared/__init__.py +++ b/executorlib/shared/__init__.py @@ -7,7 +7,7 @@ interface_shutdown, ) from executorlib.shared.executor import cancel_items_in_queue -from executorlib.shared.interface import MpiExecInterface, SrunInterface +from executorlib.shared.spawner import MpiExecSpawner, SrunSpawner from executorlib.shared.thread import RaisingThread __all__ = [ @@ -19,6 +19,6 @@ interface_receive, cancel_items_in_queue, RaisingThread, - MpiExecInterface, - SrunInterface, + MpiExecSpawner, + SrunSpawner, ] diff --git a/executorlib/shared/communication.py b/executorlib/shared/communication.py index f302e09b..a87b34a1 100644 --- a/executorlib/shared/communication.py +++ b/executorlib/shared/communication.py @@ -10,7 +10,7 @@ class SocketInterface: The SocketInterface is an abstraction layer on top of the zero message queue. Args: - interface (executorlib.shared.interface.BaseInterface): Interface for starting the parallel process + interface (executorlib.shared.spawner.BaseSpawner): Interface for starting the parallel process """ def __init__(self, interface=None): @@ -18,7 +18,7 @@ def __init__(self, interface=None): Initialize the SocketInterface. Args: - interface (executorlib.shared.interface.BaseInterface): Interface for starting the parallel process + interface (executorlib.shared.spawner.BaseSpawner): Interface for starting the parallel process """ self._context = zmq.Context() self._socket = self._context.socket(zmq.PAIR) @@ -133,7 +133,7 @@ def interface_bootup( Args: command_lst (list): List of commands as strings - connections (executorlib.shared.interface.BaseInterface): Interface to start parallel process, like MPI, SLURM + connections (executorlib.shared.spawner.BaseSpawner): Interface to start parallel process, like MPI, SLURM or Flux hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an diff --git a/executorlib/shared/executor.py b/executorlib/shared/executor.py index c805ad92..8be7ed6c 100644 --- a/executorlib/shared/executor.py +++ b/executorlib/shared/executor.py @@ -19,7 +19,7 @@ check_resource_dict, check_resource_dict_is_empty, ) -from executorlib.shared.interface import BaseInterface, MpiExecInterface +from executorlib.shared.spawner import BaseSpawner, MpiExecSpawner from executorlib.shared.thread import RaisingThread @@ -301,7 +301,7 @@ def cloudpickle_register(ind: int = 2): def execute_parallel_tasks( future_queue: queue.Queue, cores: int = 1, - interface_class: BaseInterface = MpiExecInterface, + interface_class: BaseSpawner = MpiExecSpawner, hostname_localhost: bool = False, init_function: Optional[Callable] = None, prefix_name: Optional[str] = None, @@ -314,7 +314,7 @@ def execute_parallel_tasks( Args: future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process cores (int): defines the total number of MPI ranks to use - interface_class (BaseInterface): Interface to start process on selected compute resources + interface_class (BaseSpawner): Interface to start process on selected compute resources hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -362,7 +362,7 @@ def execute_parallel_tasks( def execute_separate_tasks( future_queue: queue.Queue, - interface_class: BaseInterface = MpiExecInterface, + interface_class: BaseSpawner = MpiExecSpawner, max_cores: int = 1, hostname_localhost: bool = False, **kwargs, @@ -372,7 +372,7 @@ def execute_separate_tasks( Args: future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - interface_class (BaseInterface): Interface to start process on selected compute resources + interface_class (BaseSpawner): Interface to start process on selected compute resources max_cores (int): defines the number cores which can be used in parallel hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an @@ -621,7 +621,7 @@ def _submit_function_to_separate_process( task_dict: dict, active_task_dict: dict, qtask: queue.Queue, - interface_class: BaseInterface, + interface_class: BaseSpawner, executor_kwargs: dict, max_cores: int = 1, hostname_localhost: bool = False, @@ -633,7 +633,7 @@ def _submit_function_to_separate_process( {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}} active_task_dict (dict): Dictionary containing the future objects and the number of cores they require qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function - interface_class (BaseInterface): Interface to start process on selected compute resources + interface_class (BaseSpawner): Interface to start process on selected compute resources executor_kwargs (dict): keyword parameters used to initialize the Executor max_cores (int): defines the number cores which can be used in parallel hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the diff --git a/executorlib/shared/interface.py b/executorlib/shared/spawner.py similarity index 98% rename from executorlib/shared/interface.py rename to executorlib/shared/spawner.py index 063bbd5e..10f2fdae 100644 --- a/executorlib/shared/interface.py +++ b/executorlib/shared/spawner.py @@ -6,7 +6,7 @@ SLURM_COMMAND = "srun" -class BaseInterface(ABC): +class BaseSpawner(ABC): def __init__(self, cwd: str, cores: int = 1, oversubscribe: bool = False): """ Base class for interface implementations. @@ -55,7 +55,7 @@ def poll(self): raise NotImplementedError -class SubprocessInterface(BaseInterface): +class SubprocessSpawner(BaseSpawner): def __init__( self, cwd: Optional[str] = None, @@ -143,7 +143,7 @@ def poll(self) -> bool: return self._process is not None and self._process.poll() is None -class MpiExecInterface(SubprocessInterface): +class MpiExecSpawner(SubprocessSpawner): def generate_command(self, command_lst: list[str]) -> list[str]: """ Generate the command list for the MPIExec interface. @@ -163,7 +163,7 @@ def generate_command(self, command_lst: list[str]) -> list[str]: ) -class SrunInterface(SubprocessInterface): +class SrunSpawner(SubprocessSpawner): def __init__( self, cwd: Optional[str] = None, diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index e31f8414..ac6a50ed 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -8,7 +8,7 @@ try: import flux.job - from executorlib.interactive.flux import FluxPythonInterface + from executorlib.interactive.flux import FluxPythonSpawner skip_flux_test = "FLUX_URI" not in os.environ pmi = os.environ.get("PYMPIPOOL_PMIX", None) diff --git a/tests/test_flux_executor.py b/tests/test_flux_executor.py index e3d95c2a..1239fb30 100644 --- a/tests/test_flux_executor.py +++ b/tests/test_flux_executor.py @@ -11,7 +11,7 @@ try: import flux.job - from executorlib.interactive.flux import FluxPythonInterface + from executorlib.interactive.flux import FluxPythonSpawner skip_flux_test = "FLUX_URI" not in os.environ pmi = os.environ.get("PYMPIPOOL_PMIX", None) @@ -50,7 +50,7 @@ def test_flux_executor_serial(self): with InteractiveExecutor( max_workers=2, executor_kwargs={"executor": self.executor}, - interface_class=FluxPythonInterface, + interface_class=FluxPythonSpawner, ) as exe: fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -63,7 +63,7 @@ def test_flux_executor_threads(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"executor": self.executor, "threads_per_core": 2}, - interface_class=FluxPythonInterface, + interface_class=FluxPythonSpawner, ) as exe: fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -76,7 +76,7 @@ def test_flux_executor_parallel(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"executor": self.executor, "cores": 2, "pmi": pmi}, - interface_class=FluxPythonInterface, + interface_class=FluxPythonSpawner, ) as exe: fs_1 = exe.submit(mpi_funct, 1) self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) @@ -86,7 +86,7 @@ def test_single_task(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"executor": self.executor, "cores": 2, "pmi": pmi}, - interface_class=FluxPythonInterface, + interface_class=FluxPythonSpawner, ) as p: output = p.map(mpi_funct, [1, 2, 3]) self.assertEqual( @@ -104,7 +104,7 @@ def test_execute_task(self): future_queue=q, cores=1, executor=self.executor, - interface_class=FluxPythonInterface, + interface_class=FluxPythonSpawner, ) self.assertEqual(f.result(), 2) q.join() @@ -120,7 +120,7 @@ def test_execute_task_threads(self): cores=1, threads_per_core=1, executor=self.executor, - interface_class=FluxPythonInterface, + interface_class=FluxPythonSpawner, ) self.assertEqual(f.result(), 2) q.join() @@ -133,7 +133,7 @@ def test_internal_memory(self): "cores": 1, "init_function": set_global, }, - interface_class=FluxPythonInterface, + interface_class=FluxPythonSpawner, ) as p: f = p.submit(get_global) self.assertFalse(f.done()) @@ -142,13 +142,13 @@ def test_internal_memory(self): def test_interface_exception(self): with self.assertRaises(ValueError): - flux_interface = FluxPythonInterface( + flux_interface = FluxPythonSpawner( executor=self.executor, oversubscribe=True ) flux_interface.bootup(command_lst=[]) with self.assertRaises(ValueError): - flux_interface = FluxPythonInterface(executor=self.executor) + flux_interface = FluxPythonSpawner(executor=self.executor) flux_interface.bootup(command_lst=[], prefix_path="/path/to/conda/env") with self.assertRaises(ValueError): - flux_interface = FluxPythonInterface(executor=self.executor) + flux_interface = FluxPythonSpawner(executor=self.executor) flux_interface.bootup(command_lst=[], prefix_name="conda_env_name") diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py index 13dacd2b..c62c170e 100644 --- a/tests/test_local_executor.py +++ b/tests/test_local_executor.py @@ -6,7 +6,7 @@ import numpy as np -from executorlib.shared.interface import MpiExecInterface +from executorlib.shared.spawner import MpiExecSpawner from executorlib.interactive.executor import ( InteractiveExecutor, InteractiveStepExecutor, @@ -64,7 +64,7 @@ def test_pympiexecutor_two_workers(self): with InteractiveExecutor( max_workers=2, executor_kwargs={"hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) @@ -78,7 +78,7 @@ def test_pympiexecutor_one_worker(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) @@ -94,7 +94,7 @@ def test_pympiexecutor_two_workers(self): with InteractiveStepExecutor( max_cores=2, executor_kwargs={"hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) @@ -108,7 +108,7 @@ def test_pympiexecutor_one_worker(self): with InteractiveStepExecutor( max_cores=1, executor_kwargs={"hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) @@ -127,7 +127,7 @@ def test_pympiexecutor_one_worker_with_mpi(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 2, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(mpi_funct, 1) @@ -138,7 +138,7 @@ def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 2, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: cloudpickle_register(ind=1) fs1 = p.submit(mpi_funct, 1) @@ -158,7 +158,7 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 2, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: cloudpickle_register(ind=1) output = p.submit(echo_funct, 2).result() @@ -173,7 +173,7 @@ def test_pympiexecutor_one_worker_with_mpi(self): with InteractiveStepExecutor( max_cores=2, executor_kwargs={"cores": 2, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(mpi_funct, 1) @@ -184,7 +184,7 @@ def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): with InteractiveStepExecutor( max_cores=2, executor_kwargs={"cores": 2, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: cloudpickle_register(ind=1) fs1 = p.submit(mpi_funct, 1) @@ -204,7 +204,7 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): with InteractiveStepExecutor( max_cores=2, executor_kwargs={"cores": 2, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: cloudpickle_register(ind=1) output = p.submit(echo_funct, 2).result() @@ -220,7 +220,7 @@ def test_internal_memory(self): "init_function": set_global, "hostname_localhost": True, }, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: f = p.submit(get_global) self.assertFalse(f.done()) @@ -246,7 +246,7 @@ def test_execute_task(self): future_queue=q, cores=1, oversubscribe=False, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, hostname_localhost=True, init_function=set_global, ) @@ -259,7 +259,7 @@ def test_pool_serial(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 1, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: output = p.submit(calc_array, i=2) self.assertEqual(len(p), 1) @@ -274,7 +274,7 @@ def test_executor_multi_submission(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 1, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: fs_1 = p.submit(calc_array, i=2) fs_2 = p.submit(calc_array, i=2) @@ -287,7 +287,7 @@ def test_shutdown(self): p = InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 1, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) fs1 = p.submit(sleep_one, i=2) fs2 = p.submit(sleep_one, i=4) @@ -303,7 +303,7 @@ def test_pool_serial_map(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 1, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: output = p.map(calc_array, [1, 2, 3]) self.assertEqual(list(output), [np.array(1), np.array(4), np.array(9)]) @@ -313,7 +313,7 @@ def test_executor_exception(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 1, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: p.submit(raise_error) @@ -322,7 +322,7 @@ def test_executor_exception_future(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 1, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: fs = p.submit(raise_error) fs.result() @@ -349,7 +349,7 @@ def test_meta(self): "cwd": None, "oversubscribe": False, }, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as exe: for k, v in meta_data_exe_dict.items(): if k != "interface_class": @@ -376,7 +376,7 @@ def test_meta_step(self): "cwd": None, "oversubscribe": False, }, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as exe: for k, v in meta_data_exe_dict.items(): if k != "interface_class": @@ -391,7 +391,7 @@ def test_pool_multi_core(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 2, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: output = p.submit(mpi_funct, i=2) self.assertEqual(len(p), 1) @@ -409,7 +409,7 @@ def test_pool_multi_core_map(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"cores": 2, "hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: output = p.map(mpi_funct, [1, 2, 3]) self.assertEqual( @@ -427,7 +427,7 @@ def test_execute_task_failed_no_argument(self): future_queue=q, cores=1, oversubscribe=False, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, hostname_localhost=True, ) q.join() @@ -442,7 +442,7 @@ def test_execute_task_failed_wrong_argument(self): future_queue=q, cores=1, oversubscribe=False, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, hostname_localhost=True, ) q.join() @@ -457,7 +457,7 @@ def test_execute_task(self): future_queue=q, cores=1, oversubscribe=False, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, hostname_localhost=True, ) self.assertEqual(f.result(), np.array(4)) @@ -476,7 +476,7 @@ def test_execute_task_parallel(self): future_queue=q, cores=2, oversubscribe=False, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, hostname_localhost=True, ) self.assertEqual(f.result(), [np.array(4), np.array(4)]) diff --git a/tests/test_local_executor_future.py b/tests/test_local_executor_future.py index a0e8b895..b2d0b453 100644 --- a/tests/test_local_executor_future.py +++ b/tests/test_local_executor_future.py @@ -6,7 +6,7 @@ import numpy as np from executorlib.interactive.executor import InteractiveExecutor -from executorlib.shared.interface import MpiExecInterface +from executorlib.shared.spawner import MpiExecSpawner skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None @@ -21,7 +21,7 @@ def test_pool_serial(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"hostname_localhost": True, "cores": 1}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: output = p.submit(calc, i=2) self.assertTrue(isinstance(output, Future)) @@ -37,7 +37,7 @@ def test_pool_serial_multi_core(self): with InteractiveExecutor( max_workers=1, executor_kwargs={"hostname_localhost": True, "cores": 2}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ) as p: output = p.submit(calc, i=2) self.assertTrue(isinstance(output, Future)) @@ -69,7 +69,7 @@ def submit(): # this function is exits future = InteractiveExecutor( executor_kwargs={"hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ).submit(slow_callable) future.add_done_callback(callback) return future @@ -109,7 +109,7 @@ def run(self): future = InteractiveExecutor( executor_kwargs={"hostname_localhost": True}, - interface_class=MpiExecInterface, + interface_class=MpiExecSpawner, ).submit(self.return_42) future.add_done_callback(self.finished) diff --git a/tests/test_shared_backend.py b/tests/test_shared_backend.py index 5200aede..c93cfb4f 100644 --- a/tests/test_shared_backend.py +++ b/tests/test_shared_backend.py @@ -3,7 +3,7 @@ import unittest from executorlib.interactive.backend import parse_arguments -from executorlib.shared.interface import SrunInterface, MpiExecInterface +from executorlib.shared.spawner import SrunSpawner, MpiExecSpawner class TestParser(unittest.TestCase): @@ -22,7 +22,7 @@ def test_command_local(self): "--zmqport", result_dict["zmqport"], ] - interface = MpiExecInterface(cwd=None, cores=2, oversubscribe=True) + interface = MpiExecSpawner(cwd=None, cores=2, oversubscribe=True) self.assertEqual( command_lst, interface.generate_command( @@ -51,7 +51,7 @@ def test_command_slurm(self): "--zmqport", result_dict["zmqport"], ] - interface = SrunInterface( + interface = SrunSpawner( cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=True ) self.assertEqual( @@ -91,7 +91,7 @@ def test_command_slurm_user_command(self): "--zmqport", result_dict["zmqport"], ] - interface = SrunInterface( + interface = SrunSpawner( cwd=os.path.abspath("."), cores=2, gpus_per_core=1, diff --git a/tests/test_shared_communication.py b/tests/test_shared_communication.py index 3fbd9e69..a1236bed 100644 --- a/tests/test_shared_communication.py +++ b/tests/test_shared_communication.py @@ -14,7 +14,7 @@ SocketInterface, ) from executorlib.shared.executor import cloudpickle_register -from executorlib.shared.interface import MpiExecInterface +from executorlib.shared.spawner import MpiExecSpawner skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None @@ -32,7 +32,7 @@ def test_interface_mpi(self): cloudpickle_register(ind=1) task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}} interface = SocketInterface( - interface=MpiExecInterface(cwd=None, cores=1, oversubscribe=False) + interface=MpiExecSpawner(cwd=None, cores=1, oversubscribe=False) ) interface.bootup( command_lst=[ @@ -60,7 +60,7 @@ def test_interface_serial(self): cloudpickle_register(ind=1) task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}} interface = SocketInterface( - interface=MpiExecInterface(cwd=None, cores=1, oversubscribe=False) + interface=MpiExecSpawner(cwd=None, cores=1, oversubscribe=False) ) interface.bootup( command_lst=[