Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/set exctr to ensemble libeinfo #1078

Merged
merged 4 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions libensemble/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import yaml

from libensemble import logger
from libensemble.executors import Executor
from libensemble.libE import libE
from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs
from libensemble.tools import add_unique_random_streams
Expand Down Expand Up @@ -215,18 +216,23 @@ class Ensemble:

Tell libEnsemble when to stop a run

persis_info: :obj:`dict`, optional
libE_specs: :obj:`dict` or :class:`LibeSpecs<libensemble.specs.LibeSpecs>`, optional

Persistent information to be passed between user function instances
:doc:`(example)<data_structures/persis_info>`
Specifications for libEnsemble

alloc_specs: :obj:`dict` or :class:`AllocSpecs<libensemble.specs.AllocSpecs>`, optional

Specifications for the allocation function

libE_specs: :obj:`dict` or :class:`LibeSpecs<libensemble.specs.LibeSpecs>`, optional

Specifications for libEnsemble
persis_info: :obj:`dict`, optional

Persistent information to be passed between user function instances
:doc:`(example)<data_structures/persis_info>`

executor: :class:`Executor<libensemble.executors.executor.Executor>`, optional

libEnsemble Executor instance for use within simulation or generator functions

H0: `NumPy structured array <https://docs.scipy.org/doc/numpy/user/basics.rec.html>`_, optional

