Merge pull request #267 from pyiron/broker_class
Broker class
jan-janssen authored Feb 19, 2024
2 parents a1feb9f + 8efdaac commit 344068c
Showing 12 changed files with 155 additions and 431 deletions.
95 changes: 21 additions & 74 deletions pympipool/flux/executor.py
@@ -3,16 +3,14 @@
import flux.job

from pympipool.shared.executorbase import (
cloudpickle_register,
ExecutorBase,
executor_broker,
ExecutorBroker,
execute_parallel_tasks,
)
from pympipool.shared.interface import BaseInterface
from pympipool.shared.thread import RaisingThread


class PyFluxExecutor(ExecutorBase):
class PyFluxExecutor(ExecutorBroker):
"""
The pympipool.flux.PyFluxExecutor leverages the flux framework to distribute Python tasks within a queuing system
allocation. In analogy to the pympipool.slurm.PySlurmExecutor it provides the option to specify the number of
@@ -69,76 +67,25 @@ def __init__(
):
super().__init__()
self._set_process(
process=RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"hostname_localhost": hostname_localhost,
"executor_class": PyFluxSingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
"threads_per_core": threads_per_core,
"gpus_per_task": int(gpus_per_worker / cores_per_worker),
"init_function": init_function,
"cwd": cwd,
"executor": executor,
},
)
)


class PyFluxSingleTaskExecutor(ExecutorBase):
"""
The pympipool.flux.PyFluxSingleTaskExecutor is the internal worker for the pympipool.flux.PyFluxExecutor.
Args:
cores (int): defines the number of MPI ranks to use for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate with an
Executor running on a different compute node within the same allocation. In
principle any computer should be able to resolve that its own hostname
points to the same address as localhost. Still, macOS >= 12 seems to disable
this lookup for security reasons, so on macOS it is required to set this
option to true.
"""

def __init__(
self,
cores=1,
threads_per_core=1,
gpus_per_task=0,
init_function=None,
cwd=None,
executor=None,
hostname_localhost=False,
):
super().__init__()
cloudpickle_register(ind=3)
self._set_process(
process=RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores,
"interface_class": FluxPythonInterface,
"hostname_localhost": hostname_localhost,
"init_function": init_function,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": gpus_per_task,
"cwd": cwd,
"executor": executor,
},
)
process=[
RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores_per_worker,
"interface_class": FluxPythonInterface,
"hostname_localhost": hostname_localhost,
"init_function": init_function,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": int(gpus_per_worker / cores_per_worker),
"cwd": cwd,
"executor": executor,
},
)
for _ in range(max_workers)
],
)
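
For orientation, a minimal usage sketch of the refactored PyFluxExecutor. The keyword arguments mirror the diff above; submit() and the context-manager protocol are assumed to be inherited unchanged from ExecutorBase, and the explicitly created flux.job.FluxExecutor, the resource counts, and the submitted function are illustrative (whether the executor handle can instead be left at its default is not visible in this excerpt). As before, this is expected to run inside a flux allocation, e.g. one started via flux start.

import flux.job

from pympipool.flux.executor import PyFluxExecutor


def calc(i):
    return i * 2


with flux.job.FluxExecutor() as flux_exe:
    with PyFluxExecutor(max_workers=2, cores_per_worker=1, executor=flux_exe) as exe:
        future = exe.submit(calc, 3)
        print(future.result())  # 6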


85 changes: 19 additions & 66 deletions pympipool/mpi/executor.py
@@ -1,14 +1,12 @@
from pympipool.shared.executorbase import (
cloudpickle_register,
execute_parallel_tasks,
ExecutorBase,
executor_broker,
ExecutorBroker,
)
from pympipool.shared.interface import MpiExecInterface
from pympipool.shared.thread import RaisingThread


class PyMPIExecutor(ExecutorBase):
class PyMPIExecutor(ExecutorBroker):
"""
The pympipool.mpi.PyMPIExecutor leverages the message passing interface MPI to distribute Python tasks within an
MPI allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.mpi.PyMPIExecutor can be executed
@@ -62,66 +60,21 @@ def __init__(
):
super().__init__()
self._set_process(
process=RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"executor_class": PyMPISingleTaskExecutor,
"hostname_localhost": hostname_localhost,
# Executor Arguments
"cores": cores_per_worker,
"oversubscribe": oversubscribe,
"init_function": init_function,
"cwd": cwd,
},
),
)


class PyMPISingleTaskExecutor(ExecutorBase):
"""
The pympipool.mpi.PyMPISingleTaskExecutor is the internal worker for the pympipool.mpi.PyMPIExecutor.
Args:
cores (int): defines the number of MPI ranks to use for each function call
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate with an
Executor running on a different compute node within the same allocation. In
principle any computer should be able to resolve that its own hostname
points to the same address as localhost. Still, macOS >= 12 seems to disable
this lookup for security reasons, so on macOS it is required to set this
option to true.
"""

def __init__(
self,
cores=1,
oversubscribe=False,
init_function=None,
cwd=None,
hostname_localhost=False,
):
super().__init__()
cloudpickle_register(ind=3)
self._set_process(
process=RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores,
"interface_class": MpiExecInterface,
"init_function": init_function,
# Interface Arguments
"cwd": cwd,
"oversubscribe": oversubscribe,
"hostname_localhost": hostname_localhost,
},
)
process=[
RaisingThread(
target=execute_parallel_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores_per_worker,
"interface_class": MpiExecInterface,
"hostname_localhost": hostname_localhost,
"init_function": init_function,
# Interface Arguments
"cwd": cwd,
"oversubscribe": oversubscribe,
},
)
for _ in range(max_workers)
],
)
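
Analogously, a minimal usage sketch of the refactored PyMPIExecutor. Again the keyword names follow the diff, submit() and the context-manager behaviour are assumed to come from ExecutorBase, and since each worker is launched through MpiExecInterface an MPI launcher such as mpiexec is expected to be available in the environment.

from pympipool.mpi.executor import PyMPIExecutor


def calc(i):
    return i * 2


with PyMPIExecutor(max_workers=2, cores_per_worker=1) as exe:
    future = exe.submit(calc, 3)
    print(future.result())  # 6
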
88 changes: 39 additions & 49 deletions pympipool/shared/executorbase.py
@@ -1,5 +1,4 @@
from concurrent.futures import (
as_completed,
Executor as FutureExecutor,
Future,
)
@@ -15,6 +14,7 @@

class ExecutorBase(FutureExecutor):
def __init__(self):
cloudpickle_register(ind=3)
self._future_queue = queue.Queue()
self._process = None

@@ -52,12 +52,16 @@ def shutdown(self, wait=True, *, cancel_futures=False):
if cancel_futures:
cancel_items_in_queue(que=self._future_queue)
self._future_queue.put({"shutdown": True, "wait": wait})
if wait:
if wait and self._process is not None:
self._process.join()
self._future_queue.join()
self._process = None
self._future_queue = None

def _set_process(self, process):
self._process = process
self._process.start()

def __len__(self):
return self._future_queue.qsize()

@@ -72,6 +76,39 @@ def _set_process(self, process):
self._process.start()


class ExecutorBroker(ExecutorBase):
def shutdown(self, wait=True, *, cancel_futures=False):
"""Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other
methods can be called after this one.
Args:
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
parallel_executors have been reclaimed.
cancel_futures: If True then shutdown will cancel all pending
futures. Futures that are completed or running will not be
cancelled.
"""
if cancel_futures:
cancel_items_in_queue(que=self._future_queue)
if self._process is not None:
for _ in range(len(self._process)):
self._future_queue.put({"shutdown": True, "wait": wait})
if wait:
for process in self._process:
process.join()
self._future_queue.join()
self._process = None
self._future_queue = None

def _set_process(self, process):
self._process = process
for process in self._process:
process.start()
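
To illustrate how the new base class is meant to be consumed, here is a minimal, hypothetical subclass. DemoBroker and demo_worker are invented names; RaisingThread is assumed to accept target/kwargs like threading.Thread (as in the executors above), and ExecutorBase.submit(), which is not part of this diff, is assumed to enqueue task dictionaries with fn/args/kwargs/future keys, as suggested by the now removed execute_task_dict further down. Each worker thread drains the shared future queue until it receives its own shutdown sentinel, which is why ExecutorBroker.shutdown() puts one {"shutdown": True} item per thread.

from pympipool.shared.executorbase import ExecutorBroker
from pympipool.shared.thread import RaisingThread


def demo_worker(future_queue):
    # toy stand-in for execute_parallel_tasks: evaluate "fn" locally until a
    # shutdown sentinel arrives on the shared queue
    while True:
        task_dict = future_queue.get()
        if task_dict.get("shutdown", False):
            future_queue.task_done()
            break
        task_dict["future"].set_result(
            task_dict["fn"](*task_dict.get("args", ()), **task_dict.get("kwargs", {}))
        )
        future_queue.task_done()


class DemoBroker(ExecutorBroker):
    def __init__(self, max_workers=2):
        super().__init__()
        self._set_process(
            process=[
                RaisingThread(
                    target=demo_worker,
                    kwargs={"future_queue": self._future_queue},
                )
                for _ in range(max_workers)
            ],
        )


with DemoBroker(max_workers=2) as broker:
    print(broker.submit(sum, [1, 2, 3]).result())  # 6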


def cancel_items_in_queue(que):
"""
Cancel items which are still waiting in the queue. If the executor is busy tasks remain in the queue, so the future
@@ -172,43 +209,6 @@ def execute_parallel_tasks_loop(interface, future_queue, init_function=None):
future_queue.task_done()


def executor_broker(
future_queue,
max_workers,
executor_class,
**kwargs,
):
meta_future_lst = _get_executor_dict(
max_workers=max_workers,
executor_class=executor_class,
**kwargs,
)
while True:
if execute_task_dict(
task_dict=future_queue.get(), meta_future_lst=meta_future_lst
):
future_queue.task_done()
else:
future_queue.task_done()
future_queue.join()
break


def execute_task_dict(task_dict, meta_future_lst):
if "fn" in task_dict.keys() or "future" in task_dict.keys():
meta_future = next(as_completed(meta_future_lst.keys()))
executor = meta_future_lst.pop(meta_future)
executor.future_queue.put(task_dict)
meta_future_lst[task_dict["future"]] = executor
return True
elif "shutdown" in task_dict.keys() and task_dict["shutdown"]:
for executor in meta_future_lst.values():
executor.shutdown(wait=task_dict["wait"])
return False
else:
raise ValueError("Unrecognized Task in task_dict: ", task_dict)


def _get_backend_path(cores):
command_lst = [sys.executable]
if cores > 1:
@@ -220,13 +220,3 @@ def _get_backend_path(cores):

def _get_command_path(executable):
return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))


def _get_executor_dict(max_workers, executor_class, **kwargs):
return {_get_future_done(): executor_class(**kwargs) for _ in range(max_workers)}


def _get_future_done():
f = Future()
f.set_result(True)
return f
