Rename Interface to Spawner
jan-janssen committed Aug 30, 2024
1 parent 939b5b8 commit 4cac423
Showing 13 changed files with 85 additions and 85 deletions.
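The change is a pure rename: every `*Interface` spawner class and the `executorlib.shared.interface` module become `*Spawner` / `executorlib.shared.spawner`, as the per-file diffs below show. A minimal before/after import sketch (the flux import, as before, only works with flux-core installed):

```python
# Before this commit (old names):
# from executorlib.shared.interface import BaseInterface, MpiExecInterface, SrunInterface
# from executorlib.interactive.flux import FluxPythonInterface

# After this commit (new names):
from executorlib.shared.spawner import BaseSpawner, MpiExecSpawner, SrunSpawner
from executorlib.interactive.flux import FluxPythonSpawner  # requires flux-core
```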
20 changes: 10 additions & 10 deletions executorlib/interactive/__init__.py
@@ -18,14 +18,14 @@
validate_backend,
validate_number_of_cores,
)
from executorlib.shared.interface import (
from executorlib.shared.spawner import (
SLURM_COMMAND,
MpiExecInterface,
SrunInterface,
MpiExecSpawner,
SrunSpawner,
)

try: # The PyFluxExecutor requires flux-core to be installed.
from executorlib.interactive.flux import FluxPythonInterface
from executorlib.interactive.flux import FluxPythonSpawner

flux_installed = "FLUX_URI" in os.environ
except ImportError:
@@ -122,13 +122,13 @@ def create_executor(
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
)
else:
return InteractiveStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
)
elif backend == "slurm":
check_executor(executor=executor)
@@ -142,13 +142,13 @@
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
interface_class=SrunInterface,
interface_class=SrunSpawner,
)
else:
return InteractiveStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
interface_class=SrunInterface,
interface_class=SrunSpawner,
)
else: # backend="local"
check_threads_per_core(threads_per_core=threads_per_core)
@@ -164,11 +164,11 @@ def create_executor(
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
interface_class=MpiExecInterface,
interface_class=MpiExecSpawner,
)
else:
return InteractiveStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
interface_class=MpiExecInterface,
interface_class=MpiExecSpawner,
)
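To summarize the branching above: each backend resolves to one spawner class, which create_executor hands to the executor as interface_class. The dictionary below is an illustrative paraphrase of that if/elif chain, not code from the repository:

```python
from executorlib.shared.spawner import MpiExecSpawner, SrunSpawner
# FluxPythonSpawner is importable only when flux-core is installed.

# Illustrative mapping, mirroring the branching in create_executor above.
BACKEND_TO_SPAWNER = {
    "local": MpiExecSpawner,   # plain mpiexec on the local machine
    "slurm": SrunSpawner,      # srun inside a SLURM allocation
    # "flux": FluxPythonSpawner,  # flux.job, when flux-core is available
}
```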
10 changes: 5 additions & 5 deletions executorlib/interactive/executor.py
@@ -4,7 +4,7 @@
execute_parallel_tasks,
execute_separate_tasks,
)
from executorlib.shared.interface import BaseInterface, MpiExecInterface
from executorlib.shared.spawner import BaseSpawner, MpiExecSpawner
from executorlib.shared.thread import RaisingThread


@@ -19,7 +19,7 @@ class InteractiveExecutor(ExecutorBroker):
Args:
max_workers (int): defines the number workers which can execute functions in parallel
executor_kwargs (dict): keyword arguments for the executor
interface_class (BaseInterface): interface class to initiate python processes
interface_class (BaseSpawner): interface class to initiate python processes
Examples:
@@ -46,7 +46,7 @@ def __init__(
self,
max_workers: int = 1,
executor_kwargs: dict = {},
interface_class: BaseInterface = MpiExecInterface,
interface_class: BaseSpawner = MpiExecSpawner,
):
super().__init__()
executor_kwargs["future_queue"] = self._future_queue
@@ -73,7 +73,7 @@ class InteractiveStepExecutor(ExecutorSteps):
Args:
max_cores (int): defines the number workers which can execute functions in parallel
executor_kwargs (dict): keyword arguments for the executor
interface_class (BaseInterface): interface class to initiate python processes
interface_class (BaseSpawner): interface class to initiate python processes
Examples:
@@ -98,7 +98,7 @@ def __init__(
self,
max_cores: int = 1,
executor_kwargs: dict = {},
interface_class: BaseInterface = MpiExecInterface,
interface_class: BaseSpawner = MpiExecSpawner,
):
super().__init__()
executor_kwargs["future_queue"] = self._future_queue
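The renamed parameter is used exactly as before. The sketch below mirrors the Flux tests further down, swapped to the local MpiExecSpawner; the executor_kwargs values are illustrative, not taken from the repository:

```python
from executorlib.interactive.executor import InteractiveExecutor
from executorlib.shared.spawner import MpiExecSpawner


def calc(i):
    return i + 1


# Minimal usage sketch (assumed setup): two workers, one core each,
# started through mpiexec rather than srun or flux.
with InteractiveExecutor(
    max_workers=2,
    executor_kwargs={"cores": 1},
    interface_class=MpiExecSpawner,
) as exe:
    fs = exe.submit(calc, 1)
    print(fs.result())  # expected: 2
```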
4 changes: 2 additions & 2 deletions executorlib/interactive/flux.py
@@ -3,10 +3,10 @@

import flux.job

from executorlib.shared.interface import BaseInterface
from executorlib.shared.spawner import BaseSpawner


class FluxPythonInterface(BaseInterface):
class FluxPythonSpawner(BaseSpawner):
"""
A class representing the FluxPythonInterface.
6 changes: 3 additions & 3 deletions executorlib/shared/__init__.py
@@ -7,7 +7,7 @@
interface_shutdown,
)
from executorlib.shared.executor import cancel_items_in_queue
from executorlib.shared.interface import MpiExecInterface, SrunInterface
from executorlib.shared.spawner import MpiExecSpawner, SrunSpawner
from executorlib.shared.thread import RaisingThread

__all__ = [
@@ -19,6 +19,6 @@
interface_receive,
cancel_items_in_queue,
RaisingThread,
MpiExecInterface,
SrunInterface,
MpiExecSpawner,
SrunSpawner,
]
6 changes: 3 additions & 3 deletions executorlib/shared/communication.py
@@ -10,15 +10,15 @@ class SocketInterface:
The SocketInterface is an abstraction layer on top of the zero message queue.
Args:
interface (executorlib.shared.interface.BaseInterface): Interface for starting the parallel process
interface (executorlib.shared.spawner.BaseSpawner): Interface for starting the parallel process
"""

def __init__(self, interface=None):
"""
Initialize the SocketInterface.
Args:
interface (executorlib.shared.interface.BaseInterface): Interface for starting the parallel process
interface (executorlib.shared.spawner.BaseSpawner): Interface for starting the parallel process
"""
self._context = zmq.Context()
self._socket = self._context.socket(zmq.PAIR)
@@ -133,7 +133,7 @@ def interface_bootup(
Args:
command_lst (list): List of commands as strings
connections (executorlib.shared.interface.BaseInterface): Interface to start parallel process, like MPI, SLURM
connections (executorlib.shared.spawner.BaseSpawner): Interface to start parallel process, like MPI, SLURM
or Flux
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
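Based on the constructor shown above, a spawner instance is handed to SocketInterface (still under the interface keyword) to start the worker process it communicates with over zmq. A minimal sketch, assuming the spawner's default arguments are sufficient:

```python
from executorlib.shared.communication import SocketInterface
from executorlib.shared.spawner import MpiExecSpawner

# The SocketInterface wraps a spawner: the spawner starts the parallel
# process, the SocketInterface exchanges messages with it over a zmq
# PAIR socket.
socket_interface = SocketInterface(interface=MpiExecSpawner())
```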
14 changes: 7 additions & 7 deletions executorlib/shared/executor.py
@@ -19,7 +19,7 @@
check_resource_dict,
check_resource_dict_is_empty,
)
from executorlib.shared.interface import BaseInterface, MpiExecInterface
from executorlib.shared.spawner import BaseSpawner, MpiExecSpawner
from executorlib.shared.thread import RaisingThread


@@ -301,7 +301,7 @@ def cloudpickle_register(ind: int = 2):
def execute_parallel_tasks(
future_queue: queue.Queue,
cores: int = 1,
interface_class: BaseInterface = MpiExecInterface,
interface_class: BaseSpawner = MpiExecSpawner,
hostname_localhost: bool = False,
init_function: Optional[Callable] = None,
prefix_name: Optional[str] = None,
@@ -314,7 +314,7 @@ def execute_parallel_tasks(
Args:
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
cores (int): defines the total number of MPI ranks to use
interface_class (BaseInterface): Interface to start process on selected compute resources
interface_class (BaseSpawner): Interface to start process on selected compute resources
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -362,7 +362,7 @@ def execute_parallel_tasks(

def execute_separate_tasks(
future_queue: queue.Queue,
interface_class: BaseInterface = MpiExecInterface,
interface_class: BaseSpawner = MpiExecSpawner,
max_cores: int = 1,
hostname_localhost: bool = False,
**kwargs,
@@ -372,7 +372,7 @@
Args:
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
interface_class (BaseInterface): Interface to start process on selected compute resources
interface_class (BaseSpawner): Interface to start process on selected compute resources
max_cores (int): defines the number cores which can be used in parallel
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
@@ -621,7 +621,7 @@ def _submit_function_to_separate_process(
task_dict: dict,
active_task_dict: dict,
qtask: queue.Queue,
interface_class: BaseInterface,
interface_class: BaseSpawner,
executor_kwargs: dict,
max_cores: int = 1,
hostname_localhost: bool = False,
@@ -633,7 +633,7 @@ def _submit_function_to_separate_process(
{"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
interface_class (BaseInterface): Interface to start process on selected compute resources
interface_class (BaseSpawner): Interface to start process on selected compute resources
executor_kwargs (dict): keyword parameters used to initialize the Executor
max_cores (int): defines the number cores which can be used in parallel
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
executorlib/shared/interface.py → executorlib/shared/spawner.py
@@ -6,7 +6,7 @@
SLURM_COMMAND = "srun"


class BaseInterface(ABC):
class BaseSpawner(ABC):
def __init__(self, cwd: str, cores: int = 1, oversubscribe: bool = False):
"""
Base class for interface implementations.
@@ -55,7 +55,7 @@ def poll(self):
raise NotImplementedError


class SubprocessInterface(BaseInterface):
class SubprocessSpawner(BaseSpawner):
def __init__(
self,
cwd: Optional[str] = None,
@@ -143,7 +143,7 @@ def poll(self) -> bool:
return self._process is not None and self._process.poll() is None


class MpiExecInterface(SubprocessInterface):
class MpiExecSpawner(SubprocessSpawner):
def generate_command(self, command_lst: list[str]) -> list[str]:
"""
Generate the command list for the MPIExec interface.
@@ -163,7 +163,7 @@ def generate_command(self, command_lst: list[str]) -> list[str]:
)


class SrunInterface(SubprocessInterface):
class SrunSpawner(SubprocessSpawner):
def __init__(
self,
cwd: Optional[str] = None,
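The renamed base classes keep their contract: a spawner turns the raw python command into the launcher command via generate_command, as the abstract method above shows. A hypothetical subclass, purely for illustration of that contract:

```python
from executorlib.shared.spawner import SubprocessSpawner


class EchoSpawner(SubprocessSpawner):
    """Hypothetical spawner, for illustration only."""

    def generate_command(self, command_lst: list[str]) -> list[str]:
        # Wrap the raw python command in an extra launcher call, analogous to
        # how MpiExecSpawner prepends mpiexec and SrunSpawner prepends srun.
        return ["echo"] + command_lst
```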
2 changes: 1 addition & 1 deletion tests/test_executor_backend_flux.py
@@ -8,7 +8,7 @@

try:
import flux.job
from executorlib.interactive.flux import FluxPythonInterface
from executorlib.interactive.flux import FluxPythonSpawner

skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("PYMPIPOOL_PMIX", None)
22 changes: 11 additions & 11 deletions tests/test_flux_executor.py
@@ -11,7 +11,7 @@

try:
import flux.job
from executorlib.interactive.flux import FluxPythonInterface
from executorlib.interactive.flux import FluxPythonSpawner

skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("PYMPIPOOL_PMIX", None)
@@ -50,7 +50,7 @@ def test_flux_executor_serial(self):
with InteractiveExecutor(
max_workers=2,
executor_kwargs={"executor": self.executor},
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
) as exe:
fs_1 = exe.submit(calc, 1)
fs_2 = exe.submit(calc, 2)
@@ -63,7 +63,7 @@ def test_flux_executor_threads(self):
with InteractiveExecutor(
max_workers=1,
executor_kwargs={"executor": self.executor, "threads_per_core": 2},
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
) as exe:
fs_1 = exe.submit(calc, 1)
fs_2 = exe.submit(calc, 2)
@@ -76,7 +76,7 @@ def test_flux_executor_parallel(self):
with InteractiveExecutor(
max_workers=1,
executor_kwargs={"executor": self.executor, "cores": 2, "pmi": pmi},
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
) as exe:
fs_1 = exe.submit(mpi_funct, 1)
self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)])
@@ -86,7 +86,7 @@ def test_single_task(self):
with InteractiveExecutor(
max_workers=1,
executor_kwargs={"executor": self.executor, "cores": 2, "pmi": pmi},
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
) as p:
output = p.map(mpi_funct, [1, 2, 3])
self.assertEqual(
@@ -104,7 +104,7 @@ def test_execute_task(self):
future_queue=q,
cores=1,
executor=self.executor,
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
)
self.assertEqual(f.result(), 2)
q.join()
@@ -120,7 +120,7 @@ def test_execute_task_threads(self):
cores=1,
threads_per_core=1,
executor=self.executor,
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
)
self.assertEqual(f.result(), 2)
q.join()
@@ -133,7 +133,7 @@ def test_internal_memory(self):
"cores": 1,
"init_function": set_global,
},
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
) as p:
f = p.submit(get_global)
self.assertFalse(f.done())
@@ -142,13 +142,13 @@ def test_internal_memory(self):

def test_interface_exception(self):
with self.assertRaises(ValueError):
flux_interface = FluxPythonInterface(
flux_interface = FluxPythonSpawner(
executor=self.executor, oversubscribe=True
)
flux_interface.bootup(command_lst=[])
with self.assertRaises(ValueError):
flux_interface = FluxPythonInterface(executor=self.executor)
flux_interface = FluxPythonSpawner(executor=self.executor)
flux_interface.bootup(command_lst=[], prefix_path="/path/to/conda/env")
with self.assertRaises(ValueError):
flux_interface = FluxPythonInterface(executor=self.executor)
flux_interface = FluxPythonSpawner(executor=self.executor)
flux_interface.bootup(command_lst=[], prefix_name="conda_env_name")