diff --git a/libensemble/ensemble.py b/libensemble/ensemble.py index 165ec7aed..173edd609 100644 --- a/libensemble/ensemble.py +++ b/libensemble/ensemble.py @@ -8,6 +8,7 @@ import yaml from libensemble import logger +from libensemble.executors import Executor from libensemble.libE import libE from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs from libensemble.tools import add_unique_random_streams @@ -215,18 +216,23 @@ class Ensemble: Tell libEnsemble when to stop a run - persis_info: :obj:`dict`, optional + libE_specs: :obj:`dict` or :class:`LibeSpecs`, optional - Persistent information to be passed between user function instances - :doc:`(example)` + Specifications for libEnsemble alloc_specs: :obj:`dict` or :class:`AllocSpecs`, optional Specifications for the allocation function - libE_specs: :obj:`dict` or :class:`LibeSpecs`, optional - Specifications for libEnsemble + persis_info: :obj:`dict`, optional + + Persistent information to be passed between user function instances + :doc:`(example)` + + executor: :class:`Executor`, optional + + libEnsemble Executor instance for use within simulation or generator functions H0: `NumPy structured array `_, optional @@ -248,6 +254,7 @@ def __init__( libE_specs: Optional[LibeSpecs] = None, alloc_specs: Optional[AllocSpecs] = AllocSpecs(), persis_info: Optional[dict] = {}, + executor: Optional[Executor] = None, H0: Optional[npt.NDArray] = None, parse_args: Optional[bool] = False, ): @@ -257,6 +264,7 @@ def __init__( self._libE_specs = libE_specs self.alloc_specs = alloc_specs self.persis_info = persis_info + self.executor = executor self.H0 = H0 self._util_logger = logging.getLogger(__name__) @@ -313,6 +321,9 @@ def libE_specs(self, new_specs): else: self._libE_specs.__dict__.update(**new_specs) + def _refresh_executor(self): + Executor.executor = self.executor or Executor.executor + def run(self) -> (npt.NDArray, dict, int): """ Initializes libEnsemble. @@ -352,6 +363,8 @@ def run(self) -> (npt.NDArray, dict, int): 3 = Current process is not in libEnsemble MPI communicator """ + self._refresh_executor() + self.H, self.persis_info, self.flag = libE( self.sim_specs, self.gen_specs, diff --git a/libensemble/sim_funcs/executor_hworld.py b/libensemble/sim_funcs/executor_hworld.py index 3bb0e1ec3..f7f7d3cc7 100644 --- a/libensemble/sim_funcs/executor_hworld.py +++ b/libensemble/sim_funcs/executor_hworld.py @@ -1,6 +1,5 @@ import numpy as np -from libensemble.executors.mpi_executor import MPIExecutor from libensemble.message_numbers import ( MAN_SIGNAL_FINISH, TASK_FAILED, @@ -66,9 +65,9 @@ def custom_polling_loop(exctr, task, timeout_sec=5.0, delay=0.3): return task, calc_status -def executor_hworld(H, _, sim_specs): +def executor_hworld(H, _, sim_specs, info): """Tests launching and polling task and exiting on task finish""" - exctr = MPIExecutor.executor + exctr = info["executor"] cores = sim_specs["user"]["cores"] ELAPSED_TIMEOUT = "elapsed_timeout" in sim_specs["user"] diff --git a/libensemble/sim_funcs/var_resources.py b/libensemble/sim_funcs/var_resources.py index 726c72b16..2f222a94e 100644 --- a/libensemble/sim_funcs/var_resources.py +++ b/libensemble/sim_funcs/var_resources.py @@ -27,7 +27,6 @@ import numpy as np -from libensemble.executors.executor import Executor from libensemble.message_numbers import TASK_FAILED, UNSET_TAG, WORKER_DONE from libensemble.resources.resources import Resources from libensemble.sim_funcs.six_hump_camel import six_hump_camel_func @@ -50,7 +49,7 @@ def gpu_variable_resources(H, persis_info, sim_specs, libE_info): dry_run = sim_specs["user"].get("dry_run", False) # logs run lines instead of running inpt = " ".join(map(str, x)) # Application input - exctr = Executor.executor # Get Executor + exctr = libE_info["executor"] # Launch application via system MPI runner, using assigned resources. task = exctr.submit( @@ -87,7 +86,7 @@ def gpu_variable_resources_from_gen(H, persis_info, sim_specs, libE_info): dry_run = sim_specs["user"].get("dry_run", False) # logs run lines instead of running inpt = " ".join(map(str, x)) # Application input - exctr = Executor.executor # Get Executor + exctr = libE_info["executor"] # Get Executor # Launch application via system MPI runner, using assigned resources. task = exctr.submit( @@ -147,7 +146,7 @@ def gpu_variable_resources_subenv(H, persis_info, sim_specs, libE_info): env_script_path = sim_specs["user"]["env_script"] # Script to run in subprocess inpt = " ".join(map(str, x)) # Application input - exctr = Executor.executor # Get Executor + exctr = libE_info["executor"] # Get Executor # Launch application via given MPI runner, using assigned resources. _launch_with_env_and_mpi(exctr, inpt, dry_run, env_script_path, "openmpi") @@ -181,7 +180,7 @@ def gpu_variable_resources_subenv(H, persis_info, sim_specs, libE_info): return H_o, persis_info, calc_status -def multi_points_with_variable_resources(H, _, sim_specs): +def multi_points_with_variable_resources(H, _, sim_specs, libE_info): """ Evaluates either helloworld or six hump camel for a collection of points given in ``H["x"]`` via the MPI executor, supporting variable sized @@ -204,7 +203,7 @@ def multi_points_with_variable_resources(H, _, sim_specs): set_cores_by_rsets = True # If True use rset count to set num procs, else use all available to this worker. core_multiplier = 1 # Only used with set_cores_by_rsets as a multiplier. - exctr = Executor.executor # Get Executor + exctr = libE_info["executor"] # Get Executor task_states = [] for i, x in enumerate(H["x"]): nprocs = None # Will be as if argument is not present @@ -288,7 +287,7 @@ def CUDA_variable_resources(H, _, sim_specs, libE_info): # Create application input file inpt = " ".join(map(str, x)) - exctr = Executor.executor # Get Executor + exctr = libE_info["executor"] # Get Executor # Launch application via system MPI runner, using assigned resources. task = exctr.submit( diff --git a/libensemble/tools/persistent_support.py b/libensemble/tools/persistent_support.py index fb52d2087..2dc9639db 100644 --- a/libensemble/tools/persistent_support.py +++ b/libensemble/tools/persistent_support.py @@ -45,6 +45,7 @@ def send(self, output: npt.NDArray, calc_status: int = UNSET_TAG, keep_state=Fal # Need to make copy before remove comm as original could be reused libE_info = dict(self.libE_info) libE_info.pop("comm") + libE_info.pop("executor") else: libE_info = self.libE_info diff --git a/libensemble/worker.py b/libensemble/worker.py index 27dd15225..f724bddc7 100644 --- a/libensemble/worker.py +++ b/libensemble/worker.py @@ -346,6 +346,7 @@ def _handle(self, Work: dict) -> dict: libE_info["comm"] = self.comm libE_info["workerID"] = self.workerID libE_info["rset_team"] = libE_info.get("rset_team", []) + libE_info["executor"] = Executor.executor Worker._set_rset_team(libE_info) calc_out, persis_info, calc_status = self._handle_calc(Work, calc_in) @@ -356,6 +357,9 @@ def _handle(self, Work: dict) -> dict: if "comm" in libE_info: del libE_info["comm"] + if "executor" in libE_info: + del libE_info["executor"] + # If there was a finish signal, bail if calc_status == MAN_SIGNAL_FINISH: return None