Support in-place gen on manager #1142

Draft
wants to merge 6 commits into base: develop
11 changes: 10 additions & 1 deletion libensemble/alloc_funcs/give_sim_work_first.py
@@ -83,8 +83,17 @@ def give_sim_work_first(

# Give gen work
return_rows = range(len(H)) if gen_in else []
# SH TODO: May want option to return all points or not, but for now using gen_on_manager to decide.
if libE_info.get("gen_on_manager", False):
# Return only new points to generator
returned_but_not_given = np.logical_and.reduce((H["sim_ended"], ~H["gen_informed"])) if gen_in else []
if np.any(returned_but_not_given):
return_rows = np.where(returned_but_not_given)[0]
try:
Work[wid] = support.gen_work(wid, gen_in, return_rows, persis_info.get(wid))
if libE_info.get("gen_on_manager", False):
Work[0] = support.gen_work(0, gen_in, return_rows, persis_info.get(0))
else:
Work[wid] = support.gen_work(wid, gen_in, return_rows, persis_info.get(wid))
except InsufficientFreeResources:
break
gen_count += 1
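Note: a minimal standalone sketch of the row-selection logic above, using a toy structured array rather than the real libEnsemble history. It shows how the sim_ended/gen_informed mask picks out only rows that have finished simulation but have not yet been returned to the generator.

import numpy as np

# Toy history: five points, three simulated, one already reported back to the gen.
H = np.zeros(5, dtype=[("sim_ended", "?"), ("gen_informed", "?")])
H["sim_ended"][[0, 1, 3]] = True
H["gen_informed"][0] = True

# Same mask as give_sim_work_first: simulated but not yet given back to the gen.
returned_but_not_given = np.logical_and.reduce((H["sim_ended"], ~H["gen_informed"]))
return_rows = np.where(returned_but_not_given)[0]
print(return_rows)  # -> [1 3]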
2 changes: 1 addition & 1 deletion libensemble/executors/executor.py
@@ -658,7 +658,7 @@ def set_workerID(self, workerid) -> None:
"""Sets the worker ID for this executor"""
self.workerID = workerid

def set_worker_info(self, comm, workerid=None) -> None:
def set_worker_info(self, comm=None, workerid=None) -> None:
"""Sets info for this executor"""
self.workerID = workerid
self.comm = comm
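Note: a hedged sketch of why comm now defaults to None. A generator run in place on the manager has no manager-to-worker comm object to hand over, so the executor's worker info can be set with only an id (worker 0 denoting the manager). Using the base Executor here is an assumption for illustration.

from libensemble.executors.executor import Executor

exctr = Executor()
exctr.set_worker_info(workerid=0)  # comm omitted; now defaults to None for a manager-run gen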
143 changes: 122 additions & 21 deletions libensemble/manager.py
@@ -7,6 +7,7 @@
import glob
import logging
import os
from pathlib import Path
import platform
import socket
import sys
@@ -32,13 +33,16 @@
calc_type_strings,
)
from libensemble.resources.resources import Resources
from libensemble.run_calculation import RunCalc
from libensemble.tools.fields_keys import protected_libE_fields
from libensemble.tools.tools import _PERSIS_RETURN_WARNING, _USER_CALC_DIR_WARNING
from libensemble.utils.loc_stack import LocationStack
from libensemble.utils.misc import extract_H_ranges
from libensemble.utils.output_directory import EnsembleDirectory
from libensemble.utils.timer import Timer
from libensemble.worker import WorkerErrMsg


logger = logging.getLogger(__name__)
# For debug messages - uncomment
# logger.setLevel(logging.DEBUG)
@@ -65,6 +69,15 @@ def report_worker_exc(wrk_exc: Exception = None) -> None:
logger.error(exc)


# TODO - do we need persis_in here also?
def get_dtypes(H, sim_specs, gen_specs):
dtypes = {
EVAL_SIM_TAG: repack_fields(H[sim_specs["in"]]).dtype,
EVAL_GEN_TAG: repack_fields(H[gen_specs["in"]]).dtype,
}
return dtypes
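Note: a standalone sketch of what get_dtypes computes, using toy specs; the tag values and field names are for illustration only (the real tags come from libensemble.message_numbers).

import numpy as np
from numpy.lib.recfunctions import repack_fields

EVAL_SIM_TAG, EVAL_GEN_TAG = 1, 2  # illustrative values

H = np.zeros(3, dtype=[("x", "f8"), ("f", "f8"), ("sim_id", "i8")])
sim_specs = {"in": ["x"]}
gen_specs = {"in": ["sim_id", "f"]}

# Packed dtypes of the sim/gen input field subsets, keyed by calc-type tag.
dtypes = {
    EVAL_SIM_TAG: repack_fields(H[sim_specs["in"]]).dtype,
    EVAL_GEN_TAG: repack_fields(H[gen_specs["in"]]).dtype,
}
print(dtypes[EVAL_SIM_TAG])  # dtype containing only the "x" field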


def manager_main(
hist: npt.NDArray,
libE_specs: dict,
@@ -112,10 +125,7 @@ def manager_main(
gen_specs["in"] = []

# Send dtypes to workers
dtypes = {
EVAL_SIM_TAG: repack_fields(hist.H[sim_specs["in"]]).dtype,
EVAL_GEN_TAG: repack_fields(hist.H[gen_specs["in"]]).dtype,
}
dtypes = get_dtypes(hist.H, sim_specs, gen_specs)

for wcomm in wcomms:
wcomm.send(0, dtypes)
@@ -198,6 +208,15 @@ def __init__(
self.use_resource_sets = dyn_keys_in_H or self.libE_specs.get("num_resource_sets")
self.gen_num_procs = libE_specs.get("gen_num_procs", 0)
self.gen_num_gpus = libE_specs.get("gen_num_gpus", 0)
self.gen_on_manager = libE_specs.get("gen_on_manager", False)

if self.gen_on_manager:
self.manager_gen_start_time = 0
dtypes = get_dtypes(self.hist.H, sim_specs, gen_specs)
# TODO rename from gen_worker
self.gen_worker = RunCalc(dtypes, 0, sim_specs, gen_specs, libE_specs)
self.LS = LocationStack()
self.LS.register_loc("workflow", Path(libE_specs.get("workflow_dir_path")))

self.W = np.zeros(len(self.wcomms), dtype=Manager.worker_dtype)
self.W["worker_id"] = np.arange(len(self.wcomms)) + 1
@@ -301,17 +320,23 @@ def _save_every_k_gens(self) -> None:

def _check_work_order(self, Work: dict, w: int, force: bool = False) -> None:
"""Checks validity of an allocation function order"""
assert w != 0, "Can't send to worker 0; this is the manager."
if self.W[w - 1]["active_recv"]:
assert "active_recv" in Work["libE_info"], (
"Messages to a worker in active_recv mode should have active_recv"
f"set to True in libE_info. Work['libE_info'] is {Work['libE_info']}"
)
else:
if not force:
assert self.W[w - 1]["active"] == 0, (
"Allocation function requested work be sent to worker %d, an already active worker." % w

if not self.gen_on_manager:
assert w != 0, "Can't send to worker 0; this is the manager."

# A gen run on the manager (in place) is not persistent or active_recv, and cannot already be active
if w != 0:
if self.W[w - 1]["active_recv"]:
assert "active_recv" in Work["libE_info"], (
"Messages to a worker in active_recv mode should have active_recv"
f"set to True in libE_info. Work['libE_info'] is {Work['libE_info']}"
)
else:
if not force:
assert self.W[w - 1]["active"] == 0, (
"Allocation function requested work be sent to worker %d, an already active worker." % w
)

work_rows = Work["libE_info"]["H_rows"]
if len(work_rows):
work_fields = set(Work["H_fields"])
@@ -347,7 +372,50 @@ def _freeup_resources(self, w: int) -> None:
if self.resources:
self.resources.resource_manager.free_rsets(w)

def _send_work_order(self, Work: dict, w: int) -> None:
def _run_calc_in_place(self, Work, w, persis_info, H_to_be_sent=None):
"""Run user function in place"""

with self.LS.loc("workflow"):
D_recv = self.gen_worker.run(Work, H_to_be_sent)

# TODO - check: skipping _handle_msg_from_worker
# TODO - persis_info needs to be passed through (via _send_work_order) - an alternative is to make it self.persis_info,
# or to use the worker's persis_info, which is in Work (see "persis_info": persis_info[w]),
# but we may want to be able to change persis_info keys directly when the gen runs on the manager
self._update_state_after_local_gen(persis_info, D_recv, w)

def _send_work_order(self, Work: dict, w: int, persis_info: dict) -> None:
if self.gen_on_manager and Work["tag"] == EVAL_GEN_TAG:
self._set_manager_work_order(Work, w, persis_info)
else:
self._send_work_to_worker(Work, w)

# SH TODO look at naming of functions
# SH TODO on final send - has all persis_info here - not just persis_info[0] - check
def _set_manager_work_order(self, Work: dict, w: int, persis_info: dict) -> None:
logger.debug("Manager running generator")

if self.resources:
self._set_resources(Work, w)

self.manager_gen_start_time = time.time()

# TODO functionalize common lines - except for the logger.debug message (though that could be templated and passed in)
work_rows = Work["libE_info"]["H_rows"]
# work_name = calc_type_strings[Work["tag"]]
# logger.debug(f"Manager sending {work_name} work to worker {w}. Rows {extract_H_ranges(Work) or None}")
if len(work_rows):
new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]]
H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype)
for i, row in enumerate(work_rows):
# TODO: if gen is on the manager, repack_fields may not be needed
H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row])
else:
H_to_be_sent = None
self.hist.update_history_to_gen(work_rows)
self._run_calc_in_place(Work, w, persis_info, H_to_be_sent)
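Note: a standalone sketch of the H_to_be_sent construction in _set_manager_work_order above, using a toy history array: only the requested fields of the requested rows are copied into a fresh, packed array before being handed to the in-place gen. Field names and row numbers are illustrative.

import numpy as np
from numpy.lib.recfunctions import repack_fields

H = np.zeros(4, dtype=[("x", "f8"), ("f", "f8"), ("sim_id", "i8")])
H["x"] = [0.1, 0.2, 0.3, 0.4]
H["sim_id"] = np.arange(4)

H_fields = ["sim_id", "x"]  # stands in for Work["H_fields"]
work_rows = [1, 3]          # stands in for Work["libE_info"]["H_rows"]

# Build a packed array holding only the requested fields for the requested rows.
new_dtype = [(name, H.dtype.fields[name][0]) for name in H_fields]
H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype)
for i, row in enumerate(work_rows):
    H_to_be_sent[i] = repack_fields(H[H_fields][row])
print(H_to_be_sent)  # -> [(1, 0.2) (3, 0.4)]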

def _send_work_to_worker(self, Work: dict, w: int) -> None:
"""Sends an allocation function order to a worker"""
logger.debug(f"Manager sending work unit to worker {w}")

@@ -371,6 +439,11 @@ def _send_work_order(self, Work: dict, w: int) -> None:

def _update_state_on_alloc(self, Work: dict, w: int):
"""Updates a workers' active/idle status following an allocation order"""

# w == 0 means the gen ran on the manager (gen_on_manager); no worker state to update
if w == 0:
return

self.W[w - 1]["active"] = Work["tag"]
if "libE_info" in Work:
if "persistent" in Work["libE_info"]:
@@ -423,6 +496,15 @@ def _receive_from_workers(self, persis_info: dict) -> dict:
self._save_every_k_gens()
return persis_info

def _update_state_after_local_gen(self, persis_info: dict, D_recv: dict, w: int) -> None:
# TODO - note w should always be zero here - could just default it to 0 in the function and not pass it
if isinstance(D_recv.get("calc_out", None), np.ndarray):
self.hist.update_history_x_in(w, D_recv["calc_out"], self.safe_mode, self.manager_gen_start_time)
assert D_recv["libE_info"].get("finalize") or len(D_recv["calc_out"]) or np.any(
self.W["active"]
), "Gen must return work when it is the only thing active."
self._freeup_resources(w)

def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -> None:
"""Updates history and worker info on worker message"""
calc_type = D_recv["calc_type"]
@@ -523,6 +605,22 @@ def _final_receive_and_kill(self, persis_info: dict) -> (dict, int, int):
data) and a kill signal is sent.
"""

# SH TODO consolidate with final_gen_send below
# SH TODO should it also support final send to a non-persistent gen (if not a class)
if self.gen_on_manager:
rows_to_send = np.where(self.hist.H["sim_ended"] & ~self.hist.H["gen_informed"])[0]
print(f"Manager final gen call: {rows_to_send=}")
# TODO use something like "finalize" as below - or use a different tag - EVAL_FINAL_GEN_TAG
work = {
"H_fields": self.gen_specs["in"],
"persis_info": persis_info[0],
"tag": EVAL_GEN_TAG,
"libE_info": {"H_rows": rows_to_send, "finalize": True},
}
self._check_work_order(work, 0, force=True)
self._send_work_order(work, 0, persis_info)
self.hist.update_history_to_gen(rows_to_send)

# Send a handshake signal to each persistent worker.
if any(self.W["persis_state"]):
for w in self.W["worker_id"][self.W["persis_state"] > 0]:
@@ -536,7 +634,7 @@ def _final_receive_and_kill(self, persis_info: dict) -> (dict, int, int):
"libE_info": {"persistent": True, "H_rows": rows_to_send},
}
self._check_work_order(work, w, force=True)
self._send_work_order(work, w)
self._send_work_order(work, w, persis_info)
self.hist.update_history_to_gen(rows_to_send)
else:
self.wcomms[w - 1].send(PERSIS_STOP, MAN_SIGNAL_KILL)
@@ -584,6 +682,7 @@ def _get_alloc_libE_info(self) -> dict:
"use_resource_sets": self.use_resource_sets,
"gen_num_procs": self.gen_num_procs,
"gen_num_gpus": self.gen_num_gpus,
"gen_on_manager": self.gen_on_manager,
}

def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict:
@@ -633,16 +732,18 @@ def run(self, persis_info: dict) -> (dict, int, int):
Work, persis_info, flag = self._alloc_work(self.hist.trim_H(), persis_info)
if flag:
break

for w in Work:
if self._sim_max_given():
break
self._check_work_order(Work[w], w)
self._send_work_order(Work[w], w)
self._send_work_order(Work[w], w, persis_info)
self._update_state_on_alloc(Work[w], w)
assert self.term_test() or any(
self.W["active"] != 0
), "alloc_f did not return any work, although all workers are idle."

# A gen_on_manager call has already completed in place, so all workers can be idle at this point
if 0 not in Work:
assert self.term_test() or any(
self.W["active"] != 0
), "alloc_f did not return any work, although all workers are idle."
except WorkerException as e:
report_worker_exc(e)
raise LoggedException(e.args[0], e.args[1]) from None
2 changes: 2 additions & 0 deletions libensemble/message_numbers.py
@@ -41,6 +41,8 @@
# last_calc_status_rst_tag
CALC_EXCEPTION = 35 # Reserved: Automatically used if user_f raised an exception

EVAL_FINAL_GEN_TAG = 36

MAN_KILL_SIGNALS = [MAN_SIGNAL_FINISH, MAN_SIGNAL_KILL]

calc_status_strings = {
2 changes: 1 addition & 1 deletion libensemble/resources/scheduler.py
@@ -245,7 +245,7 @@ def get_avail_rsets_by_group(self):
for g in groups:
self.avail_rsets_by_group[g] = []
for ind, rset in enumerate(rsets):
if not rset["assigned"]:
if rset["assigned"] == -1: # now default is -1.
g = rset["group"]
self.avail_rsets_by_group[g].append(ind)
return self.avail_rsets_by_group
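Note: a standalone sketch of why the unassigned sentinel moves from 0 to -1 in this PR. With gen_on_manager, the manager (worker id 0) can itself be assigned resource sets, so 0 is no longer free to mean "unassigned"; availability checks therefore compare against -1.

import numpy as np

rsets = np.zeros(4, dtype=[("assigned", "i4"), ("group", "i4"), ("gpus", "?")])
rsets["assigned"] = -1           # -1 now means unassigned

rsets["assigned"][[0, 1]] = 0    # two resource sets assigned to the manager-run gen (worker 0)
avail = np.where(rsets["assigned"] == -1)[0]
print(avail)  # -> [2 3]; a check against 0 would wrongly treat the manager's sets as free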
9 changes: 4 additions & 5 deletions libensemble/resources/worker_resources.py
@@ -50,11 +50,10 @@ def __init__(self, num_workers: int, resources: "GlobalResources") -> None:  # n
)

self.rsets = np.zeros(self.total_num_rsets, dtype=ResourceManager.man_rset_dtype)
self.rsets["assigned"] = 0
self.rsets["assigned"] = -1 # Can assign to manager (=0) so make unset value -1
for field in self.all_rsets.dtype.names:
self.rsets[field] = self.all_rsets[field]
self.num_groups = self.rsets["group"][-1]

self.rsets_free = self.total_num_rsets
self.gpu_rsets_free = self.total_num_gpu_rsets
self.nongpu_rsets_free = self.total_num_nongpu_rsets
@@ -70,7 +69,7 @@ def assign_rsets(self, rset_team, worker_id):
if rset_team:
rteam = self.rsets["assigned"][rset_team]
for i, wid in enumerate(rteam):
if wid == 0:
if wid == -1:
self.rsets["assigned"][rset_team[i]] = worker_id
self.rsets_free -= 1
if self.rsets["gpus"][rset_team[i]]:
@@ -85,13 +84,13 @@ def free_rsets(self, worker=None):
def free_rsets(self, worker=None):
"""Free up assigned resource sets"""
if worker is None:
self.rsets["assigned"] = 0
self.rsets["assigned"] = -1
self.rsets_free = self.total_num_rsets
self.gpu_rsets_free = self.total_num_gpu_rsets
self.nongpu_rsets_free = self.total_num_nongpu_rsets
else:
rsets_to_free = np.where(self.rsets["assigned"] == worker)[0]
self.rsets["assigned"][rsets_to_free] = 0
self.rsets["assigned"][rsets_to_free] = -1
self.rsets_free += len(rsets_to_free)
self.gpu_rsets_free += np.count_nonzero(self.rsets["gpus"][rsets_to_free])
self.nongpu_rsets_free += np.count_nonzero(~self.rsets["gpus"][rsets_to_free])