Broker class #267

Merged
merged 13 commits on Feb 19, 2024
95 changes: 21 additions & 74 deletions pympipool/flux/executor.py
@@ -3,16 +3,14 @@
import flux.job

from pympipool.shared.executorbase import (
cloudpickle_register,
ExecutorBase,
executor_broker,
ExecutorBroker,
execute_parallel_tasks,
)
from pympipool.shared.interface import BaseInterface
from pympipool.shared.thread import RaisingThread


class PyFluxExecutor(ExecutorBase):
class PyFluxExecutor(ExecutorBroker):
"""
The pympipool.flux.PyFluxExecutor leverages the flux framework to distribute python tasks within a queuing system
allocation. In analogy to the pympipool.slurm.PySlurmExecutor it provides the option to specify the number of
@@ -69,76 +67,25 @@ def __init__(
):
super().__init__()
self._set_process(
process=RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"hostname_localhost": hostname_localhost,
"executor_class": PyFluxSingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
"threads_per_core": threads_per_core,
"gpus_per_task": int(gpus_per_worker / cores_per_worker),
"init_function": init_function,
"cwd": cwd,
"executor": executor,
},
)
)


class PyFluxSingleTaskExecutor(ExecutorBase):
"""
The pympipool.flux.PyFluxSingleTaskExecutor is the internal worker for the pympipool.flux.PyFluxExecutor.

Args:
cores (int): defines the number of MPI ranks to use for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate with an
Executor running on a different compute node within the same allocation. And
in principle any computer should be able to resolve that their own hostname
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true

"""

def __init__(
self,
cores=1,
threads_per_core=1,
gpus_per_task=0,
init_function=None,
cwd=None,
executor=None,
hostname_localhost=False,
):
super().__init__()
cloudpickle_register(ind=3)
self._set_process(
process=RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores,
"interface_class": FluxPythonInterface,
"hostname_localhost": hostname_localhost,
"init_function": init_function,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": gpus_per_task,
"cwd": cwd,
"executor": executor,
},
)
process=[
RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores_per_worker,
"interface_class": FluxPythonInterface,
"hostname_localhost": hostname_localhost,
"init_function": init_function,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": int(gpus_per_worker / cores_per_worker),
"cwd": cwd,
"executor": executor,
},
)
for _ in range(max_workers)
],
)


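For orientation, a minimal usage sketch of the refactored PyFluxExecutor. It assumes a running Flux instance, the concurrent.futures submit() interface inherited via ExecutorBase, and that the constructor keeps the parameters visible in the keyword arguments above (max_workers, cores_per_worker, executor); the full signature sits in the collapsed part of this file.

import flux.job

from pympipool.flux.executor import PyFluxExecutor


def calc(i):
    return i ** 2


# One flux.job.FluxExecutor instance is shared by all worker threads the broker spawns.
with flux.job.FluxExecutor() as flux_exe:
    with PyFluxExecutor(max_workers=2, cores_per_worker=1, executor=flux_exe) as exe:
        future = exe.submit(calc, 3)
        print(future.result())  # 9
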
85 changes: 19 additions & 66 deletions pympipool/mpi/executor.py
@@ -1,14 +1,12 @@
from pympipool.shared.executorbase import (
cloudpickle_register,
execute_parallel_tasks,
ExecutorBase,
executor_broker,
ExecutorBroker,
)
from pympipool.shared.interface import MpiExecInterface
from pympipool.shared.thread import RaisingThread


class PyMPIExecutor(ExecutorBase):
class PyMPIExecutor(ExecutorBroker):
"""
The pympipool.mpi.PyMPIExecutor leverages the message passing interface MPI to distribute python tasks within an
MPI allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.mpi.PyMPIExecutor can be executed
@@ -62,66 +60,21 @@ def __init__(
):
super().__init__()
self._set_process(
process=RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"executor_class": PyMPISingleTaskExecutor,
"hostname_localhost": hostname_localhost,
# Executor Arguments
"cores": cores_per_worker,
"oversubscribe": oversubscribe,
"init_function": init_function,
"cwd": cwd,
},
),
)


class PyMPISingleTaskExecutor(ExecutorBase):
"""
The pympipool.mpi.PyMPISingleTaskExecutor is the internal worker for the pympipool.mpi.PyMPIExecutor.

Args:
cores (int): defines the number of MPI ranks to use for each function call
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate with an
Executor running on a different compute node within the same allocation. And
in principle any computer should be able to resolve that their own hostname
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true

"""

def __init__(
self,
cores=1,
oversubscribe=False,
init_function=None,
cwd=None,
hostname_localhost=False,
):
super().__init__()
cloudpickle_register(ind=3)
self._set_process(
process=RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores,
"interface_class": MpiExecInterface,
"init_function": init_function,
# Interface Arguments
"cwd": cwd,
"oversubscribe": oversubscribe,
"hostname_localhost": hostname_localhost,
},
)
process=[
RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores_per_worker,
"interface_class": MpiExecInterface,
"hostname_localhost": hostname_localhost,
"init_function": init_function,
# Interface Arguments
"cwd": cwd,
"oversubscribe": oversubscribe,
},
)
for _ in range(max_workers)
],
)
88 changes: 39 additions & 49 deletions pympipool/shared/executorbase.py
@@ -1,5 +1,4 @@
from concurrent.futures import (
as_completed,
Executor as FutureExecutor,
Future,
)
@@ -15,6 +14,7 @@

class ExecutorBase(FutureExecutor):
def __init__(self):
cloudpickle_register(ind=3)
self._future_queue = queue.Queue()
self._process = None

@@ -52,12 +52,16 @@ def shutdown(self, wait=True, *, cancel_futures=False):
if cancel_futures:
cancel_items_in_queue(que=self._future_queue)
self._future_queue.put({"shutdown": True, "wait": wait})
if wait:
if wait and self._process is not None:
self._process.join()
self._future_queue.join()
self._process = None
self._future_queue = None

def _set_process(self, process):
self._process = process
self._process.start()

def __len__(self):
return self._future_queue.qsize()

@@ -72,6 +76,39 @@ def _set_process(self, process):
self._process.start()


class ExecutorBroker(ExecutorBase):
def shutdown(self, wait=True, *, cancel_futures=False):
"""Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other
methods can be called after this one.

Args:
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
parallel_executors have been reclaimed.
cancel_futures: If True then shutdown will cancel all pending
futures. Futures that are completed or running will not be
cancelled.
"""
if cancel_futures:
cancel_items_in_queue(que=self._future_queue)
if self._process is not None:
for _ in range(len(self._process)):
self._future_queue.put({"shutdown": True, "wait": wait})
if wait:
for process in self._process:
process.join()
self._future_queue.join()
self._process = None
self._future_queue = None

def _set_process(self, process):
self._process = process
for process in self._process:
process.start()


def cancel_items_in_queue(que):
"""
Cancel items which are still waiting in the queue. If the executor is busy tasks remain in the queue, so the future
@@ -172,43 +209,6 @@ def execute_parallel_tasks_loop(interface, future_queue, init_function=None):
future_queue.task_done()


def executor_broker(
future_queue,
max_workers,
executor_class,
**kwargs,
):
meta_future_lst = _get_executor_dict(
max_workers=max_workers,
executor_class=executor_class,
**kwargs,
)
while True:
if execute_task_dict(
task_dict=future_queue.get(), meta_future_lst=meta_future_lst
):
future_queue.task_done()
else:
future_queue.task_done()
future_queue.join()
break


def execute_task_dict(task_dict, meta_future_lst):
if "fn" in task_dict.keys() or "future" in task_dict.keys():
meta_future = next(as_completed(meta_future_lst.keys()))
executor = meta_future_lst.pop(meta_future)
executor.future_queue.put(task_dict)
meta_future_lst[task_dict["future"]] = executor
return True
elif "shutdown" in task_dict.keys() and task_dict["shutdown"]:
for executor in meta_future_lst.values():
executor.shutdown(wait=task_dict["wait"])
return False
else:
raise ValueError("Unrecognized Task in task_dict: ", task_dict)


def _get_backend_path(cores):
command_lst = [sys.executable]
if cores > 1:
@@ -220,13 +220,3 @@ def _get_backend_path(cores):

def _get_command_path(executable):
return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))


def _get_executor_dict(max_workers, executor_class, **kwargs):
return {_get_future_done(): executor_class(**kwargs) for _ in range(max_workers)}


def _get_future_done():
f = Future()
f.set_result(True)
return f
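The removed executor_broker() / execute_task_dict() pair matched incoming tasks to ready workers via as_completed(); in the new design every worker thread consumes from the executor's single future_queue, which is why ExecutorBroker.shutdown() enqueues one shutdown message per worker. A standalone sketch of that shared-queue pattern, using plain threading rather than the pympipool API:

import queue
import threading
from concurrent.futures import Future


def worker(task_queue):
    # Every worker blocks on the same queue; a {"shutdown": True} item stops exactly one worker.
    while True:
        task = task_queue.get()
        if task.get("shutdown", False):
            task_queue.task_done()
            break
        task["future"].set_result(task["fn"](*task.get("args", ())))
        task_queue.task_done()


task_queue = queue.Queue()
workers = [threading.Thread(target=worker, args=(task_queue,)) for _ in range(3)]
for thread in workers:
    thread.start()

result = Future()
task_queue.put({"fn": sum, "args": ([1, 2, 3],), "future": result})
print(result.result())  # 6

for _ in workers:
    task_queue.put({"shutdown": True})  # one message per worker, mirroring ExecutorBroker.shutdown()
for thread in workers:
    thread.join()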