Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename Interface to Spawner #398

Merged
merged 2 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions executorlib/interactive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
validate_backend,
validate_number_of_cores,
)
from executorlib.shared.interface import (
from executorlib.shared.spawner import (
SLURM_COMMAND,
MpiExecInterface,
SrunInterface,
MpiExecSpawner,
SrunSpawner,
)

try: # The PyFluxExecutor requires flux-core to be installed.
from executorlib.interactive.flux import FluxPythonInterface
from executorlib.interactive.flux import FluxPythonSpawner

flux_installed = "FLUX_URI" in os.environ
except ImportError:
Expand Down Expand Up @@ -122,13 +122,13 @@ def create_executor(
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
)
else:
return InteractiveStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
)
elif backend == "slurm":
check_executor(executor=executor)
Expand All @@ -142,13 +142,13 @@ def create_executor(
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
interface_class=SrunInterface,
interface_class=SrunSpawner,
)
else:
return InteractiveStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
interface_class=SrunInterface,
interface_class=SrunSpawner,
)
else: # backend="local"
check_threads_per_core(threads_per_core=threads_per_core)
Expand All @@ -164,11 +164,11 @@ def create_executor(
return InteractiveExecutor(
max_workers=int(max_cores / cores_per_worker),
executor_kwargs=executor_kwargs,
interface_class=MpiExecInterface,
interface_class=MpiExecSpawner,
)
else:
return InteractiveStepExecutor(
max_cores=max_cores,
executor_kwargs=executor_kwargs,
interface_class=MpiExecInterface,
interface_class=MpiExecSpawner,
)
10 changes: 5 additions & 5 deletions executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
execute_parallel_tasks,
execute_separate_tasks,
)
from executorlib.shared.interface import BaseInterface, MpiExecInterface
from executorlib.shared.spawner import BaseSpawner, MpiExecSpawner
from executorlib.shared.thread import RaisingThread


Expand All @@ -19,7 +19,7 @@ class InteractiveExecutor(ExecutorBroker):
Args:
max_workers (int): defines the number workers which can execute functions in parallel
executor_kwargs (dict): keyword arguments for the executor
interface_class (BaseInterface): interface class to initiate python processes
interface_class (BaseSpawner): interface class to initiate python processes

Examples:

Expand All @@ -46,7 +46,7 @@ def __init__(
self,
max_workers: int = 1,
executor_kwargs: dict = {},
interface_class: BaseInterface = MpiExecInterface,
interface_class: BaseSpawner = MpiExecSpawner,
):
super().__init__()
executor_kwargs["future_queue"] = self._future_queue
Expand All @@ -73,7 +73,7 @@ class InteractiveStepExecutor(ExecutorSteps):
Args:
max_cores (int): defines the number workers which can execute functions in parallel
executor_kwargs (dict): keyword arguments for the executor
interface_class (BaseInterface): interface class to initiate python processes
interface_class (BaseSpawner): interface class to initiate python processes

Examples:

Expand All @@ -98,7 +98,7 @@ def __init__(
self,
max_cores: int = 1,
executor_kwargs: dict = {},
interface_class: BaseInterface = MpiExecInterface,
interface_class: BaseSpawner = MpiExecSpawner,
):
super().__init__()
executor_kwargs["future_queue"] = self._future_queue
Expand Down
4 changes: 2 additions & 2 deletions executorlib/interactive/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

import flux.job

from executorlib.shared.interface import BaseInterface
from executorlib.shared.spawner import BaseSpawner


class FluxPythonInterface(BaseInterface):
class FluxPythonSpawner(BaseSpawner):
"""
A class representing the FluxPythonInterface.

Expand Down
6 changes: 3 additions & 3 deletions executorlib/shared/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
interface_shutdown,
)
from executorlib.shared.executor import cancel_items_in_queue
from executorlib.shared.interface import MpiExecInterface, SrunInterface
from executorlib.shared.spawner import MpiExecSpawner, SrunSpawner
from executorlib.shared.thread import RaisingThread

__all__ = [
Expand All @@ -19,6 +19,6 @@
interface_receive,
cancel_items_in_queue,
RaisingThread,
MpiExecInterface,
SrunInterface,
MpiExecSpawner,
SrunSpawner,
]
6 changes: 3 additions & 3 deletions executorlib/shared/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ class SocketInterface:
The SocketInterface is an abstraction layer on top of the zero message queue.

Args:
interface (executorlib.shared.interface.BaseInterface): Interface for starting the parallel process
interface (executorlib.shared.spawner.BaseSpawner): Interface for starting the parallel process
"""

