Skip to content

Commit

Permalink
Merge branch '2.4' into fix_sub_worker_process_shutdown_24
Browse files: browse the repository at this point in the history
  • Loading branch information
YuanTingHsieh authored Apr 1, 2024
2 parents: b249341 + 2320336; commit a748d75
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 43 deletions.
2 changes: 1 addition & 1 deletion nvflare/apis/utils/reliable_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ def shutdown(cls):
"""
if not cls._shutdown_asked:
cls._shutdown_asked = True
cls._executor.shutdown(cancel_futures=True, wait=False)
cls._executor.shutdown(wait=False)
cls._logger.info("ReliableMessage is shutdown")

@classmethod
Expand Down
33 changes: 17 additions & 16 deletions nvflare/app_common/executors/client_api_launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ def __init__(
launch_timeout: Optional[float] = None,
task_wait_timeout: Optional[float] = None,
last_result_transfer_timeout: float = 300.0,
external_execution_wait: float = 5.0,
peer_read_timeout: Optional[float] = None,
external_pre_init_timeout: float = 60.0,
peer_read_timeout: Optional[float] = 60.0,
monitor_interval: float = 0.01,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: float = 30.0,
heartbeat_timeout: float = 60.0,
workers: int = 4,
train_with_evaluation: bool = True,
train_task_name: str = "train",
Expand All @@ -50,22 +50,23 @@ def __init__(
"""Initializes the ClientAPILauncherExecutor.
Args:
pipe_id (Optional[str]): Identifier for obtaining the Pipe from NVFlare components.
pipe_id (str): Identifier for obtaining the Pipe from NVFlare components.
launcher_id (Optional[str]): Identifier for obtaining the Launcher from NVFlare components.
launch_timeout (Optional[float]): Timeout for the Launcher's "launch_task" method to complete (None for no timeout).
task_wait_timeout (Optional[float]): Timeout for retrieving the task result (None for no timeout).
last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process (default: 5.0).
last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process.
This value should be greater than the time needed for sending the whole result.
peer_read_timeout (Optional[float]): Timeout for waiting the task to be read by the peer from the pipe (None for no timeout).
monitor_interval (float): Interval for monitoring the launcher (default: 0.01).
read_interval (float): Interval for reading from the pipe (default: 0.5).
heartbeat_interval (float): Interval for sending heartbeat to the peer (default: 5.0).
heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer (default: 30.0).
workers (int): Number of worker threads needed (default: 4).
train_with_evaluation (bool): Whether to run training with global model evaluation (default: True).
train_task_name (str): Task name of train mode (default: train).
evaluate_task_name (str): Task name of evaluate mode (default: evaluate).
submit_model_task_name (str): Task name of submit_model mode (default: submit_model).
external_pre_init_timeout (float): Time to wait for external process before it calls flare.init().
peer_read_timeout (float, optional): time to wait for peer to accept sent message.
monitor_interval (float): Interval for monitoring the launcher.
read_interval (float): Interval for reading from the pipe.
heartbeat_interval (float): Interval for sending heartbeat to the peer.
heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer.
workers (int): Number of worker threads needed.
train_with_evaluation (bool): Whether to run training with global model evaluation.
train_task_name (str): Task name of train mode.
evaluate_task_name (str): Task name of evaluate mode.
submit_model_task_name (str): Task name of submit_model mode.
from_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
This ParamsConverter will be called when model is sent from nvflare controller side to executor side.
to_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
Expand All @@ -82,7 +83,7 @@ def __init__(
launch_timeout=launch_timeout,
task_wait_timeout=task_wait_timeout,
last_result_transfer_timeout=last_result_transfer_timeout,
external_execution_wait=external_execution_wait,
external_pre_init_timeout=external_pre_init_timeout,
peer_read_timeout=peer_read_timeout,
monitor_interval=monitor_interval,
read_interval=read_interval,
Expand Down
60 changes: 34 additions & 26 deletions nvflare/app_common/executors/launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ def __init__(
launch_timeout: Optional[float] = None,
task_wait_timeout: Optional[float] = None,
last_result_transfer_timeout: float = 300.0,
external_execution_wait: float = 5.0,
peer_read_timeout: Optional[float] = None,
external_pre_init_timeout: float = 60.0,
peer_read_timeout: Optional[float] = 60.0,
monitor_interval: float = 1.0,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: float = 30.0,
workers: int = 1,
heartbeat_timeout: float = 60.0,
workers: int = 4,
train_with_evaluation: bool = True,
train_task_name: str = "train",
evaluate_task_name: str = "evaluate",
Expand All @@ -63,18 +63,19 @@ def __init__(
launcher_id (Optional[str]): Identifier for obtaining the Launcher from NVFlare components.
launch_timeout (Optional[float]): Timeout for the Launcher's "launch_task" method to complete (None for no timeout).
task_wait_timeout (Optional[float]): Timeout for retrieving the task result (None for no timeout).
last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process (default: 5.0).
last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process.
This value should be greater than the time needed for sending the whole result.
peer_read_timeout (Optional[float]): Timeout for waiting the task to be read by the peer from the pipe (None for no timeout).
monitor_interval (float): Interval for monitoring the launcher (default: 0.01).
read_interval (float): Interval for reading from the pipe (default: 0.5).
heartbeat_interval (float): Interval for sending heartbeat to the peer (default: 5.0).
heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer (default: 30.0).
workers (int): Number of worker threads needed (default: 1).
train_with_evaluation (bool): Whether to run training with global model evaluation (default: True).
train_task_name (str): Task name of train mode (default: train).
evaluate_task_name (str): Task name of evaluate mode (default: evaluate).
submit_model_task_name (str): Task name of submit_model mode (default: submit_model).
external_pre_init_timeout (float): Time to wait for external process before it calls flare.init().
peer_read_timeout (float, optional): time to wait for peer to accept sent message.
monitor_interval (float): Interval for monitoring the launcher.
read_interval (float): Interval for reading from the pipe.
heartbeat_interval (float): Interval for sending heartbeat to the peer.
heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer.
workers (int): Number of worker threads needed.
train_with_evaluation (bool): Whether to run training with global model evaluation.
train_task_name (str): Task name of train mode.
evaluate_task_name (str): Task name of evaluate mode.
submit_model_task_name (str): Task name of submit_model mode.
from_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
This ParamsConverter will be called when model is sent from nvflare controller side to executor side.
to_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
Expand All @@ -96,7 +97,7 @@ def __init__(
self._launcher_finish = False
self._launcher_finish_time = None
self._last_result_transfer_timeout = last_result_transfer_timeout
self._external_execution_wait = external_execution_wait
self._external_pre_init_timeout = external_pre_init_timeout
self._received_result = Event()
self._job_end = False

Expand Down Expand Up @@ -249,7 +250,6 @@ def _initialize_external_execution(
self.log_error(fl_ctx, "External execution set up failed.")
abort_signal.trigger("External execution set up failed.")
return False
time.sleep(self._external_execution_wait)
return True

def _execute_launcher_method_in_thread_executor(self, method_name: str, **kwargs) -> Any:
Expand Down Expand Up @@ -277,36 +277,43 @@ def _execute_launcher_method_in_thread_executor(self, method_name: str, **kwargs
def _wait_external_setup(self, task_name: str, fl_ctx: FLContext, abort_signal: Signal):
start_time = time.time()
while True:
if self._launch_timeout and time.time() - start_time >= self._launch_timeout:
self.log_error(fl_ctx, f"External execution is not set up within timeout: {self._launch_timeout}")
if self._external_pre_init_timeout and time.time() - start_time >= self._external_pre_init_timeout:
self.log_error(
fl_ctx,
f"External process has not called flare.init within timeout: {self._external_pre_init_timeout}",
)
return False

if abort_signal.triggered:
self.log_info(fl_ctx, "External execution has not called flare.init but abort signal is triggered.")
return False

if self.peer_is_up_or_dead():
return True

if self.launcher.check_run_status(task_name, fl_ctx) != LauncherRunStatus.RUNNING:
run_status = self.launcher.check_run_status(task_name, fl_ctx)
if run_status != LauncherRunStatus.RUNNING:
self.log_info(
fl_ctx, f"External process has not called flare.init and run status becomes {run_status}."
)
return False

time.sleep(0.1)

def _finalize_external_execution(
self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal
) -> bool:
with self._lock:
if self._job_end:
ask_peer_end_success = self.ask_peer_to_end(fl_ctx)
if not ask_peer_end_success:
return False
if self._job_end:
ask_peer_end_success = self.ask_peer_to_end(fl_ctx)
if not ask_peer_end_success:
return False

check_run_status = self._execute_launcher_method_in_thread_executor(
method_name="check_run_status",
task_name=task_name,
fl_ctx=fl_ctx,
)
if check_run_status != LauncherRunStatus.COMPLETE_SUCCESS:
if not self._received_result.is_set() and check_run_status != LauncherRunStatus.COMPLETE_SUCCESS:
self.log_warning(fl_ctx, f"Try to stop task ({task_name}) when launcher run status is {check_run_status}")

self.log_info(fl_ctx, f"Calling stop task ({task_name}).")
Expand Down Expand Up @@ -367,6 +374,7 @@ def _monitor_launcher(self, fl_ctx: FLContext):
break

if self._current_task is None:
self.pause_pipe_handler()
continue

task_name = self._current_task
Expand Down

0 comments on commit a748d75

Please sign in to comment.