From 8815eb3a280c1dbd2291de0ab32195d2abf84d47 Mon Sep 17 00:00:00 2001 From: shudson Date: Fri, 18 Aug 2023 17:33:25 -0500 Subject: [PATCH 1/2] Only update sliding window when kill_canceled_sims is True --- libensemble/history.py | 18 ++++++++++-------- libensemble/manager.py | 8 +++++--- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/libensemble/history.py b/libensemble/history.py index 1a6e9c755..02a532bbe 100644 --- a/libensemble/history.py +++ b/libensemble/history.py @@ -123,7 +123,7 @@ def __init__( self.last_started = -1 self.last_ended = -1 - def update_history_f(self, D: dict, safe_mode: bool) -> None: + def update_history_f(self, D: dict, safe_mode: bool, kill_canceled_sims: bool = False) -> None: """ Updates the history after points have been evaluated """ @@ -154,13 +154,14 @@ def update_history_f(self, D: dict, safe_mode: bool) -> None: self.H["sim_ended_time"][ind] = time.time() self.sim_ended_count += 1 - for j in range(self.last_ended + 1, np.max(new_inds) + 1): - if self.H["sim_ended"][j]: - self.last_ended += 1 - else: - break + if kill_canceled_sims: + for j in range(self.last_ended + 1, np.max(new_inds) + 1): + if self.H["sim_ended"][j]: + self.last_ended += 1 + else: + break - def update_history_x_out(self, q_inds: npt.NDArray, sim_worker: int) -> None: + def update_history_x_out(self, q_inds: npt.NDArray, sim_worker: int, kill_canceled_sims: bool = False) -> None: """ Updates the history (in place) when new points have been given out to be evaluated @@ -180,7 +181,8 @@ def update_history_x_out(self, q_inds: npt.NDArray, sim_worker: int) -> None: self.H["sim_worker"][q_inds] = sim_worker self.sim_started_count += len(q_inds) - self.last_started = np.max(q_inds) + if kill_canceled_sims: + self.last_started = np.max(q_inds) def update_history_to_gen(self, q_inds: npt.NDArray): """Updates the history (in place) when points are given back to the gen""" diff --git a/libensemble/manager.py b/libensemble/manager.py index 426b54738..ed3ff453f 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -377,7 +377,7 @@ def _update_state_on_alloc(self, Work: dict, w: int): work_rows = Work["libE_info"]["H_rows"] if Work["tag"] == EVAL_SIM_TAG: - self.hist.update_history_x_out(work_rows, w) + self.hist.update_history_x_out(work_rows, w, self.kill_canceled_sims) elif Work["tag"] == EVAL_GEN_TAG: self.hist.update_history_to_gen(work_rows) @@ -433,7 +433,7 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) - if calc_status is FINISHED_PERSISTENT_GEN_TAG and self.libE_specs.get("use_persis_return_gen", False): self.hist.update_history_x_in(w, final_data, self.safe_mode, self.W[w - 1]["gen_started_time"]) elif calc_status is FINISHED_PERSISTENT_SIM_TAG and self.libE_specs.get("use_persis_return_sim", False): - self.hist.update_history_f(D_recv, self.safe_mode) + self.hist.update_history_f(D_recv, self.safe_mode, self.kill_canceled_sims) else: logger.info(_PERSIS_RETURN_WARNING) self.W[w - 1]["persis_state"] = 0 @@ -446,7 +446,7 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) - self._freeup_resources(w) else: if calc_type == EVAL_SIM_TAG: - self.hist.update_history_f(D_recv, self.safe_mode) + self.hist.update_history_f(D_recv, self.safe_mode, self.kill_canceled_sims) if calc_type == EVAL_GEN_TAG: self.hist.update_history_x_in(w, D_recv["calc_out"], self.safe_mode, self.W[w - 1]["gen_started_time"]) assert ( @@ -485,8 +485,10 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int) -> None: def _kill_cancelled_sims(self) -> None: """Send kill signals to any sims marked as cancel_requested""" + if self.kill_canceled_sims: inds_to_check = np.arange(self.hist.last_ended + 1, self.hist.last_started + 1) + kill_sim = ( self.hist.H["sim_started"][inds_to_check] & self.hist.H["cancel_requested"][inds_to_check] From 0b678ebfbec3aa38fbe116940b9cbccc702eb820 Mon Sep 17 00:00:00 2001 From: shudson Date: Fri, 18 Aug 2023 17:34:20 -0500 Subject: [PATCH 2/2] Add sleep for multiprocessing performance --- libensemble/manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/libensemble/manager.py b/libensemble/manager.py index ed3ff453f..9df20935e 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -403,6 +403,7 @@ def _receive_from_workers(self, persis_info: dict) -> dict: communticate. If any output is received, all other workers are looped back over. """ + time.sleep(0.0001) # Critical for multiprocessing performance new_stuff = True while new_stuff: new_stuff = False