def __init__(self, interface=None):
"""
Initialize the SocketInterface.

Args:
interface (executorlib.shared.interface.BaseInterface): Interface for starting the parallel process
interface (executorlib.shared.spawner.BaseSpawner): Interface for starting the parallel process
"""
self._context = zmq.Context()
self._socket = self._context.socket(zmq.PAIR)
Expand Down Expand Up @@ -133,7 +133,7 @@ def interface_bootup(

Args:
command_lst (list): List of commands as strings
connections (executorlib.shared.interface.BaseInterface): Interface to start parallel process, like MPI, SLURM
connections (executorlib.shared.spawner.BaseSpawner): Interface to start parallel process, like MPI, SLURM
or Flux
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Expand Down
14 changes: 7 additions & 7 deletions executorlib/shared/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
check_resource_dict,
check_resource_dict_is_empty,
)
from executorlib.shared.interface import BaseInterface, MpiExecInterface
from executorlib.shared.spawner import BaseSpawner, MpiExecSpawner
from executorlib.shared.thread import RaisingThread


Expand Down Expand Up @@ -301,7 +301,7 @@ def cloudpickle_register(ind: int = 2):
def execute_parallel_tasks(
future_queue: queue.Queue,
cores: int = 1,
interface_class: BaseInterface = MpiExecInterface,
interface_class: BaseSpawner = MpiExecSpawner,
hostname_localhost: bool = False,
init_function: Optional[Callable] = None,
prefix_name: Optional[str] = None,
Expand All @@ -314,7 +314,7 @@ def execute_parallel_tasks(
Args:
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
cores (int): defines the total number of MPI ranks to use
interface_class (BaseInterface): Interface to start process on selected compute resources
interface_class (BaseSpawner): Interface to start process on selected compute resources
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -362,7 +362,7 @@ def execute_parallel_tasks(

def execute_separate_tasks(
future_queue: queue.Queue,
interface_class: BaseInterface = MpiExecInterface,
interface_class: BaseSpawner = MpiExecSpawner,
max_cores: int = 1,
hostname_localhost: bool = False,
**kwargs,
Expand All @@ -372,7 +372,7 @@ def execute_separate_tasks(

Args:
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
interface_class (BaseInterface): Interface to start process on selected compute resources
interface_class (BaseSpawner): Interface to start process on selected compute resources
max_cores (int): defines the number cores which can be used in parallel
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Expand Down Expand Up @@ -621,7 +621,7 @@ def _submit_function_to_separate_process(
task_dict: dict,
active_task_dict: dict,
qtask: queue.Queue,
interface_class: BaseInterface,
interface_class: BaseSpawner,
executor_kwargs: dict,
max_cores: int = 1,
hostname_localhost: bool = False,
Expand All @@ -633,7 +633,7 @@ def _submit_function_to_separate_process(
{"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
interface_class (BaseInterface): Interface to start process on selected compute resources
interface_class (BaseSpawner): Interface to start process on selected compute resources
executor_kwargs (dict): keyword parameters used to initialize the Executor
max_cores (int): defines the number cores which can be used in parallel
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
SLURM_COMMAND = "srun"


class BaseInterface(ABC):
class BaseSpawner(ABC):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Class BaseSpawner correctly renamed but lacks abstract methods.

The renaming from BaseInterface to BaseSpawner aligns with the refactoring goals. However, it's flagged that this abstract base class lacks abstract methods, which might be an oversight.

Consider adding abstract methods to enforce implementation in derived classes or reevaluate if this class needs to be abstract.

Tools
Ruff

9-9: BaseSpawner is an abstract base class, but it has no abstract methods

(B024)

def __init__(self, cwd: str, cores: int = 1, oversubscribe: bool = False):
"""
Base class for interface implementations.
Expand Down Expand Up @@ -55,7 +55,7 @@ def poll(self):
raise NotImplementedError


class SubprocessInterface(BaseInterface):
class SubprocessSpawner(BaseSpawner):
def __init__(
self,
cwd: Optional[str] = None,
Expand Down Expand Up @@ -143,7 +143,7 @@ def poll(self) -> bool:
return self._process is not None and self._process.poll() is None


class MpiExecInterface(SubprocessInterface):
class MpiExecSpawner(SubprocessSpawner):
def generate_command(self, command_lst: list[str]) -> list[str]:
"""
Generate the command list for the MPIExec interface.
Expand All @@ -163,7 +163,7 @@ def generate_command(self, command_lst: list[str]) -> list[str]:
)


class SrunInterface(SubprocessInterface):
class SrunSpawner(SubprocessSpawner):
def __init__(
self,
cwd: Optional[str] = None,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_executor_backend_flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

try:
import flux.job
from executorlib.interactive.flux import FluxPythonInterface
from executorlib.interactive.flux import FluxPythonSpawner

skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("PYMPIPOOL_PMIX", None)
Expand Down
22 changes: 11 additions & 11 deletions tests/test_flux_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

try:
import flux.job
from executorlib.interactive.flux import FluxPythonInterface
from executorlib.interactive.flux import FluxPythonSpawner

skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("PYMPIPOOL_PMIX", None)
Expand Down Expand Up @@ -50,7 +50,7 @@ def test_flux_executor_serial(self):
with InteractiveExecutor(
max_workers=2,
executor_kwargs={"executor": self.executor},
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
) as exe:
fs_1 = exe.submit(calc, 1)
fs_2 = exe.submit(calc, 2)
Expand All @@ -63,7 +63,7 @@ def test_flux_executor_threads(self):
with InteractiveExecutor(
max_workers=1,
executor_kwargs={"executor": self.executor, "threads_per_core": 2},
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
) as exe:
fs_1 = exe.submit(calc, 1)
fs_2 = exe.submit(calc, 2)
Expand All @@ -76,7 +76,7 @@ def test_flux_executor_parallel(self):
with InteractiveExecutor(
max_workers=1,
executor_kwargs={"executor": self.executor, "cores": 2, "pmi": pmi},
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
) as exe:
fs_1 = exe.submit(mpi_funct, 1)
self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)])
Expand All @@ -86,7 +86,7 @@ def test_single_task(self):
with InteractiveExecutor(
max_workers=1,
executor_kwargs={"executor": self.executor, "cores": 2, "pmi": pmi},
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
) as p:
output = p.map(mpi_funct, [1, 2, 3])
self.assertEqual(
Expand All @@ -104,7 +104,7 @@ def test_execute_task(self):
future_queue=q,
cores=1,
executor=self.executor,
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
)
self.assertEqual(f.result(), 2)
q.join()
Expand All @@ -120,7 +120,7 @@ def test_execute_task_threads(self):
cores=1,
threads_per_core=1,
executor=self.executor,
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
)
self.assertEqual(f.result(), 2)
q.join()
Expand All @@ -133,7 +133,7 @@ def test_internal_memory(self):
"cores": 1,
"init_function": set_global,
},
interface_class=FluxPythonInterface,
interface_class=FluxPythonSpawner,
) as p:
f = p.submit(get_global)
self.assertFalse(f.done())
Expand All @@ -142,13 +142,13 @@ def test_internal_memory(self):

def test_interface_exception(self):
with self.assertRaises(ValueError):
flux_interface = FluxPythonInterface(
flux_interface = FluxPythonSpawner(
executor=self.executor, oversubscribe=True
)
flux_interface.bootup(command_lst=[])
with self.assertRaises(ValueError):
flux_interface = FluxPythonInterface(executor=self.executor)
flux_interface = FluxPythonSpawner(executor=self.executor)
flux_interface.bootup(command_lst=[], prefix_path="/path/to/conda/env")
with self.assertRaises(ValueError):
flux_interface = FluxPythonInterface(executor=self.executor)
flux_interface = FluxPythonSpawner(executor=self.executor)
flux_interface.bootup(command_lst=[], prefix_name="conda_env_name")
Loading
Loading