Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance/improve perform #1063

Merged
merged 2 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions libensemble/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def __init__(
self.last_started = -1
self.last_ended = -1

def update_history_f(self, D: dict, safe_mode: bool) -> None:
def update_history_f(self, D: dict, safe_mode: bool, kill_canceled_sims: bool = False) -> None:
"""
Updates the history after points have been evaluated
"""
Expand Down Expand Up @@ -154,13 +154,14 @@ def update_history_f(self, D: dict, safe_mode: bool) -> None:
self.H["sim_ended_time"][ind] = time.time()
self.sim_ended_count += 1

for j in range(self.last_ended + 1, np.max(new_inds) + 1):
if self.H["sim_ended"][j]:
self.last_ended += 1
else:
break
if kill_canceled_sims:
for j in range(self.last_ended + 1, np.max(new_inds) + 1):
if self.H["sim_ended"][j]:
self.last_ended += 1
else:
break

def update_history_x_out(self, q_inds: npt.NDArray, sim_worker: int) -> None:
def update_history_x_out(self, q_inds: npt.NDArray, sim_worker: int, kill_canceled_sims: bool = False) -> None:
"""
Updates the history (in place) when new points have been given out to be evaluated

Expand All @@ -180,7 +181,8 @@ def update_history_x_out(self, q_inds: npt.NDArray, sim_worker: int) -> None:
self.H["sim_worker"][q_inds] = sim_worker

self.sim_started_count += len(q_inds)
self.last_started = np.max(q_inds)
if kill_canceled_sims:
self.last_started = np.max(q_inds)

def update_history_to_gen(self, q_inds: npt.NDArray):
"""Updates the history (in place) when points are given back to the gen"""
Expand Down
9 changes: 6 additions & 3 deletions libensemble/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def _update_state_on_alloc(self, Work: dict, w: int):

work_rows = Work["libE_info"]["H_rows"]
if Work["tag"] == EVAL_SIM_TAG:
self.hist.update_history_x_out(work_rows, w)
self.hist.update_history_x_out(work_rows, w, self.kill_canceled_sims)
elif Work["tag"] == EVAL_GEN_TAG:
self.hist.update_history_to_gen(work_rows)

Expand All @@ -403,6 +403,7 @@ def _receive_from_workers(self, persis_info: dict) -> dict:
communticate. If any output is received, all other workers are
looped back over.
"""
time.sleep(0.0001) # Critical for multiprocessing performance
new_stuff = True
while new_stuff:
new_stuff = False
Expand Down Expand Up @@ -433,7 +434,7 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -
if calc_status is FINISHED_PERSISTENT_GEN_TAG and self.libE_specs.get("use_persis_return_gen", False):
self.hist.update_history_x_in(w, final_data, self.safe_mode, self.W[w - 1]["gen_started_time"])
elif calc_status is FINISHED_PERSISTENT_SIM_TAG and self.libE_specs.get("use_persis_return_sim", False):
self.hist.update_history_f(D_recv, self.safe_mode)
self.hist.update_history_f(D_recv, self.safe_mode, self.kill_canceled_sims)
else:
logger.info(_PERSIS_RETURN_WARNING)
self.W[w - 1]["persis_state"] = 0
Expand All @@ -446,7 +447,7 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -
self._freeup_resources(w)
else:
if calc_type == EVAL_SIM_TAG:
self.hist.update_history_f(D_recv, self.safe_mode)
self.hist.update_history_f(D_recv, self.safe_mode, self.kill_canceled_sims)
if calc_type == EVAL_GEN_TAG:
self.hist.update_history_x_in(w, D_recv["calc_out"], self.safe_mode, self.W[w - 1]["gen_started_time"])
assert (
Expand Down Expand Up @@ -485,8 +486,10 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int) -> None:

def _kill_cancelled_sims(self) -> None:
"""Send kill signals to any sims marked as cancel_requested"""

if self.kill_canceled_sims:
inds_to_check = np.arange(self.hist.last_ended + 1, self.hist.last_started + 1)

kill_sim = (
self.hist.H["sim_started"][inds_to_check]
& self.hist.H["cancel_requested"][inds_to_check]
Expand Down
Loading