Fixes/various fixes and optimizations (#1229)
* presumably handle no task state being set on an executor exception by polling the task anyway. allow wait_on_start to be set as an integer N, meaning time out after N seconds

* whoops, can't poll an app that raised an exception. set task's state to FAILED on exception. test wait_on_start=int

* have to use Union on older pythons :(

* remove the check that the received calc_type is correct. On the worker side, it's received from the manager, not modified or passed anywhere, then returned as-is to the manager.

* removes the last check in _check_received_calc. the actual logging/checking is performed on the worker side in the _handle_calc finally block.

* various cleanups in worker.py

* only import cProfile if the profile option is enabled

* try adding TASK_FAILED_TO_START state

* passing through wait_on_start int value to _wait_on_start in base class

* adjust test_retries_launch_fail for new status

* add FAILED_TO_START to docs

* catch the exception in polling_loop, warn, and break. set task status to finished upon retry. test these things

* add instruction to run extra-CI tests as part of release process

* these conditions only ever execute one branch, e.g. if we're running a gen on the worker side, we always have a gen count

* undoing changes to worker.run()
jlnav authored Feb 5, 2024
1 parent ff1e978 commit 1ba9f57
Showing 8 changed files with 85 additions and 63 deletions.
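In use, the headline change looks like this minimal sketch (assumes an initialized Executor with an application registered under calc_type "sim"; the arguments are illustrative):

from libensemble.executors.executor import Executor

exctr = Executor.executor

# wait_on_start now accepts an int as well as a bool: block until the task
# is polled as RUNNING (or another active/end state), waiting at most 5 seconds.
task = exctr.submit(calc_type="sim", num_procs=4, app_args="sleep 1", wait_on_start=5)

# A launch that never succeeds now ends in the new state.
if task.state == "FAILED_TO_START":
    print(f"{task.name} never started")
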
2 changes: 2 additions & 0 deletions docs/dev_guide/release_management/release_process.rst
@@ -33,6 +33,8 @@ Before release

- On-line CI (GitHub Actions) tests must pass.

- Launch and verify extra tests: ``gh workflow run libEnsemble-complete-CI --ref release/branch_name``

- Scaling tests must be run on HPC platforms listed as supported in release notes.
Test variants by platform, launch mechanism, scale, and other factors can
be configured and exported by the libE-Templater_.
2 changes: 1 addition & 1 deletion docs/executor/executor.rst
@@ -39,7 +39,7 @@ See the Executor APIs for optional arguments.
below and the query functions.

:task.state: (string) The task status. One of
("UNKNOWN"|"CREATED"|"WAITING"|"RUNNING"|"FINISHED"|"USER_KILLED"|"FAILED")
("UNKNOWN"|"CREATED"|"WAITING"|"RUNNING"|"FINISHED"|"USER_KILLED"|"FAILED"|"FAILED_TO_START")
:task.process: (process obj) The process object used by the underlying process
manager (e.g., return value of subprocess.Popen).
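For orientation, a small sketch of user code dispatching on the expanded state set (the function is hypothetical; the state strings are those documented above):

def handle_end_state(task):
    # Hypothetical dispatch on a finished task's end state.
    if task.state == "FINISHED":
        return "success"
    elif task.state == "FAILED_TO_START":
        return "launch failure"  # new end state added by this commit
    elif task.state in ("FAILED", "USER_KILLED"):
        return "run failure"
    return "unknown"
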
26 changes: 18 additions & 8 deletions libensemble/executors/executor.py
@@ -19,6 +19,7 @@
MAN_KILL_SIGNALS,
STOP_TAG,
TASK_FAILED,
TASK_FAILED_TO_START,
UNSET_TAG,
WORKER_DONE,
WORKER_KILL_ON_TIMEOUT,
@@ -37,7 +38,8 @@
RUNNING
FINISHED
USER_KILLED
FAILED""".split()
FAILED
FAILED_TO_START""".split()

NOT_STARTED_STATES = """
CREATED
@@ -48,6 +50,7 @@
FINISHED
USER_KILLED
FAILED
FAILED_TO_START
""".split()


@@ -569,8 +572,8 @@ def polling_loop(
self, task: Task, timeout: Optional[int] = None, delay: float = 0.1, poll_manager: bool = False
) -> int:
"""Optional, blocking, generic task status polling loop. Operates until the task
-        finishes, times out, or is Optionally killed via a manager signal. On completion, returns a
-        presumptive :ref:`calc_status<funcguides-calcstatus>` integer. Potentially useful
+        finishes, times out, or is optionally killed via a manager signal. On completion, returns a
+        presumptive :ref:`calc_status<funcguides-calcstatus>` integer. Useful
for running an application via the Executor until it stops without monitoring
its intermediate output.
@@ -600,7 +603,11 @@ def polling_loop(
calc_status = UNSET_TAG

        while not task.finished:
-            task.poll()
+            try:
+                task.poll()
+            except ExecutorException as e:
+                logger.warning(f"Exception in polling_loop: {e}")
+                break

if poll_manager:
man_signal = self.manager_poll()
@@ -619,6 +626,8 @@
if calc_status == UNSET_TAG:
if task.state == "FINISHED":
calc_status = WORKER_DONE
elif task.state == "FAILED_TO_START":
calc_status = TASK_FAILED_TO_START
elif task.state == "FAILED":
calc_status = TASK_FAILED
else:
@@ -676,7 +685,7 @@ def submit(
stdout: Optional[str] = None,
stderr: Optional[str] = None,
dry_run: Optional[bool] = False,
-        wait_on_start: Optional[bool] = False,
+        wait_on_start: Optional[Union[bool, int]] = False,
env_script: Optional[str] = None,
) -> Task:
"""Create a new task and run as a local serial subprocess.
@@ -707,9 +716,10 @@
Whether this is a dry_run - no task will be launched; instead
runline is printed to logger (at INFO level)
-        wait_on_start: bool, Optional
+        wait_on_start: bool or int, Optional
            Whether to wait for task to be polled as RUNNING (or other
-            active/end state) before continuing
+            active/end state) before continuing. If an integer N is supplied,
+            wait at most N seconds.
env_script: str, Optional
The full path of a shell script to set up the environment for the
@@ -762,7 +772,7 @@
start_new_session=False,
)
if wait_on_start:
-            self._wait_on_start(task, 0)  # No fail time as no re-starts in-place
+            self._wait_on_start(task, wait_on_start)

if not task.timer.timing and not task.finished:
task.timer.start()
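
A sketch of how the reworked polling_loop surfaces these outcomes from user code (a minimal sketch: the "sim" app registration and Executor setup are assumed; the constants come from libensemble.message_numbers):

from libensemble.executors.executor import Executor
from libensemble.message_numbers import TASK_FAILED, TASK_FAILED_TO_START, WORKER_DONE

def run_and_classify(app_args):
    exctr = Executor.executor
    task = exctr.submit(calc_type="sim", num_procs=4, app_args=app_args, wait_on_start=5)
    # polling_loop now catches ExecutorException itself, logs a warning, and
    # breaks, so a task that never started falls through to the status mapping.
    calc_status = exctr.polling_loop(task, timeout=60)
    if calc_status == WORKER_DONE:
        pass  # e.g., read the app's output files
    elif calc_status in (TASK_FAILED_TO_START, TASK_FAILED):
        pass  # e.g., record the failure for this point
    return calc_status
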
17 changes: 12 additions & 5 deletions libensemble/executors/mpi_executor.py
@@ -137,7 +137,9 @@ def set_gen_procs_gpus(self, libE_info):
def set_resources(self, resources: Resources) -> None:
self.resources = resources

-    def _launch_with_retries(self, task: Task, subgroup_launch: bool, wait_on_start: bool, run_cmd: List[str]) -> None:
+    def _launch_with_retries(
+        self, task: Task, subgroup_launch: bool, wait_on_start: Union[bool, int], run_cmd: List[str]
+    ) -> None:
"""Launch task with retry mechanism"""
retry_count = 0

@@ -157,11 +159,15 @@ def _launch_with_retries(self, task: Task, subgroup_launch: bool, wait_on_start:
)
except Exception as e:
logger.warning(f"task {task.name} submit command failed on try {retry_count} with error {e}")
task.state = "FAILED_TO_START"
task.finished = True
retry = True
retry_count += 1
else:
                if wait_on_start:
-                    self._wait_on_start(task, self.fail_time)
+                    wait_time = wait_on_start if isinstance(wait_on_start, int) else self.fail_time
+                    self._wait_on_start(task, wait_time)
task.poll()

if task.state == "FAILED":
logger.warning(
@@ -193,7 +199,7 @@ def submit(
stage_inout: Optional[str] = None,
hyperthreads: Optional[bool] = False,
dry_run: Optional[bool] = False,
-        wait_on_start: Optional[bool] = False,
+        wait_on_start: Optional[Union[bool, int]] = False,
extra_args: Optional[str] = None,
auto_assign_gpus: Optional[bool] = False,
match_procs_to_gpus: Optional[bool] = False,
@@ -253,9 +259,10 @@
Whether this is a dry_run - no task will be launched; instead
runline is printed to logger (at INFO level)
-        wait_on_start: bool, Optional
+        wait_on_start: bool or int, Optional
            Whether to wait for task to be polled as RUNNING (or other
-            active/end state) before continuing
+            active/end state) before continuing. If an integer N is supplied,
+            wait at most N seconds.
extra_args: str, Optional
Additional command line arguments to supply to MPI runner. If
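The bool-or-int dispatch above reduces to the following standalone paraphrase (not library API; fail_time is the executor's existing retry window). One design wrinkle worth noting: isinstance(True, int) holds in Python, so a bare wait_on_start=True also takes the integer branch (True == 1):

def resolve_wait_time(wait_on_start, fail_time):
    # Paraphrase of the branch in _launch_with_retries; only reached when
    # wait_on_start is truthy. An integer value overrides fail_time.
    return wait_on_start if isinstance(wait_on_start, int) else fail_time

assert resolve_wait_time(3, fail_time=2) == 3
assert resolve_wait_time(True, fail_time=2) == 1  # True is an int in Python
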
16 changes: 0 additions & 16 deletions libensemble/manager.py
@@ -28,7 +28,6 @@
MAN_SIGNAL_KILL,
PERSIS_STOP,
STOP_TAG,
-    calc_status_strings,
calc_type_strings,
)
from libensemble.resources.resources import Resources
@@ -407,20 +406,6 @@ def _update_state_on_alloc(self, Work: dict, w: int):

# --- Handle incoming messages from workers

-    @staticmethod
-    def _check_received_calc(D_recv: dict) -> None:
-        """Checks the type and status fields on a receive calculation"""
-        calc_type = D_recv["calc_type"]
-        calc_status = D_recv["calc_status"]
-        assert calc_type in [
-            EVAL_SIM_TAG,
-            EVAL_GEN_TAG,
-        ], f"Aborting, Unknown calculation type received. Received type: {calc_type}"
-
-        assert calc_status in list(calc_status_strings.keys()) + [PERSIS_STOP] or isinstance(
-            calc_status, str
-        ), f"Aborting: Unknown calculation status received. Received status: {calc_status}"
-
def _receive_from_workers(self, persis_info: dict) -> dict:
"""Receives calculation output from workers. Loops over all
active workers and probes to see if worker is ready to
@@ -443,7 +428,6 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -
"""Updates history and worker info on worker message"""
calc_type = D_recv["calc_type"]
calc_status = D_recv["calc_status"]
-        Manager._check_received_calc(D_recv)

keep_state = D_recv["libE_info"].get("keep_state", False)
if w not in self.persis_pending and not self.W[w - 1]["active_recv"] and not keep_state:
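With these assertions gone, an unrecognized status no longer aborts the manager; the worker-side lookup in _handle_calc's finally block (see worker.py below) degrades gracefully, as this sketch of the same dict.get pattern shows:

calc_status_strings = {35: "Completed"}  # excerpt; see message_numbers.py below

# Known statuses map to their display string; anything else (e.g., a
# user-supplied string) is echoed back unchanged.
assert calc_status_strings.get(35, 35) == "Completed"
assert calc_status_strings.get("custom status", "custom status") == "custom status"
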
8 changes: 5 additions & 3 deletions libensemble/message_numbers.py
@@ -37,9 +37,10 @@
WORKER_KILL_ON_ERR = 31 # Worker killed due to an error in results
WORKER_KILL_ON_TIMEOUT = 32 # Worker killed on timeout
TASK_FAILED = 33 # Calc had tasks that failed
-WORKER_DONE = 34  # Calculation was successful
+TASK_FAILED_TO_START = 34  # Calc had tasks that failed to start
+WORKER_DONE = 35  # Calculation was successful
# last_calc_status_rst_tag
-CALC_EXCEPTION = 35  # Reserved: Automatically used if user_f raised an exception
+CALC_EXCEPTION = 36  # Reserved: Automatically used if user_f raised an exception

MAN_KILL_SIGNALS = [MAN_SIGNAL_FINISH, MAN_SIGNAL_KILL]

@@ -52,7 +53,8 @@
WORKER_KILL_ON_ERR: "Worker killed task on Error",
WORKER_KILL_ON_TIMEOUT: "Worker killed task on Timeout",
WORKER_KILL: "Worker killed",
-    TASK_FAILED: "Task Failed",
+    TASK_FAILED: "Task Failed during run",
+    TASK_FAILED_TO_START: "Task Failed to start",
WORKER_DONE: "Completed",
CALC_EXCEPTION: "Exception occurred",
None: "Unknown Status",
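A quick check of the renumbered constants and their new display strings (values exactly as in the diff above):

from libensemble.message_numbers import (
    TASK_FAILED,
    TASK_FAILED_TO_START,
    WORKER_DONE,
    calc_status_strings,
)

assert (TASK_FAILED, TASK_FAILED_TO_START, WORKER_DONE) == (33, 34, 35)
assert calc_status_strings[TASK_FAILED_TO_START] == "Task Failed to start"
assert calc_status_strings[TASK_FAILED] == "Task Failed during run"
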
37 changes: 27 additions & 10 deletions libensemble/tests/unit_tests/test_executor.py
@@ -281,13 +281,16 @@ def test_launch_wait_on_start():
exctr = Executor.executor
cores = NCORES
args_for_sim = "sleep 0.2"
-    task = exctr.submit(calc_type="sim", num_procs=cores, app_args=args_for_sim, wait_on_start=True)
-    assert task.state not in NOT_STARTED_STATES, "Task should not be in a NOT_STARTED state. State: " + str(task.state)
-    exctr.poll(task)
-    if not task.finished:
-        task = polling_loop(exctr, task)
-    assert task.finished, "task.finished should be True. Returned " + str(task.finished)
-    assert task.state == "FINISHED", "task.state should be FINISHED. Returned " + str(task.state)
+    for value in [True, 1]:
+        task = exctr.submit(calc_type="sim", num_procs=cores, app_args=args_for_sim, wait_on_start=value)
+        assert task.state not in NOT_STARTED_STATES, "Task should not be in a NOT_STARTED state. State: " + str(
+            task.state
+        )
+        exctr.poll(task)
+        if not task.finished:
+            task = polling_loop(exctr, task)
+        assert task.finished, "task.finished should be True. Returned " + str(task.finished)
+        assert task.state == "FINISHED", "task.state should be FINISHED. Returned " + str(task.state)


def test_kill_on_file():
@@ -651,20 +654,33 @@ def test_retries_launch_fail():
cores = NCORES
args_for_sim = "sleep 0"
task = exctr.submit(calc_type="sim", num_procs=cores, app_args=args_for_sim)
assert task.state == "CREATED", "task.state should be CREATED. Returned " + str(task.state)
assert task.state == "FAILED_TO_START", "task.state should be FAILED_TO_START. Returned " + str(task.state)
assert exctr.mpi_runner_obj.subgroup_launch, "subgroup_launch should be True"
assert task.run_attempts == 5, "task.run_attempts should be 5. Returned " + str(task.run_attempts)


def test_retries_before_polling_loop_method():
print(f"\nTest: {sys._getframe().f_code.co_name}\n")
setup_executor_fakerunner()
exctr = Executor.executor
exctr.retry_delay_incr = 0.05
cores = NCORES
args_for_sim = "sleep 0"
task = exctr.submit(calc_type="sim", num_procs=cores, app_args=args_for_sim)
exctr.polling_loop(task, timeout=1)
assert task.finished, "task.finished should be True. Returned " + str(task.finished)
assert task.state == "FAILED_TO_START", "task.state should be FAILED_TO_START. Returned " + str(task.state)
assert task.run_attempts == 5, "task.run_attempts should be 5. Returned " + str(task.run_attempts)


def test_retries_run_fail():
print(f"\nTest: {sys._getframe().f_code.co_name}\n")
setup_executor()
exctr = Executor.executor
exctr.retry_delay_incr = 0.05
exctr.fail_time = 3
cores = NCORES
args_for_sim = "sleep 0 Fail"
-    task = exctr.submit(calc_type="sim", num_procs=cores, app_args=args_for_sim, wait_on_start=True)
+    task = exctr.submit(calc_type="sim", num_procs=cores, app_args=args_for_sim, wait_on_start=3)
assert task.state == "FAILED", "task.state should be FAILED. Returned " + str(task.state)
assert task.run_attempts == 5, "task.run_attempts should be 5. Returned " + str(task.run_attempts)

@@ -846,6 +862,7 @@ def test_non_existent_app_mpi():
test_poll_task_with_no_submit()
test_task_failure()
test_retries_launch_fail()
test_retries_before_polling_loop_method()
test_retries_run_fail()
test_register_apps()
test_serial_exes()
40 changes: 20 additions & 20 deletions libensemble/worker.py
@@ -3,7 +3,6 @@
====================================================
"""

-import cProfile
import logging
import logging.handlers
import socket
@@ -80,6 +79,8 @@ def worker_main(
"""

if libE_specs.get("profile"):
import cProfile

pr = cProfile.Profile()
pr.enable()

@@ -219,6 +220,18 @@ def _set_resources(workerID, comm: "communicator") -> bool:  # noqa: F821
logger.debug(f"No resources set on worker {workerID}")
return False

def _extract_debug_data(self, calc_type, Work):
if calc_type == EVAL_SIM_TAG:
enum_desc = "sim_id"
calc_id = extract_H_ranges(Work)
else:
enum_desc = "Gen no"
# Use global gen count if available
calc_id = str(Work["libE_info"].get("gen_count")) # if we're doing a gen, we always have a gen count?
# Add a right adjust (minimum width).
calc_id = calc_id.rjust(5, " ")
return enum_desc, calc_id

def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict, int):
"""Runs a calculation on this worker object.
@@ -238,21 +251,7 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict,
calc_type = Work["tag"]
self.calc_iter[calc_type] += 1

-        # calc_stats stores timing and summary info for this Calc (sim or gen)
-        # calc_id = next(self._calc_id_counter)
-
-        if calc_type == EVAL_SIM_TAG:
-            enum_desc = "sim_id"
-            calc_id = extract_H_ranges(Work)
-        else:
-            enum_desc = "Gen no"
-            # Use global gen count if available
-            if Work["libE_info"].get("gen_count"):
-                calc_id = str(Work["libE_info"]["gen_count"])
-            else:
-                calc_id = str(self.calc_iter[calc_type])
-            # Add a right adjust (minimum width).
-            calc_id = calc_id.rjust(5, " ")
+        enum_desc, calc_id = self._extract_debug_data(calc_type, Work)

timer = Timer()

@@ -281,12 +280,12 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict,
if tag in [STOP_TAG, PERSIS_STOP] and message is MAN_SIGNAL_FINISH:
calc_status = MAN_SIGNAL_FINISH

-            if out:  # better way of doing this logic?
+            if out:
if len(out) >= 3: # Out, persis_info, calc_status
calc_status = out[2]
return out
elif len(out) == 2: # Out, persis_info OR Out, calc_status
-                    if isinstance(out[1], int) or isinstance(out[1], str):  # got Out, calc_status
+                    if isinstance(out[1], (int, str)):  # got Out, calc_status
calc_status = out[1]
return out[0], Work["persis_info"], calc_status
return *out, calc_status # got Out, persis_info
@@ -301,6 +300,8 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict,
raise
finally:
ctype_str = calc_type_strings[calc_type]
# effectively converts calc_status to the relevant string or returns as-is.
# on the manager side, the only ones used for functionality are the FINISHED_PERSISTENT tags
status = calc_status_strings.get(calc_status, calc_status)
calc_msg = self._get_calc_msg(enum_desc, calc_id, ctype_str, timer, status)

@@ -314,7 +315,6 @@ def _get_calc_msg(self, enum_desc: str, calc_id: int, calc_type: int, timer: Tim
calc_msg += Executor.executor.new_tasks_timing(datetime=self.stats_fmt.get("task_datetime", False))

if self.stats_fmt.get("show_resource_sets", False):
-        # Maybe just call option resource_sets if already in sub-dictionary
resources = Resources.resources.worker_resources
calc_msg += f" rsets: {resources.rset_team}"

@@ -401,7 +401,7 @@ def run(self) -> None:
continue
else:
logger.debug(f"mtag: {mtag}; Work: {Work}")
-                    raise
+                    raise ValueError("Received unexpected Work message: ", Work)

response = self._handle(Work)
if response is None:
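Finally, the tidied out-handling in _handle_calc above distinguishes three return shapes from a user function via length and the isinstance check. A sketch of each shape (the sim_f signature follows libEnsemble's convention; the bodies are illustrative):

import numpy as np

from libensemble.message_numbers import WORKER_DONE

def sim_full(H, persis_info, sim_specs, libE_info):
    Out = np.zeros(1, dtype=sim_specs["out"])
    return Out, persis_info, WORKER_DONE  # Out, persis_info, calc_status

def sim_with_persis(H, persis_info, sim_specs, libE_info):
    Out = np.zeros(1, dtype=sim_specs["out"])
    return Out, persis_info  # second element not int/str -> persis_info

def sim_with_status(H, persis_info, sim_specs, libE_info):
    Out = np.zeros(1, dtype=sim_specs["out"])
    return Out, WORKER_DONE  # int/str second element -> calc_status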
