diff --git a/nvflare/app_common/executors/client_api_launcher_executor.py b/nvflare/app_common/executors/client_api_launcher_executor.py index 776c5a0206..f34b2c6f87 100644 --- a/nvflare/app_common/executors/client_api_launcher_executor.py +++ b/nvflare/app_common/executors/client_api_launcher_executor.py @@ -31,11 +31,11 @@ def __init__( task_wait_timeout: Optional[float] = None, last_result_transfer_timeout: float = 300.0, external_execution_wait: float = 5.0, - peer_read_timeout: Optional[float] = None, + peer_read_timeout: Optional[float] = 60.0, monitor_interval: float = 0.01, read_interval: float = 0.5, heartbeat_interval: float = 5.0, - heartbeat_timeout: float = 30.0, + heartbeat_timeout: float = 60.0, workers: int = 4, train_with_evaluation: bool = True, train_task_name: str = "train", @@ -50,22 +50,23 @@ def __init__( """Initializes the ClientAPILauncherExecutor. Args: - pipe_id (Optional[str]): Identifier for obtaining the Pipe from NVFlare components. + pipe_id (str): Identifier for obtaining the Pipe from NVFlare components. launcher_id (Optional[str]): Identifier for obtaining the Launcher from NVFlare components. launch_timeout (Optional[float]): Timeout for the Launcher's "launch_task" method to complete (None for no timeout). task_wait_timeout (Optional[float]): Timeout for retrieving the task result (None for no timeout). - last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process (default: 5.0). + last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process. This value should be greater than the time needed for sending the whole result. - peer_read_timeout (Optional[float]): Timeout for waiting the task to be read by the peer from the pipe (None for no timeout). - monitor_interval (float): Interval for monitoring the launcher (default: 0.01). - read_interval (float): Interval for reading from the pipe (default: 0.5). - heartbeat_interval (float): Interval for sending heartbeat to the peer (default: 5.0). - heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer (default: 30.0). - workers (int): Number of worker threads needed (default: 4). - train_with_evaluation (bool): Whether to run training with global model evaluation (default: True). - train_task_name (str): Task name of train mode (default: train). - evaluate_task_name (str): Task name of evaluate mode (default: evaluate). - submit_model_task_name (str): Task name of submit_model mode (default: submit_model). + external_execution_wait (float): Time to wait for external process to start. + peer_read_timeout (float, optional): time to wait for peer to accept sent message. + monitor_interval (float): Interval for monitoring the launcher. + read_interval (float): Interval for reading from the pipe. + heartbeat_interval (float): Interval for sending heartbeat to the peer. + heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer. + workers (int): Number of worker threads needed. + train_with_evaluation (bool): Whether to run training with global model evaluation. + train_task_name (str): Task name of train mode. + evaluate_task_name (str): Task name of evaluate mode. + submit_model_task_name (str): Task name of submit_model mode. from_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components. This ParamsConverter will be called when model is sent from nvflare controller side to executor side. to_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components. diff --git a/nvflare/app_common/executors/launcher_executor.py b/nvflare/app_common/executors/launcher_executor.py index 90cc6a3d0a..d6b1f08e45 100644 --- a/nvflare/app_common/executors/launcher_executor.py +++ b/nvflare/app_common/executors/launcher_executor.py @@ -43,12 +43,12 @@ def __init__( task_wait_timeout: Optional[float] = None, last_result_transfer_timeout: float = 300.0, external_execution_wait: float = 5.0, - peer_read_timeout: Optional[float] = None, + peer_read_timeout: Optional[float] = 60.0, monitor_interval: float = 1.0, read_interval: float = 0.5, heartbeat_interval: float = 5.0, - heartbeat_timeout: float = 30.0, - workers: int = 1, + heartbeat_timeout: float = 60.0, + workers: int = 4, train_with_evaluation: bool = True, train_task_name: str = "train", evaluate_task_name: str = "evaluate", @@ -63,18 +63,19 @@ def __init__( launcher_id (Optional[str]): Identifier for obtaining the Launcher from NVFlare components. launch_timeout (Optional[float]): Timeout for the Launcher's "launch_task" method to complete (None for no timeout). task_wait_timeout (Optional[float]): Timeout for retrieving the task result (None for no timeout). - last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process (default: 5.0). + last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process. This value should be greater than the time needed for sending the whole result. - peer_read_timeout (Optional[float]): Timeout for waiting the task to be read by the peer from the pipe (None for no timeout). - monitor_interval (float): Interval for monitoring the launcher (default: 0.01). - read_interval (float): Interval for reading from the pipe (default: 0.5). - heartbeat_interval (float): Interval for sending heartbeat to the peer (default: 5.0). - heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer (default: 30.0). - workers (int): Number of worker threads needed (default: 1). - train_with_evaluation (bool): Whether to run training with global model evaluation (default: True). - train_task_name (str): Task name of train mode (default: train). - evaluate_task_name (str): Task name of evaluate mode (default: evaluate). - submit_model_task_name (str): Task name of submit_model mode (default: submit_model). + external_execution_wait (float): Time to wait for external process to start. + peer_read_timeout (float, optional): time to wait for peer to accept sent message. + monitor_interval (float): Interval for monitoring the launcher. + read_interval (float): Interval for reading from the pipe. + heartbeat_interval (float): Interval for sending heartbeat to the peer. + heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer. + workers (int): Number of worker threads needed. + train_with_evaluation (bool): Whether to run training with global model evaluation. + train_task_name (str): Task name of train mode. + evaluate_task_name (str): Task name of evaluate mode. + submit_model_task_name (str): Task name of submit_model mode. from_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components. This ParamsConverter will be called when model is sent from nvflare controller side to executor side. to_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components. @@ -280,12 +281,15 @@ def _wait_external_setup(self, task_name: str, fl_ctx: FLContext, abort_signal: return False if abort_signal.triggered: + self.log_info(fl_ctx, "External execution is not set up but abort signal is triggered.") return False if self.peer_is_up_or_dead(): return True - if self.launcher.check_run_status(task_name, fl_ctx) != LauncherRunStatus.RUNNING: + run_status = self.launcher.check_run_status(task_name, fl_ctx) + if run_status != LauncherRunStatus.RUNNING: + self.log_info(fl_ctx, f"External execution is not set up and run status becomes {run_status}.") return False time.sleep(0.1) @@ -293,18 +297,17 @@ def _wait_external_setup(self, task_name: str, fl_ctx: FLContext, abort_signal: def _finalize_external_execution( self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal ) -> bool: - with self._lock: - if self._job_end: - ask_peer_end_success = self.ask_peer_to_end(fl_ctx) - if not ask_peer_end_success: - return False + if self._job_end: + ask_peer_end_success = self.ask_peer_to_end(fl_ctx) + if not ask_peer_end_success: + return False check_run_status = self._execute_launcher_method_in_thread_executor( method_name="check_run_status", task_name=task_name, fl_ctx=fl_ctx, ) - if check_run_status != LauncherRunStatus.COMPLETE_SUCCESS: + if not self._received_result.is_set() and check_run_status != LauncherRunStatus.COMPLETE_SUCCESS: self.log_warning(fl_ctx, f"Try to stop task ({task_name}) when launcher run status is {check_run_status}") self.log_info(fl_ctx, f"Calling stop task ({task_name}).") @@ -365,6 +368,7 @@ def _monitor_launcher(self, fl_ctx: FLContext): break if self._current_task is None: + self.pause_pipe_handler() continue task_name = self._current_task diff --git a/nvflare/app_common/executors/task_exchanger.py b/nvflare/app_common/executors/task_exchanger.py index 77a7b19bb9..c67b7cc63d 100644 --- a/nvflare/app_common/executors/task_exchanger.py +++ b/nvflare/app_common/executors/task_exchanger.py @@ -35,10 +35,10 @@ def __init__( pipe_id: str, read_interval: float = 0.5, heartbeat_interval: float = 5.0, - heartbeat_timeout: Optional[float] = 30.0, + heartbeat_timeout: Optional[float] = 60.0, resend_interval: float = 2.0, max_resends: Optional[int] = None, - peer_read_timeout: Optional[float] = 5.0, + peer_read_timeout: Optional[float] = 60.0, task_wait_time: Optional[float] = None, result_poll_interval: float = 0.5, pipe_channel_name=PipeChannelName.TASK, @@ -48,19 +48,16 @@ def __init__( Args: pipe_id (str): component id of pipe. read_interval (float): how often to read from pipe. - Defaults to 0.5. heartbeat_interval (float): how often to send heartbeat to peer. - Defaults to 5.0. heartbeat_timeout (float, optional): how long to wait for a heartbeat from the peer before treating the peer as dead, - 0 means DO NOT check for heartbeat. Defaults to 30.0. + 0 means DO NOT check for heartbeat. resend_interval (float): how often to resend a message if failing to send. None means no resend. Note that if the pipe does not support resending, - then no resend. Defaults to 2.0. + then no resend. max_resends (int, optional): max number of resend. None means no limit. Defaults to None. peer_read_timeout (float, optional): time to wait for peer to accept sent message. - Defaults to 5.0. task_wait_time (float, optional): how long to wait for a task to complete. None means waiting forever. Defaults to None. result_poll_interval (float): how often to poll task result. @@ -114,6 +111,7 @@ def handle_event(self, event_type: str, fl_ctx: FLContext): ) self.pipe_handler.set_status_cb(self._pipe_status_cb) self.pipe.open(self.pipe_channel_name) + elif event_type == EventType.BEFORE_TASK_EXECUTION: self.pipe_handler.start() elif event_type == EventType.ABOUT_TO_END_RUN: self.log_info(fl_ctx, "Stopping pipe handler") diff --git a/nvflare/client/flare_agent.py b/nvflare/client/flare_agent.py index 763449d824..5c86cd7b98 100644 --- a/nvflare/client/flare_agent.py +++ b/nvflare/client/flare_agent.py @@ -66,10 +66,10 @@ def __init__( pipe: Pipe, read_interval=0.1, heartbeat_interval=5.0, - heartbeat_timeout=30.0, + heartbeat_timeout=60.0, resend_interval=2.0, max_resends=None, - submit_result_timeout=30.0, + submit_result_timeout=60.0, metric_pipe=None, task_channel_name: str = PipeChannelName.TASK, metric_channel_name: str = PipeChannelName.METRIC, @@ -83,15 +83,15 @@ def __init__( Args: pipe (Pipe): pipe for task communication. - read_interval (float): how often to read from the pipe. Defaults to 0.1. - heartbeat_interval (float): how often to send a heartbeat to the peer. Defaults to 5.0. + read_interval (float): how often to read from the pipe. + heartbeat_interval (float): how often to send a heartbeat to the peer. heartbeat_timeout (float): how long to wait for a heartbeat from the peer before treating the peer as dead, - 0 means DO NOT check for heartbeat. Defaults to 30.0. + 0 means DO NOT check for heartbeat. resend_interval (float): how often to resend a message if failing to send. None means no resend. - Note that if the pipe does not support resending, then no resend. Defaults to 2.0. + Note that if the pipe does not support resending, then no resend. max_resends (int, optional): max number of resend. None means no limit. Defaults to None. submit_result_timeout (float): when submitting task result, - how long to wait for response from the CJ. Defaults to 30.0. + how long to wait for response from the CJ. metric_pipe (Pipe, optional): pipe for metric communication. Defaults to None. task_channel_name (str): channel name for task. Defaults to ``task``. metric_channel_name (str): channel name for metric. Defaults to ``metric``. @@ -346,10 +346,10 @@ def __init__( workspace_dir: str, read_interval=0.1, heartbeat_interval=5.0, - heartbeat_timeout=30.0, + heartbeat_timeout=60.0, resend_interval=2.0, max_resends=None, - submit_result_timeout=30.0, + submit_result_timeout=60.0, has_metrics=False, ): """Constructor of Flare Agent with Cell Pipe. This is a convenient class. diff --git a/nvflare/fuel/utils/pipe/pipe_handler.py b/nvflare/fuel/utils/pipe/pipe_handler.py index 4826c0bfa4..ce15a906dd 100644 --- a/nvflare/fuel/utils/pipe/pipe_handler.py +++ b/nvflare/fuel/utils/pipe/pipe_handler.py @@ -59,7 +59,7 @@ def __init__( pipe: Pipe, read_interval=0.1, heartbeat_interval=5.0, - heartbeat_timeout=30.0, + heartbeat_timeout=60.0, resend_interval=2.0, max_resends=None, default_request_timeout=5.0, @@ -70,7 +70,8 @@ def __init__( pipe (Pipe): the pipe to be monitored. read_interval (float): how often to read from the pipe. heartbeat_interval (float): how often to send a heartbeat to the peer. - heartbeat_timeout (float): how long to wait for a heartbeat from the peer before treating the peer as gone, + heartbeat_timeout (float): how long to wait for a heartbeat from + the peer before treating the peer as gone, 0 means DO NOT check for heartbeat. resend_interval (float): how often to resend a message if failing to send. None means no resend. Note that if the pipe does not support resending, then no resend. @@ -208,10 +209,10 @@ def start(self): """Starts the PipeHandler. Note: before calling this method, the pipe managed by this PipeHandler must have been opened. """ - if not self.reader.is_alive(): + if self.reader and not self.reader.is_alive(): self.reader.start() - if not self.heartbeat_sender.is_alive(): + if self.heartbeat_sender and not self.heartbeat_sender.is_alive(): self.heartbeat_sender.start() def stop(self, close_pipe=True): @@ -256,7 +257,7 @@ def notify_end(self, data): p = self.pipe if p: try: - p.send(self._make_event_message(Topic.END, data)) + p.send(self._make_event_message(Topic.END, data), self.default_request_timeout) except Exception as ex: self.logger.debug(f"exception notify_end: {secure_format_exception(ex)}") @@ -265,7 +266,7 @@ def notify_abort(self, data): p = self.pipe if p: try: - p.send(self._make_event_message(Topic.ABORT, data)) + p.send(self._make_event_message(Topic.ABORT, data), self.default_request_timeout) except Exception as ex: self.logger.debug(f"exception notify_abort: {secure_format_exception(ex)}") @@ -339,6 +340,7 @@ def _heartbeat(self): last_heartbeat_sent_time = now time.sleep(self._check_interval) + self.heartbeat_sender = None def get_next(self) -> Optional[Message]: """Gets the next message from the message queue.