Expand All @@ -248,6 +254,7 @@ def __init__(
libE_specs: Optional[LibeSpecs] = None,
alloc_specs: Optional[AllocSpecs] = AllocSpecs(),
persis_info: Optional[dict] = {},
executor: Optional[Executor] = None,
H0: Optional[npt.NDArray] = None,
parse_args: Optional[bool] = False,
):
Expand All @@ -257,6 +264,7 @@ def __init__(
self._libE_specs = libE_specs
self.alloc_specs = alloc_specs
self.persis_info = persis_info
self.executor = executor
self.H0 = H0

self._util_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -313,6 +321,9 @@ def libE_specs(self, new_specs):
else:
self._libE_specs.__dict__.update(**new_specs)

def _refresh_executor(self):
    """Install this ensemble's executor as the class-wide default.

    When no executor was supplied to this ensemble, the existing
    class-level ``Executor.executor`` is left in place.
    """
    if self.executor:
        Executor.executor = self.executor

def run(self) -> (npt.NDArray, dict, int):
"""
Initializes libEnsemble.
Expand Down Expand Up @@ -352,6 +363,8 @@ def run(self) -> (npt.NDArray, dict, int):
3 = Current process is not in libEnsemble MPI communicator
"""

self._refresh_executor()

self.H, self.persis_info, self.flag = libE(
self.sim_specs,
self.gen_specs,
Expand Down
5 changes: 2 additions & 3 deletions libensemble/sim_funcs/executor_hworld.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import numpy as np

from libensemble.executors.mpi_executor import MPIExecutor
from libensemble.message_numbers import (
MAN_SIGNAL_FINISH,
TASK_FAILED,
Expand Down Expand Up @@ -66,9 +65,9 @@ def custom_polling_loop(exctr, task, timeout_sec=5.0, delay=0.3):
return task, calc_status


def executor_hworld(H, _, sim_specs):
def executor_hworld(H, _, sim_specs, info):
"""Tests launching and polling task and exiting on task finish"""
exctr = MPIExecutor.executor
exctr = info["executor"]
cores = sim_specs["user"]["cores"]
ELAPSED_TIMEOUT = "elapsed_timeout" in sim_specs["user"]

Expand Down
13 changes: 6 additions & 7 deletions libensemble/sim_funcs/var_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import numpy as np

from libensemble.executors.executor import Executor
from libensemble.message_numbers import TASK_FAILED, UNSET_TAG, WORKER_DONE
from libensemble.resources.resources import Resources
from libensemble.sim_funcs.six_hump_camel import six_hump_camel_func
Expand All @@ -50,7 +49,7 @@ def gpu_variable_resources(H, persis_info, sim_specs, libE_info):
dry_run = sim_specs["user"].get("dry_run", False) # logs run lines instead of running
inpt = " ".join(map(str, x)) # Application input

exctr = Executor.executor # Get Executor
exctr = libE_info["executor"]

# Launch application via system MPI runner, using assigned resources.
task = exctr.submit(
Expand Down Expand Up @@ -87,7 +86,7 @@ def gpu_variable_resources_from_gen(H, persis_info, sim_specs, libE_info):
dry_run = sim_specs["user"].get("dry_run", False) # logs run lines instead of running
inpt = " ".join(map(str, x)) # Application input

exctr = Executor.executor # Get Executor
exctr = libE_info["executor"] # Get Executor

# Launch application via system MPI runner, using assigned resources.
task = exctr.submit(
Expand Down Expand Up @@ -147,7 +146,7 @@ def gpu_variable_resources_subenv(H, persis_info, sim_specs, libE_info):
env_script_path = sim_specs["user"]["env_script"] # Script to run in subprocess
inpt = " ".join(map(str, x)) # Application input

exctr = Executor.executor # Get Executor
exctr = libE_info["executor"] # Get Executor

# Launch application via given MPI runner, using assigned resources.
_launch_with_env_and_mpi(exctr, inpt, dry_run, env_script_path, "openmpi")
Expand Down Expand Up @@ -181,7 +180,7 @@ def gpu_variable_resources_subenv(H, persis_info, sim_specs, libE_info):
return H_o, persis_info, calc_status


def multi_points_with_variable_resources(H, _, sim_specs):
def multi_points_with_variable_resources(H, _, sim_specs, libE_info):
"""
Evaluates either helloworld or six hump camel for a collection of points
given in ``H["x"]`` via the MPI executor, supporting variable sized
Expand All @@ -204,7 +203,7 @@ def multi_points_with_variable_resources(H, _, sim_specs):
set_cores_by_rsets = True # If True use rset count to set num procs, else use all available to this worker.
core_multiplier = 1 # Only used with set_cores_by_rsets as a multiplier.

exctr = Executor.executor # Get Executor
exctr = libE_info["executor"] # Get Executor
task_states = []
for i, x in enumerate(H["x"]):
nprocs = None # Will be as if argument is not present
Expand Down Expand Up @@ -288,7 +287,7 @@ def CUDA_variable_resources(H, _, sim_specs, libE_info):

# Create application input file
inpt = " ".join(map(str, x))
exctr = Executor.executor # Get Executor
exctr = libE_info["executor"] # Get Executor

# Launch application via system MPI runner, using assigned resources.
task = exctr.submit(
Expand Down
1 change: 1 addition & 0 deletions libensemble/tools/persistent_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def send(self, output: npt.NDArray, calc_status: int = UNSET_TAG, keep_state=Fal
# Need to make copy before remove comm as original could be reused
libE_info = dict(self.libE_info)
libE_info.pop("comm")
libE_info.pop("executor")
else:
libE_info = self.libE_info

Expand Down
4 changes: 4 additions & 0 deletions libensemble/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ def _handle(self, Work: dict) -> dict:
libE_info["comm"] = self.comm
libE_info["workerID"] = self.workerID
libE_info["rset_team"] = libE_info.get("rset_team", [])
libE_info["executor"] = Executor.executor
Worker._set_rset_team(libE_info)

calc_out, persis_info, calc_status = self._handle_calc(Work, calc_in)
Expand All @@ -356,6 +357,9 @@ def _handle(self, Work: dict) -> dict:
if "comm" in libE_info:
del libE_info["comm"]

if "executor" in libE_info:
del libE_info["executor"]

# If there was a finish signal, bail
if calc_status == MAN_SIGNAL_FINISH:
return None
Expand Down
Loading