From c8b4280c54519ec490068c66691a4e278f72f1c2 Mon Sep 17 00:00:00 2001 From: shudson Date: Mon, 23 Oct 2023 18:26:15 -0500 Subject: [PATCH 1/6] Support in-place gen on manager --- .../alloc_funcs/give_sim_work_first.py | 5 +- libensemble/executors/executor.py | 2 +- libensemble/manager.py | 118 ++++-- libensemble/run_calculation.py | 338 ++++++++++++++++++ libensemble/specs.py | 4 + .../test_1d_sampling_gen_on_manager.py | 58 +++ libensemble/tools/alloc_support.py | 3 +- 7 files changed, 495 insertions(+), 33 deletions(-) create mode 100644 libensemble/run_calculation.py create mode 100644 libensemble/tests/regression_tests/test_1d_sampling_gen_on_manager.py diff --git a/libensemble/alloc_funcs/give_sim_work_first.py b/libensemble/alloc_funcs/give_sim_work_first.py index a7aa74d3b2..47c95c36f4 100644 --- a/libensemble/alloc_funcs/give_sim_work_first.py +++ b/libensemble/alloc_funcs/give_sim_work_first.py @@ -84,7 +84,10 @@ def give_sim_work_first( # Give gen work return_rows = range(len(H)) if gen_in else [] try: - Work[wid] = support.gen_work(wid, gen_in, return_rows, persis_info.get(wid)) + if libE_info.get("gen_on_manager", False): + Work[0] = support.gen_work(0, gen_in, return_rows, persis_info.get(0)) + else: + Work[wid] = support.gen_work(wid, gen_in, return_rows, persis_info.get(wid)) except InsufficientFreeResources: break gen_count += 1 diff --git a/libensemble/executors/executor.py b/libensemble/executors/executor.py index 35a321767f..c04c0760ae 100644 --- a/libensemble/executors/executor.py +++ b/libensemble/executors/executor.py @@ -658,7 +658,7 @@ def set_workerID(self, workerid) -> None: """Sets the worker ID for this executor""" self.workerID = workerid - def set_worker_info(self, comm, workerid=None) -> None: + def set_worker_info(self, comm=None, workerid=None) -> None: """Sets info for this executor""" self.workerID = workerid self.comm = comm diff --git a/libensemble/manager.py b/libensemble/manager.py index 5a495887e2..b3e4f16048 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -7,6 +7,7 @@ import glob import logging import os +from pathlib import Path import platform import socket import sys @@ -32,13 +33,16 @@ calc_type_strings, ) from libensemble.resources.resources import Resources +from libensemble.run_calculation import RunCalc from libensemble.tools.fields_keys import protected_libE_fields from libensemble.tools.tools import _PERSIS_RETURN_WARNING, _USER_CALC_DIR_WARNING +from libensemble.utils.loc_stack import LocationStack from libensemble.utils.misc import extract_H_ranges from libensemble.utils.output_directory import EnsembleDirectory from libensemble.utils.timer import Timer from libensemble.worker import WorkerErrMsg + logger = logging.getLogger(__name__) # For debug messages - uncomment # logger.setLevel(logging.DEBUG) @@ -65,6 +69,15 @@ def report_worker_exc(wrk_exc: Exception = None) -> None: logger.error(exc) +# TODO - do we need persis_in here also +def get_dtypes(H, sim_specs, gen_specs): + dtypes = { + EVAL_SIM_TAG: repack_fields(H[sim_specs["in"]]).dtype, + EVAL_GEN_TAG: repack_fields(H[gen_specs["in"]]).dtype, + } + return dtypes + + def manager_main( hist: npt.NDArray, libE_specs: dict, @@ -112,10 +125,7 @@ def manager_main( gen_specs["in"] = [] # Send dtypes to workers - dtypes = { - EVAL_SIM_TAG: repack_fields(hist.H[sim_specs["in"]]).dtype, - EVAL_GEN_TAG: repack_fields(hist.H[gen_specs["in"]]).dtype, - } + dtypes = get_dtypes(hist.H, sim_specs, gen_specs) for wcomm in wcomms: wcomm.send(0, dtypes) @@ -198,6 +208,14 @@ def __init__( self.use_resource_sets = dyn_keys_in_H or self.libE_specs.get("num_resource_sets") self.gen_num_procs = libE_specs.get("gen_num_procs", 0) self.gen_num_gpus = libE_specs.get("gen_num_gpus", 0) + self.gen_on_manager = libE_specs.get("gen_on_manager", False) + + if self.gen_on_manager: + dtypes = get_dtypes(self.hist.H, sim_specs, gen_specs) + # TODO rename from gen_worker + self.gen_worker = RunCalc(dtypes, 0, sim_specs, gen_specs, libE_specs) + self.LS = LocationStack() + self.LS.register_loc("workflow", Path(libE_specs.get("workflow_dir_path"))) self.W = np.zeros(len(self.wcomms), dtype=Manager.worker_dtype) self.W["worker_id"] = np.arange(len(self.wcomms)) + 1 @@ -301,17 +319,23 @@ def _save_every_k_gens(self) -> None: def _check_work_order(self, Work: dict, w: int, force: bool = False) -> None: """Checks validity of an allocation function order""" - assert w != 0, "Can't send to worker 0; this is the manager." - if self.W[w - 1]["active_recv"]: - assert "active_recv" in Work["libE_info"], ( - "Messages to a worker in active_recv mode should have active_recv" - f"set to True in libE_info. Work['libE_info'] is {Work['libE_info']}" - ) - else: - if not force: - assert self.W[w - 1]["active"] == 0, ( - "Allocation function requested work be sent to worker %d, an already active worker." % w + + if not self.gen_on_manager: + assert w != 0, "Can't send to worker 0; this is the manager." + + # gen on manager (run in place) is not persistent or active_recv and cannot be active already + if w != 0: + if self.W[w - 1]["active_recv"]: + assert "active_recv" in Work["libE_info"], ( + "Messages to a worker in active_recv mode should have active_recv" + f"set to True in libE_info. Work['libE_info'] is {Work['libE_info']}" ) + else: + if not force: + assert self.W[w - 1]["active"] == 0, ( + "Allocation function requested work be sent to worker %d, an already active worker." % w + ) + work_rows = Work["libE_info"]["H_rows"] if len(work_rows): work_fields = set(Work["H_fields"]) @@ -347,14 +371,33 @@ def _freeup_resources(self, w: int) -> None: if self.resources: self.resources.resource_manager.free_rsets(w) - def _send_work_order(self, Work: dict, w: int) -> None: + def _run_calc_in_place(self, Work, w, persis_info, H_to_be_sent=None): + """Run user function in place""" + + with self.LS.loc("workflow"): + D_recv = self.gen_worker.run(Work, H_to_be_sent) + + # TODO - check: skipping _handle_msg_from_worker + # TODO - persis_info needs to be passed through (via _send_work_order) - alt to make self.persis_info + # or use persis_info for the worker which is in Work see: "persis_info": persis_info[w], + # but we may want to be able to change persis_info direct keys when run on manager + self._update_state_on_worker_msg(persis_info, D_recv, w) + + # TODO Either adding persis_info here - or have alternative to _send_work_order altogether. + def _send_work_order(self, Work: dict, w: int, persis_info: dict) -> None: """Sends an allocation function order to a worker""" logger.debug(f"Manager sending work unit to worker {w}") + if self.gen_on_manager and Work["tag"] == EVAL_GEN_TAG: + run_on_gen = True + else: + run_on_gen = False + if self.resources: self._set_resources(Work, w) - self.wcomms[w - 1].send(Work["tag"], Work) + if not run_on_gen: + self.wcomms[w - 1].send(Work["tag"], Work) if Work["tag"] == EVAL_GEN_TAG: self.W[w - 1]["gen_started_time"] = time.time() @@ -366,19 +409,30 @@ def _send_work_order(self, Work: dict, w: int) -> None: new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]] H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype) for i, row in enumerate(work_rows): + # TODO: if gen on manager may not need repack_fields H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row]) - self.wcomms[w - 1].send(0, H_to_be_sent) + + if not run_on_gen: + self.wcomms[w - 1].send(0, H_to_be_sent) + else: + H_to_be_sent = None + + if run_on_gen: + self._run_calc_in_place(Work, w, persis_info, H_to_be_sent) def _update_state_on_alloc(self, Work: dict, w: int): """Updates a workers' active/idle status following an allocation order""" - self.W[w - 1]["active"] = Work["tag"] - if "libE_info" in Work: - if "persistent" in Work["libE_info"]: - self.W[w - 1]["persis_state"] = Work["tag"] - if Work["libE_info"].get("active_recv", False): - self.W[w - 1]["active_recv"] = Work["tag"] - else: - assert "active_recv" not in Work["libE_info"], "active_recv worker must also be persistent" + + # if this is self.gen_on_manager + if w != 0: + self.W[w - 1]["active"] = Work["tag"] + if "libE_info" in Work: + if "persistent" in Work["libE_info"]: + self.W[w - 1]["persis_state"] = Work["tag"] + if Work["libE_info"].get("active_recv", False): + self.W[w - 1]["active_recv"] = Work["tag"] + else: + assert "active_recv" not in Work["libE_info"], "active_recv worker must also be persistent" work_rows = Work["libE_info"]["H_rows"] if Work["tag"] == EVAL_SIM_TAG: @@ -536,7 +590,7 @@ def _final_receive_and_kill(self, persis_info: dict) -> (dict, int, int): "libE_info": {"persistent": True, "H_rows": rows_to_send}, } self._check_work_order(work, w, force=True) - self._send_work_order(work, w) + self._send_work_order(work, w, persis_info) self.hist.update_history_to_gen(rows_to_send) else: self.wcomms[w - 1].send(PERSIS_STOP, MAN_SIGNAL_KILL) @@ -584,6 +638,7 @@ def _get_alloc_libE_info(self) -> dict: "use_resource_sets": self.use_resource_sets, "gen_num_procs": self.gen_num_procs, "gen_num_gpus": self.gen_num_gpus, + "gen_on_manager": self.gen_on_manager, } def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict: @@ -638,11 +693,14 @@ def run(self, persis_info: dict) -> (dict, int, int): if self._sim_max_given(): break self._check_work_order(Work[w], w) - self._send_work_order(Work[w], w) + self._send_work_order(Work[w], w, persis_info) self._update_state_on_alloc(Work[w], w) - assert self.term_test() or any( - self.W["active"] != 0 - ), "alloc_f did not return any work, although all workers are idle." + + # gen_on_manager completes already so can have all idle at this point + if 0 not in Work: + assert self.term_test() or any( + self.W["active"] != 0 + ), "alloc_f did not return any work, although all workers are idle." except WorkerException as e: report_worker_exc(e) raise LoggedException(e.args[0], e.args[1]) from None diff --git a/libensemble/run_calculation.py b/libensemble/run_calculation.py new file mode 100644 index 0000000000..d9a7190f4a --- /dev/null +++ b/libensemble/run_calculation.py @@ -0,0 +1,338 @@ +# TODO where to place main dir or utils/ or other +# TODO consolidate with worker.py (e.g. inheritence) + +""" +libEnsemble RunCalc class +==================================================== +""" + +import cProfile +import logging +import logging.handlers +import socket +from itertools import count +from pathlib import Path +from traceback import format_exc +from traceback import format_exception_only as format_exc_msg + +import numpy as np +import numpy.typing as npt + +from libensemble.comms.logs import LogConfig, worker_logging_config +from libensemble.executors.executor import Executor +from libensemble.message_numbers import ( + CALC_EXCEPTION, + EVAL_GEN_TAG, + EVAL_SIM_TAG, + MAN_SIGNAL_FINISH, + MAN_SIGNAL_KILL, + PERSIS_STOP, + STOP_TAG, + UNSET_TAG, + calc_status_strings, + calc_type_strings, +) +from libensemble.resources.resources import Resources +from libensemble.utils.loc_stack import LocationStack +from libensemble.utils.misc import extract_H_ranges +from libensemble.utils.output_directory import EnsembleDirectory +from libensemble.utils.runners import Runners +from libensemble.utils.timer import Timer + +logger = logging.getLogger(__name__) +# To change logging level for just this module +# logger.setLevel(logging.DEBUG) + +###################################################################### +# Worker Class +###################################################################### + + +class WorkerErrMsg: + def __init__(self, msg, exc): + self.msg = msg + self.exc = exc + + +class RunCalc: + + """The worker class provides methods for controlling sim and gen funcs + + **Object Attributes:** + + These are public object attributes. + + :ivar comm communicator: + Comm object for manager communications + + :ivar dict dtypes: + Dictionary containing type information for sim and gen inputs + + :ivar int workerID: + The libensemble Worker ID + + :ivar dict sim_specs: + Parameters/information for simulation calculations + + :ivar dict calc_iter: + Dictionary containing counts for each type of calc (e.g. sim or gen) + """ + + def __init__( + self, + dtypes: npt.DTypeLike, + workerID: int, + sim_specs: dict, + gen_specs: dict, + libE_specs: dict, + ) -> None: # noqa: F821 + """Initializes new worker object""" + self.dtypes = dtypes + self.workerID = workerID + self.libE_specs = libE_specs + self.stats_fmt = libE_specs.get("stats_fmt", {}) + + self.calc_iter = {EVAL_SIM_TAG: 0, EVAL_GEN_TAG: 0} + self.runners = Runners(sim_specs, gen_specs) + self._run_calc = self.runners.make_runners() + + # TODO Executor pass / resources pass required. + # Worker._set_executor(self.workerID, self.comm) + # Worker._set_resources(self.workerID, self.comm) + + self.EnsembleDirectory = EnsembleDirectory(libE_specs=libE_specs) + + # If resources / executor passed in, then use those. + # if resources is not None: + # Resources.resources = resources + # if executor is not None: + # Executor.executor = executor + + # Receive workflow dir from manager + # if libE_specs.get("use_workflow_dir"): + # _, libE_specs["workflow_dir_path"] = comm.recv() + + # workerID = workerID or comm.rank + + # Initialize logging on comms + # if log_comm: + # worker_logging_config(comm, workerID) + + @staticmethod + def _set_gen_procs_gpus(libE_info, obj): + if any(k in libE_info for k in ("num_procs", "num_gpus")): + obj.set_gen_procs_gpus(libE_info) + + @staticmethod + def _set_rset_team(libE_info: dict) -> bool: + """Pass new rset_team to worker resources + + Also passes gen assigned cpus/gpus to resources and executor + """ + resources = Resources.resources + exctr = Executor.executor + if isinstance(resources, Resources): + wresources = resources.worker_resources + wresources.set_rset_team(libE_info["rset_team"]) + Worker._set_gen_procs_gpus(libE_info, wresources) + if isinstance(exctr, Executor): + Worker._set_gen_procs_gpus(libE_info, exctr) + return True + else: + return False + + @staticmethod + def _set_executor(workerID: int, comm: "communicator") -> bool: # noqa: F821 + """Sets worker ID in the executor, return True if set""" + exctr = Executor.executor + if isinstance(exctr, Executor): + exctr.set_worker_info(comm, workerID) # When merge update + return True + else: + logger.debug(f"No executor set on worker {workerID}") + return False + + @staticmethod + def _set_resources(workerID, comm: "communicator") -> bool: # noqa: F821 + """Sets worker ID in the resources, return True if set""" + resources = Resources.resources + if isinstance(resources, Resources): + # tmp + print(f"{type(comm)}", flush=True) + + resources.set_worker_resources(comm.get_num_workers(), workerID) + return True + else: + logger.debug(f"No resources set on worker {workerID}") + return False + + def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict, int): + """Runs a calculation on this worker object. + + This routine calls the user calculations. Exceptions are caught, + dumped to the summary file, and raised. + + Parameters + ---------- + + Work: :obj:`dict` + :ref:`(example)` + + calc_in: ``numpy structured array`` + Rows from the :ref:`history array` + for processing + """ + calc_type = Work["tag"] + self.calc_iter[calc_type] += 1 + + # calc_stats stores timing and summary info for this Calc (sim or gen) + # calc_id = next(self._calc_id_counter) + + if calc_type == EVAL_SIM_TAG: + enum_desc = "sim_id" + calc_id = extract_H_ranges(Work) + else: + enum_desc = "Gen no" + # Use global gen count if available + if Work["libE_info"].get("gen_count"): + calc_id = str(Work["libE_info"]["gen_count"]) + else: + calc_id = str(self.calc_iter[calc_type]) + # Add a right adjust (minimum width). + calc_id = calc_id.rjust(5, " ") + + timer = Timer() + + try: + logger.debug(f"Starting {enum_desc}: {calc_id}") + calc = self._run_calc[calc_type] + with timer: # check works when making gen dirs (and when there is a workflow dir). Matrix of checks + if self.EnsembleDirectory.use_calc_dirs(calc_type): + loc_stack, calc_dir = self.EnsembleDirectory.prep_calc_dir( + Work, + self.calc_iter, + self.workerID, + calc_type, + ) + with loc_stack.loc(calc_dir): # Changes to calculation directory + out = calc(calc_in, Work) + else: + out = calc(calc_in, Work) + + logger.debug(f"Returned from user function for {enum_desc} {calc_id}") + + calc_status = UNSET_TAG + # Check for buffered receive + # if self.comm.recv_buffer: + # tag, message = self.comm.recv() + # if tag in [STOP_TAG, PERSIS_STOP] and message is MAN_SIGNAL_FINISH: + # calc_status = MAN_SIGNAL_FINISH + + if out: + if len(out) >= 3: # Out, persis_info, calc_status + calc_status = out[2] + return out + elif len(out) == 2: # Out, persis_info OR Out, calc_status + if isinstance(out[1], int) or isinstance(out[1], str): # got Out, calc_status + calc_status = out[1] + return out[0], Work["persis_info"], calc_status + return *out, calc_status # got Out, persis_info + else: + return out, Work["persis_info"], calc_status + else: + return None, Work["persis_info"], calc_status + + except Exception as e: + logger.debug(f"Re-raising exception from calc {e}") + calc_status = CALC_EXCEPTION + raise + finally: + ctype_str = calc_type_strings[calc_type] + status = calc_status_strings.get(calc_status, calc_status) + calc_msg = self._get_calc_msg(enum_desc, calc_id, ctype_str, timer, status) + + # TODO could call stat_logger (needs passing in) or log outside in manager. + # logging.getLogger(LogConfig.config.stats_name).info() + + def _get_calc_msg(self, enum_desc: str, calc_id: int, calc_type: int, timer: Timer, status: str) -> str: + """Construct line for libE_stats.txt file""" + calc_msg = f"{enum_desc} {calc_id}: {calc_type} {timer}" + + if self.stats_fmt.get("task_timing", False) or self.stats_fmt.get("task_datetime", False): + calc_msg += Executor.executor.new_tasks_timing(datetime=self.stats_fmt.get("task_datetime", False)) + + if self.stats_fmt.get("show_resource_sets", False): + # Maybe just call option resource_sets if already in sub-dictionary + resources = Resources.resources.worker_resources + calc_msg += f" rsets: {resources.rset_team}" + + # Always put status last as could involve different numbers of words. Some scripts may assume this. + calc_msg += f" Status: {status}" + + return calc_msg + + def _handle(self, Work: dict, calc_in) -> dict: + """Handles a work request from the manager""" + # Check work request and receive second message (if needed) + + libE_info = Work["libE_info"] + calc_type = Work["tag"] + + assert calc_type in [EVAL_SIM_TAG, EVAL_GEN_TAG], "calc_type must either be EVAL_SIM_TAG or EVAL_GEN_TAG" + + # Call user function + libE_info["workerID"] = self.workerID + libE_info["rset_team"] = libE_info.get("rset_team", []) + libE_info["executor"] = Executor.executor + + # TODO Executor pass / resources pass required. + # Worker._set_rset_team(libE_info) + + calc_out, persis_info, calc_status = self._handle_calc(Work, calc_in) + + # TODO check these next libE_info changes safe to do on manager? + if "libE_info" in Work: + libE_info = Work["libE_info"] + + if "comm" in libE_info: + del libE_info["comm"] + + if "executor" in libE_info: + del libE_info["executor"] + + # If there was a finish signal, bail + if calc_status == MAN_SIGNAL_FINISH: + return None + + # Otherwise, send a calc result back to manager + logger.debug(f"Sending to Manager with status {calc_status}") + return { + "calc_out": calc_out, + "persis_info": persis_info, + "libE_info": libE_info, + "calc_status": calc_status, + "calc_type": calc_type, + } + + def run(self, Work, calc_in) -> None: + """Runs the main worker loop.""" + try: + logger.info(f"Worker {self.workerID} initiated on node {socket.gethostname()}") + + mtag = Work["tag"] # is this right? + if mtag in [STOP_TAG, PERSIS_STOP]: + return + + logger.debug(f"mtag: {mtag}; Work: {Work}") + + response = self._handle(Work, calc_in) + return response + + # TODO error handling + # except Exception as e: + # self.comm.send(0, WorkerErrMsg(" ".join(format_exc_msg(type(e), e)).strip(), format_exc())) + # else: + # self.comm.kill_pending() + finally: + self.runners.shutdown() + self.EnsembleDirectory.copy_back() diff --git a/libensemble/specs.py b/libensemble/specs.py index 613cb6aadf..3015bc63fb 100644 --- a/libensemble/specs.py +++ b/libensemble/specs.py @@ -205,6 +205,10 @@ class LibeSpecs(BaseModel): Specifications for configuring libEnsemble's runtime behavior. """ + # TODO Move to best place + gen_on_manager: Optional[bool] = False + """ Whether libEnsemble should run generators on the manager. """ + comms: Optional[str] = "mpi" """ Manager/Worker communications mode. ``'mpi'``, ``'local'``, or ``'tcp'`` """ diff --git a/libensemble/tests/regression_tests/test_1d_sampling_gen_on_manager.py b/libensemble/tests/regression_tests/test_1d_sampling_gen_on_manager.py new file mode 100644 index 0000000000..c0b2c37505 --- /dev/null +++ b/libensemble/tests/regression_tests/test_1d_sampling_gen_on_manager.py @@ -0,0 +1,58 @@ +""" +Runs libEnsemble with Latin hypercube sampling on a simple 1D problem + +Execute via one of the following commands (e.g. 3 workers): + mpiexec -np 4 python test_1d_sampling.py + python test_1d_sampling.py --nworkers 3 --comms local + python test_1d_sampling.py --nworkers 3 --comms tcp + +The number of concurrent evaluations of the objective function will be 4-1=3. +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: mpi local tcp +# TESTSUITE_NPROCS: 2 4 + +import numpy as np + +from libensemble import Ensemble +from libensemble.gen_funcs.sampling import latin_hypercube_sample as gen_f + +# Import libEnsemble items for this test +from libensemble.sim_funcs.one_d_func import one_d_example as sim_f +from libensemble.specs import ExitCriteria, GenSpecs, LibeSpecs, SimSpecs +from libensemble.tools import add_unique_random_streams + +# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). +if __name__ == "__main__": + sampling = Ensemble(parse_args=True) + sampling.libE_specs = LibeSpecs( + save_every_k_gens=300, + safe_mode=False, + disable_log_files=False, + gen_on_manager=True, + ) + sampling.sim_specs = SimSpecs( + sim_f=sim_f, + inputs=["x"], + outputs=[("f", float)], + ) + sampling.gen_specs = GenSpecs( + gen_f=gen_f, + outputs=[("x", float, (1,))], + user={ + "gen_batch_size": 10, + "lb": np.array([-3]), + "ub": np.array([3]), + }, + ) + + sampling.persis_info = add_unique_random_streams({}, sampling.nworkers + 1) + sampling.exit_criteria = ExitCriteria(gen_max=21) + + sampling.run() + if sampling.is_manager: + print(f"{sampling.H['f'][:10]}") + assert len(sampling.H) >= 21 + print("\nlibEnsemble with random sampling has generated enough points") + sampling.save_output(__file__) diff --git a/libensemble/tools/alloc_support.py b/libensemble/tools/alloc_support.py index ed11484115..f3f2ba0488 100644 --- a/libensemble/tools/alloc_support.py +++ b/libensemble/tools/alloc_support.py @@ -54,6 +54,7 @@ def __init__( self.sched = None self.def_gen_num_procs = libE_info.get("gen_num_procs", 0) self.def_gen_num_gpus = libE_info.get("gen_num_gpus", 0) + self.gen_on_manager = libE_info.get("gen_on_manager", False) if self.resources is not None: wrk_resources = self.resources.resource_manager scheduler_opts = libE_info.get("scheduler_opts", {}) @@ -272,7 +273,7 @@ def gen_work(self, wid, H_fields, H_rows, persis_info, **libE_info): """ self._update_rset_team(libE_info, wid) - if not self.W[wid - 1]["persis_state"]: + if self.gen_on_manager or not self.W[wid - 1]["persis_state"]: AllocSupport.gen_counter += 1 # Count total gens libE_info["gen_count"] = AllocSupport.gen_counter From 10903b4574e163bc9fbe75e6d0482f87f99f55c2 Mon Sep 17 00:00:00 2001 From: shudson Date: Thu, 26 Oct 2023 16:02:24 -0500 Subject: [PATCH 2/6] Restructured functions for gen on manager --- libensemble/manager.py | 80 ++++++++++++++++++++++------------ libensemble/run_calculation.py | 2 +- 2 files changed, 54 insertions(+), 28 deletions(-) diff --git a/libensemble/manager.py b/libensemble/manager.py index b3e4f16048..c6a515c461 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -211,6 +211,7 @@ def __init__( self.gen_on_manager = libE_specs.get("gen_on_manager", False) if self.gen_on_manager: + self.manager_gen_start_time = 0 dtypes = get_dtypes(self.hist.H, sim_specs, gen_specs) # TODO rename from gen_worker self.gen_worker = RunCalc(dtypes, 0, sim_specs, gen_specs, libE_specs) @@ -381,58 +382,75 @@ def _run_calc_in_place(self, Work, w, persis_info, H_to_be_sent=None): # TODO - persis_info needs to be passed through (via _send_work_order) - alt to make self.persis_info # or use persis_info for the worker which is in Work see: "persis_info": persis_info[w], # but we may want to be able to change persis_info direct keys when run on manager - self._update_state_on_worker_msg(persis_info, D_recv, w) + self._update_state_after_local_gen(persis_info, D_recv, w) - # TODO Either adding persis_info here - or have alternative to _send_work_order altogether. def _send_work_order(self, Work: dict, w: int, persis_info: dict) -> None: - """Sends an allocation function order to a worker""" - logger.debug(f"Manager sending work unit to worker {w}") - if self.gen_on_manager and Work["tag"] == EVAL_GEN_TAG: - run_on_gen = True + self._set_manager_work_order(Work, w, persis_info) else: - run_on_gen = False + self._send_work_to_worker(Work, w) + + # TODO: look at naming of functions + def _set_manager_work_order(self, Work: dict, w: int, persis_info: dict) -> None: + logger.debug(f"Manager running generator") if self.resources: self._set_resources(Work, w) - if not run_on_gen: - self.wcomms[w - 1].send(Work["tag"], Work) - - if Work["tag"] == EVAL_GEN_TAG: - self.W[w - 1]["gen_started_time"] = time.time() + self.manager_gen_start_time = time.time() + # TODO functionalize common lines - but for logger.debug message (but that could be templated and sent) work_rows = Work["libE_info"]["H_rows"] work_name = calc_type_strings[Work["tag"]] - logger.debug(f"Manager sending {work_name} work to worker {w}. Rows {extract_H_ranges(Work) or None}") + # logger.debug(f"Manager sending {work_name} work to worker {w}. Rows {extract_H_ranges(Work) or None}") if len(work_rows): new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]] H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype) for i, row in enumerate(work_rows): # TODO: if gen on manager may not need repack_fields H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row]) - - if not run_on_gen: - self.wcomms[w - 1].send(0, H_to_be_sent) else: H_to_be_sent = None + self.hist.update_history_to_gen(work_rows) + self._run_calc_in_place(Work, w, persis_info, H_to_be_sent) + + def _send_work_to_worker(self, Work: dict, w: int) -> None: + """Sends an allocation function order to a worker""" + logger.debug(f"Manager sending work unit to worker {w}") + + if self.resources: + self._set_resources(Work, w) + + self.wcomms[w - 1].send(Work["tag"], Work) - if run_on_gen: - self._run_calc_in_place(Work, w, persis_info, H_to_be_sent) + if Work["tag"] == EVAL_GEN_TAG: + self.W[w - 1]["gen_started_time"] = time.time() + + work_rows = Work["libE_info"]["H_rows"] + work_name = calc_type_strings[Work["tag"]] + logger.debug(f"Manager sending {work_name} work to worker {w}. Rows {extract_H_ranges(Work) or None}") + if len(work_rows): + new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]] + H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype) + for i, row in enumerate(work_rows): + H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row]) + self.wcomms[w - 1].send(0, H_to_be_sent) def _update_state_on_alloc(self, Work: dict, w: int): """Updates a workers' active/idle status following an allocation order""" # if this is self.gen_on_manager - if w != 0: - self.W[w - 1]["active"] = Work["tag"] - if "libE_info" in Work: - if "persistent" in Work["libE_info"]: - self.W[w - 1]["persis_state"] = Work["tag"] - if Work["libE_info"].get("active_recv", False): - self.W[w - 1]["active_recv"] = Work["tag"] - else: - assert "active_recv" not in Work["libE_info"], "active_recv worker must also be persistent" + if w == 0: + return + + self.W[w - 1]["active"] = Work["tag"] + if "libE_info" in Work: + if "persistent" in Work["libE_info"]: + self.W[w - 1]["persis_state"] = Work["tag"] + if Work["libE_info"].get("active_recv", False): + self.W[w - 1]["active_recv"] = Work["tag"] + else: + assert "active_recv" not in Work["libE_info"], "active_recv worker must also be persistent" work_rows = Work["libE_info"]["H_rows"] if Work["tag"] == EVAL_SIM_TAG: @@ -477,6 +495,14 @@ def _receive_from_workers(self, persis_info: dict) -> dict: self._save_every_k_gens() return persis_info + def _update_state_after_local_gen(self, persis_info: dict, D_recv: dict, w: int) -> None: + # TODO - note w should be zero here always - could just set to 0 or default in function and not send + self.hist.update_history_x_in(w, D_recv["calc_out"], self.safe_mode, self.manager_gen_start_time) + assert len(D_recv["calc_out"]) or np.any( + self.W["active"] + ), "Gen must return work when is is the only thing active." + self._freeup_resources(w) + def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -> None: """Updates history and worker info on worker message""" calc_type = D_recv["calc_type"] diff --git a/libensemble/run_calculation.py b/libensemble/run_calculation.py index d9a7190f4a..07e89a3ab5 100644 --- a/libensemble/run_calculation.py +++ b/libensemble/run_calculation.py @@ -1,5 +1,5 @@ # TODO where to place main dir or utils/ or other -# TODO consolidate with worker.py (e.g. inheritence) +# TODO consolidate with worker.py (e.g. inheritance) """ libEnsemble RunCalc class From 913429297b4ed3a45bc5aed25d2c9d91c30de826 Mon Sep 17 00:00:00 2001 From: shudson Date: Wed, 1 Nov 2023 14:20:43 -0500 Subject: [PATCH 3/6] Only return new points to gen when using gen_on_manager --- libensemble/alloc_funcs/give_sim_work_first.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/libensemble/alloc_funcs/give_sim_work_first.py b/libensemble/alloc_funcs/give_sim_work_first.py index 47c95c36f4..7dc5bf3205 100644 --- a/libensemble/alloc_funcs/give_sim_work_first.py +++ b/libensemble/alloc_funcs/give_sim_work_first.py @@ -83,6 +83,12 @@ def give_sim_work_first( # Give gen work return_rows = range(len(H)) if gen_in else [] + # SH TODO: May want option to return all points or not, but for now using gen_on_manger to decide. + if libE_info.get("gen_on_manager", False): + # Return only new points to generator + returned_but_not_given = np.logical_and.reduce((H["sim_ended"], ~H["gen_informed"])) if gen_in else [] + if np.any(returned_but_not_given): + return_rows = np.where(returned_but_not_given)[0] try: if libE_info.get("gen_on_manager", False): Work[0] = support.gen_work(0, gen_in, return_rows, persis_info.get(0)) From 3fc067c75a3aadcbc876c54d1056c33f244bdb76 Mon Sep 17 00:00:00 2001 From: shudson Date: Wed, 1 Nov 2023 14:21:40 -0500 Subject: [PATCH 4/6] Support generators as a class --- libensemble/utils/runners.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index 07897b9428..498e7265f2 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -21,6 +21,11 @@ def __init__(self, sim_specs: dict, gen_specs: dict) -> None: self.gen_specs = gen_specs self.sim_f = sim_specs["sim_f"] self.gen_f = gen_specs.get("gen_f") + + if inspect.isclass(self.gen_f): + self.gen_obj = self.gen_f(gen_specs) + self.gen_f = self.gen_obj.run + self.has_globus_compute_sim = len(sim_specs.get("globus_compute_endpoint", "")) > 0 self.has_globus_compute_gen = len(gen_specs.get("globus_compute_endpoint", "")) > 0 From cdaf5ef3b6abd384da99b5ba8a4ae895d1bb3764 Mon Sep 17 00:00:00 2001 From: shudson Date: Thu, 2 Nov 2023 18:37:54 -0500 Subject: [PATCH 5/6] Change default rset assigned to -1 --- libensemble/resources/scheduler.py | 2 +- libensemble/resources/worker_resources.py | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/libensemble/resources/scheduler.py b/libensemble/resources/scheduler.py index 04de87e771..386a406bc8 100644 --- a/libensemble/resources/scheduler.py +++ b/libensemble/resources/scheduler.py @@ -245,7 +245,7 @@ def get_avail_rsets_by_group(self): for g in groups: self.avail_rsets_by_group[g] = [] for ind, rset in enumerate(rsets): - if not rset["assigned"]: + if rset["assigned"] == -1: # now default is -1. g = rset["group"] self.avail_rsets_by_group[g].append(ind) return self.avail_rsets_by_group diff --git a/libensemble/resources/worker_resources.py b/libensemble/resources/worker_resources.py index f46afee5d9..2becaa1df3 100644 --- a/libensemble/resources/worker_resources.py +++ b/libensemble/resources/worker_resources.py @@ -50,11 +50,10 @@ def __init__(self, num_workers: int, resources: "GlobalResources") -> None: # n ) self.rsets = np.zeros(self.total_num_rsets, dtype=ResourceManager.man_rset_dtype) - self.rsets["assigned"] = 0 + self.rsets["assigned"] = -1 # Can assign to manager (=0) so make unset value -1 for field in self.all_rsets.dtype.names: self.rsets[field] = self.all_rsets[field] self.num_groups = self.rsets["group"][-1] - self.rsets_free = self.total_num_rsets self.gpu_rsets_free = self.total_num_gpu_rsets self.nongpu_rsets_free = self.total_num_nongpu_rsets @@ -70,7 +69,7 @@ def assign_rsets(self, rset_team, worker_id): if rset_team: rteam = self.rsets["assigned"][rset_team] for i, wid in enumerate(rteam): - if wid == 0: + if wid == -1: self.rsets["assigned"][rset_team[i]] = worker_id self.rsets_free -= 1 if self.rsets["gpus"][rset_team[i]]: @@ -85,13 +84,13 @@ def assign_rsets(self, rset_team, worker_id): def free_rsets(self, worker=None): """Free up assigned resource sets""" if worker is None: - self.rsets["assigned"] = 0 + self.rsets["assigned"] = -1 self.rsets_free = self.total_num_rsets self.gpu_rsets_free = self.total_num_gpu_rsets self.nongpu_rsets_free = self.total_num_nongpu_rsets else: rsets_to_free = np.where(self.rsets["assigned"] == worker)[0] - self.rsets["assigned"][rsets_to_free] = 0 + self.rsets["assigned"][rsets_to_free] = -1 self.rsets_free += len(rsets_to_free) self.gpu_rsets_free += np.count_nonzero(self.rsets["gpus"][rsets_to_free]) self.nongpu_rsets_free += np.count_nonzero(~self.rsets["gpus"][rsets_to_free]) From 1314d5666fe77661463a4765259dbb2e873bb19c Mon Sep 17 00:00:00 2001 From: shudson Date: Mon, 6 Nov 2023 09:54:58 -0600 Subject: [PATCH 6/6] Call finalize if function exists --- libensemble/manager.py | 29 +++++++++++++++++++++++------ libensemble/message_numbers.py | 2 ++ libensemble/run_calculation.py | 10 +++++++++- libensemble/utils/runners.py | 18 +++++++++++++++--- 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/libensemble/manager.py b/libensemble/manager.py index c6a515c461..03ea2c0dfe 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -390,9 +390,10 @@ def _send_work_order(self, Work: dict, w: int, persis_info: dict) -> None: else: self._send_work_to_worker(Work, w) - # TODO: look at naming of functions + # SH TODO look at naming of functions + # SH TODO on final send - has all persis_info here - not just persis_info[0] - check def _set_manager_work_order(self, Work: dict, w: int, persis_info: dict) -> None: - logger.debug(f"Manager running generator") + logger.debug("Manager running generator") if self.resources: self._set_resources(Work, w) @@ -401,7 +402,7 @@ def _set_manager_work_order(self, Work: dict, w: int, persis_info: dict) -> None # TODO functionalize common lines - but for logger.debug message (but that could be templated and sent) work_rows = Work["libE_info"]["H_rows"] - work_name = calc_type_strings[Work["tag"]] + # work_name = calc_type_strings[Work["tag"]] # logger.debug(f"Manager sending {work_name} work to worker {w}. Rows {extract_H_ranges(Work) or None}") if len(work_rows): new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]] @@ -497,8 +498,9 @@ def _receive_from_workers(self, persis_info: dict) -> dict: def _update_state_after_local_gen(self, persis_info: dict, D_recv: dict, w: int) -> None: # TODO - note w should be zero here always - could just set to 0 or default in function and not send - self.hist.update_history_x_in(w, D_recv["calc_out"], self.safe_mode, self.manager_gen_start_time) - assert len(D_recv["calc_out"]) or np.any( + if isinstance(D_recv.get("calc_out", None), np.ndarray): + self.hist.update_history_x_in(w, D_recv["calc_out"], self.safe_mode, self.manager_gen_start_time) + assert D_recv['libE_info'].get('finalize') or len(D_recv["calc_out"]) or np.any( self.W["active"] ), "Gen must return work when is is the only thing active." self._freeup_resources(w) @@ -603,6 +605,22 @@ def _final_receive_and_kill(self, persis_info: dict) -> (dict, int, int): data) and a kill signal is sent. """ + # SH TODO consolidate with final_gen_send below + # SH TODO should it also support final send to a non-persistent gen (if not a class) + if self.gen_on_manager: + rows_to_send = np.where(self.hist.H["sim_ended"] & ~self.hist.H["gen_informed"])[0] + print(f"Manager final gen call: {rows_to_send=}") + # TODO use something like "finalize" as below - or use a diff tag - EVAL_FINAL_GEN_TAG + work = { + "H_fields": self.gen_specs["in"], + "persis_info": persis_info[0], + "tag": EVAL_GEN_TAG, + "libE_info": {"H_rows": rows_to_send, "finalize": True}, + } + self._check_work_order(work, 0, force=True) + self._send_work_order(work, 0, persis_info) + self.hist.update_history_to_gen(rows_to_send) + # Send a handshake signal to each persistent worker. if any(self.W["persis_state"]): for w in self.W["worker_id"][self.W["persis_state"] > 0]: @@ -714,7 +732,6 @@ def run(self, persis_info: dict) -> (dict, int, int): Work, persis_info, flag = self._alloc_work(self.hist.trim_H(), persis_info) if flag: break - for w in Work: if self._sim_max_given(): break diff --git a/libensemble/message_numbers.py b/libensemble/message_numbers.py index adfcbc2448..6caef0a6eb 100644 --- a/libensemble/message_numbers.py +++ b/libensemble/message_numbers.py @@ -41,6 +41,8 @@ # last_calc_status_rst_tag CALC_EXCEPTION = 35 # Reserved: Automatically used if user_f raised an exception +EVAL_FINAL_GEN_TAG = 36 + MAN_KILL_SIGNALS = [MAN_SIGNAL_FINISH, MAN_SIGNAL_KILL] calc_status_strings = { diff --git a/libensemble/run_calculation.py b/libensemble/run_calculation.py index 07e89a3ab5..f8ca7abfec 100644 --- a/libensemble/run_calculation.py +++ b/libensemble/run_calculation.py @@ -24,6 +24,7 @@ CALC_EXCEPTION, EVAL_GEN_TAG, EVAL_SIM_TAG, + EVAL_FINAL_GEN_TAG, MAN_SIGNAL_FINISH, MAN_SIGNAL_KILL, PERSIS_STOP, @@ -198,6 +199,7 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict, calc_id = str(Work["libE_info"]["gen_count"]) else: calc_id = str(self.calc_iter[calc_type]) + # Add a right adjust (minimum width). calc_id = calc_id.rjust(5, " ") @@ -205,7 +207,13 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict, try: logger.debug(f"Starting {enum_desc}: {calc_id}") - calc = self._run_calc[calc_type] + + # SH TODO May use a different calc_type + if Work["libE_info"].get("finalize", False): + calc = self._run_calc[EVAL_FINAL_GEN_TAG] + else: + calc = self._run_calc[calc_type] + with timer: # check works when making gen dirs (and when there is a workflow dir). Matrix of checks if self.EnsembleDirectory.use_calc_dirs(calc_type): loc_stack, calc_dir = self.EnsembleDirectory.prep_calc_dir( diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index 498e7265f2..543860f9df 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -5,7 +5,7 @@ import numpy.typing as npt -from libensemble.message_numbers import EVAL_GEN_TAG, EVAL_SIM_TAG +from libensemble.message_numbers import EVAL_GEN_TAG, EVAL_SIM_TAG, EVAL_FINAL_GEN_TAG logger = logging.getLogger(__name__) @@ -21,10 +21,12 @@ def __init__(self, sim_specs: dict, gen_specs: dict) -> None: self.gen_specs = gen_specs self.sim_f = sim_specs["sim_f"] self.gen_f = gen_specs.get("gen_f") - + self.final_f = None if inspect.isclass(self.gen_f): self.gen_obj = self.gen_f(gen_specs) self.gen_f = self.gen_obj.run + if hasattr(self.gen_obj, "finalize") and callable(self.gen_obj.finalize): + self.final_f = self.gen_obj.finalize self.has_globus_compute_sim = len(sim_specs.get("globus_compute_endpoint", "")) > 0 self.has_globus_compute_gen = len(gen_specs.get("globus_compute_endpoint", "")) > 0 @@ -46,6 +48,8 @@ def make_runners(self) -> Dict[int, Callable]: """Creates functions to run a sim or gen. These functions are either called directly by the worker or submitted to a Globus Compute endpoint.""" + run_finalize = [] + def run_sim(calc_in, Work): """Determines how to run sim.""" if self.has_globus_compute_sim: @@ -66,10 +70,18 @@ def run_gen(calc_in, Work): return result(calc_in, Work["persis_info"], self.gen_specs, Work["libE_info"], self.gen_f, Work["tag"]) + if self.final_f is not None: + + def run_finalize(calc_in, Work): + result = self._normal_result + return result( + calc_in, Work["persis_info"], self.gen_specs, Work["libE_info"], self.final_f, Work["tag"] + ) + else: run_gen = [] - return {EVAL_SIM_TAG: run_sim, EVAL_GEN_TAG: run_gen} + return {EVAL_SIM_TAG: run_sim, EVAL_GEN_TAG: run_gen, EVAL_FINAL_GEN_TAG: run_finalize} def shutdown(self) -> None: if self.has_globus_compute_sim: