Skip to content

Commit

Permalink
Add support of large object in LauncherExecutor/Client API using Cell…
Browse files Browse the repository at this point in the history
…Pipe
  • Loading branch information
YuanTingHsieh committed Mar 21, 2024
1 parent d5b96b2 commit e71cf41
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 64 deletions.
33 changes: 17 additions & 16 deletions nvflare/app_common/executors/client_api_launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ def __init__(
launch_timeout: Optional[float] = None,
task_wait_timeout: Optional[float] = None,
last_result_transfer_timeout: float = 300.0,
external_execution_wait: float = 5.0,
peer_read_timeout: Optional[float] = None,
external_setup_timeout: float = 60.0,
peer_read_timeout: Optional[float] = 60.0,
monitor_interval: float = 0.01,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: float = 30.0,
heartbeat_timeout: float = 60.0,
workers: int = 4,
train_with_evaluation: bool = True,
train_task_name: str = "train",
Expand All @@ -50,22 +50,23 @@ def __init__(
"""Initializes the ClientAPILauncherExecutor.
Args:
pipe_id (Optional[str]): Identifier for obtaining the Pipe from NVFlare components.
pipe_id (str): Identifier for obtaining the Pipe from NVFlare components.
launcher_id (Optional[str]): Identifier for obtaining the Launcher from NVFlare components.
launch_timeout (Optional[float]): Timeout for the Launcher's "launch_task" method to complete (None for no timeout).
task_wait_timeout (Optional[float]): Timeout for retrieving the task result (None for no timeout).
last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process (default: 5.0).
last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process.
This value should be greater than the time needed for sending the whole result.
peer_read_timeout (Optional[float]): Timeout for waiting the task to be read by the peer from the pipe (None for no timeout).
monitor_interval (float): Interval for monitoring the launcher (default: 0.01).
read_interval (float): Interval for reading from the pipe (default: 0.5).
heartbeat_interval (float): Interval for sending heartbeat to the peer (default: 5.0).
heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer (default: 30.0).
workers (int): Number of worker threads needed (default: 4).
train_with_evaluation (bool): Whether to run training with global model evaluation (default: True).
train_task_name (str): Task name of train mode (default: train).
evaluate_task_name (str): Task name of evaluate mode (default: evaluate).
submit_model_task_name (str): Task name of submit_model mode (default: submit_model).
external_setup_timeout (float): Time to wait for external process to start.
peer_read_timeout (float, optional): time to wait for peer to accept sent message.
monitor_interval (float): Interval for monitoring the launcher.
read_interval (float): Interval for reading from the pipe.
heartbeat_interval (float): Interval for sending heartbeat to the peer.
heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer.
workers (int): Number of worker threads needed.
train_with_evaluation (bool): Whether to run training with global model evaluation.
train_task_name (str): Task name of train mode.
evaluate_task_name (str): Task name of evaluate mode.
submit_model_task_name (str): Task name of submit_model mode.
from_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
This ParamsConverter will be called when model is sent from nvflare controller side to executor side.
to_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
Expand All @@ -82,7 +83,7 @@ def __init__(
launch_timeout=launch_timeout,
task_wait_timeout=task_wait_timeout,
last_result_transfer_timeout=last_result_transfer_timeout,
external_execution_wait=external_execution_wait,
external_setup_timeout=external_setup_timeout,
peer_read_timeout=peer_read_timeout,
monitor_interval=monitor_interval,
read_interval=read_interval,
Expand Down
57 changes: 31 additions & 26 deletions nvflare/app_common/executors/launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ def __init__(
launch_timeout: Optional[float] = None,
task_wait_timeout: Optional[float] = None,
last_result_transfer_timeout: float = 300.0,
external_execution_wait: float = 5.0,
peer_read_timeout: Optional[float] = None,
external_setup_timeout: float = 60.0,
peer_read_timeout: Optional[float] = 60.0,
monitor_interval: float = 1.0,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: float = 30.0,
workers: int = 1,
heartbeat_timeout: float = 60.0,
workers: int = 4,
train_with_evaluation: bool = True,
train_task_name: str = "train",
evaluate_task_name: str = "evaluate",
Expand All @@ -63,18 +63,19 @@ def __init__(
launcher_id (Optional[str]): Identifier for obtaining the Launcher from NVFlare components.
launch_timeout (Optional[float]): Timeout for the Launcher's "launch_task" method to complete (None for no timeout).
task_wait_timeout (Optional[float]): Timeout for retrieving the task result (None for no timeout).
last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process (default: 5.0).
last_result_transfer_timeout (float): Timeout for transmitting the last result from an external process.
This value should be greater than the time needed for sending the whole result.
peer_read_timeout (Optional[float]): Timeout for waiting the task to be read by the peer from the pipe (None for no timeout).
monitor_interval (float): Interval for monitoring the launcher (default: 0.01).
read_interval (float): Interval for reading from the pipe (default: 0.5).
heartbeat_interval (float): Interval for sending heartbeat to the peer (default: 5.0).
heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer (default: 30.0).
workers (int): Number of worker threads needed (default: 1).
train_with_evaluation (bool): Whether to run training with global model evaluation (default: True).
train_task_name (str): Task name of train mode (default: train).
evaluate_task_name (str): Task name of evaluate mode (default: evaluate).
submit_model_task_name (str): Task name of submit_model mode (default: submit_model).
external_setup_timeout (float): Time to wait for external process to start.
peer_read_timeout (float, optional): time to wait for peer to accept sent message.
monitor_interval (float): Interval for monitoring the launcher.
read_interval (float): Interval for reading from the pipe.
heartbeat_interval (float): Interval for sending heartbeat to the peer.
heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer.
workers (int): Number of worker threads needed.
train_with_evaluation (bool): Whether to run training with global model evaluation.
train_task_name (str): Task name of train mode.
evaluate_task_name (str): Task name of evaluate mode.
submit_model_task_name (str): Task name of submit_model mode.
from_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
This ParamsConverter will be called when model is sent from nvflare controller side to executor side.
to_nvflare_converter_id (Optional[str]): Identifier used to get the ParamsConverter from NVFlare components.
Expand All @@ -96,7 +97,7 @@ def __init__(
self._launcher_finish = False
self._launcher_finish_time = None
self._last_result_transfer_timeout = last_result_transfer_timeout
self._external_execution_wait = external_execution_wait
self._external_setup_timeout = external_setup_timeout
self._received_result = Event()
self._job_end = False

Expand Down Expand Up @@ -249,7 +250,6 @@ def _initialize_external_execution(
self.log_error(fl_ctx, "External execution set up failed.")
abort_signal.trigger("External execution set up failed.")
return False
time.sleep(self._external_execution_wait)
return True

def _execute_launcher_method_in_thread_executor(self, method_name: str, **kwargs) -> Any:
Expand Down Expand Up @@ -277,36 +277,40 @@ def _execute_launcher_method_in_thread_executor(self, method_name: str, **kwargs
def _wait_external_setup(self, task_name: str, fl_ctx: FLContext, abort_signal: Signal):
start_time = time.time()
while True:
if self._launch_timeout and time.time() - start_time >= self._launch_timeout:
self.log_error(fl_ctx, f"External execution is not set up within timeout: {self._launch_timeout}")
if self._external_setup_timeout and time.time() - start_time >= self._external_setup_timeout:
self.log_error(
fl_ctx, f"External execution is not set up within timeout: {self._external_setup_timeout}"
)
return False

if abort_signal.triggered:
self.log_info(fl_ctx, "External execution is not set up but abort signal is triggered.")
return False

if self.peer_is_up_or_dead():
return True

if self.launcher.check_run_status(task_name, fl_ctx) != LauncherRunStatus.RUNNING:
run_status = self.launcher.check_run_status(task_name, fl_ctx)
if run_status != LauncherRunStatus.RUNNING:
self.log_info(fl_ctx, f"External execution is not set up and run status becomes {run_status}.")
return False

time.sleep(0.1)

def _finalize_external_execution(
self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal
) -> bool:
with self._lock:
if self._job_end:
ask_peer_end_success = self.ask_peer_to_end(fl_ctx)
if not ask_peer_end_success:
return False
if self._job_end:
ask_peer_end_success = self.ask_peer_to_end(fl_ctx)
if not ask_peer_end_success:
return False

check_run_status = self._execute_launcher_method_in_thread_executor(
method_name="check_run_status",
task_name=task_name,
fl_ctx=fl_ctx,
)
if check_run_status != LauncherRunStatus.COMPLETE_SUCCESS:
if not self._received_result.is_set() and check_run_status != LauncherRunStatus.COMPLETE_SUCCESS:
self.log_warning(fl_ctx, f"Try to stop task ({task_name}) when launcher run status is {check_run_status}")

self.log_info(fl_ctx, f"Calling stop task ({task_name}).")
Expand Down Expand Up @@ -367,6 +371,7 @@ def _monitor_launcher(self, fl_ctx: FLContext):
break

if self._current_task is None:
self.pause_pipe_handler()
continue

task_name = self._current_task
Expand Down
12 changes: 5 additions & 7 deletions nvflare/app_common/executors/task_exchanger.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def __init__(
pipe_id: str,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: Optional[float] = 30.0,
heartbeat_timeout: Optional[float] = 60.0,
resend_interval: float = 2.0,
max_resends: Optional[int] = None,
peer_read_timeout: Optional[float] = 5.0,
peer_read_timeout: Optional[float] = 60.0,
task_wait_time: Optional[float] = None,
result_poll_interval: float = 0.5,
pipe_channel_name=PipeChannelName.TASK,
Expand All @@ -48,19 +48,16 @@ def __init__(
Args:
pipe_id (str): component id of pipe.
read_interval (float): how often to read from pipe.
Defaults to 0.5.
heartbeat_interval (float): how often to send heartbeat to peer.
Defaults to 5.0.
heartbeat_timeout (float, optional): how long to wait for a
heartbeat from the peer before treating the peer as dead,
0 means DO NOT check for heartbeat. Defaults to 30.0.
0 means DO NOT check for heartbeat.
resend_interval (float): how often to resend a message if failing to send.
None means no resend. Note that if the pipe does not support resending,
then no resend. Defaults to 2.0.
then no resend.
max_resends (int, optional): max number of resend. None means no limit.
Defaults to None.
peer_read_timeout (float, optional): time to wait for peer to accept sent message.
Defaults to 5.0.
task_wait_time (float, optional): how long to wait for a task to complete.
None means waiting forever. Defaults to None.
result_poll_interval (float): how often to poll task result.
Expand Down Expand Up @@ -114,6 +111,7 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
)
self.pipe_handler.set_status_cb(self._pipe_status_cb)
self.pipe.open(self.pipe_channel_name)
elif event_type == EventType.BEFORE_TASK_EXECUTION:
self.pipe_handler.start()
elif event_type == EventType.ABOUT_TO_END_RUN:
self.log_info(fl_ctx, "Stopping pipe handler")
Expand Down
18 changes: 9 additions & 9 deletions nvflare/client/flare_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ def __init__(
pipe: Pipe,
read_interval=0.1,
heartbeat_interval=5.0,
heartbeat_timeout=30.0,
heartbeat_timeout=60.0,
resend_interval=2.0,
max_resends=None,
submit_result_timeout=30.0,
submit_result_timeout=60.0,
metric_pipe=None,
task_channel_name: str = PipeChannelName.TASK,
metric_channel_name: str = PipeChannelName.METRIC,
Expand All @@ -83,15 +83,15 @@ def __init__(
Args:
pipe (Pipe): pipe for task communication.
read_interval (float): how often to read from the pipe. Defaults to 0.1.
heartbeat_interval (float): how often to send a heartbeat to the peer. Defaults to 5.0.
read_interval (float): how often to read from the pipe.
heartbeat_interval (float): how often to send a heartbeat to the peer.
heartbeat_timeout (float): how long to wait for a heartbeat from the peer before treating the peer as dead,
0 means DO NOT check for heartbeat. Defaults to 30.0.
0 means DO NOT check for heartbeat.
resend_interval (float): how often to resend a message if failing to send. None means no resend.
Note that if the pipe does not support resending, then no resend. Defaults to 2.0.
Note that if the pipe does not support resending, then no resend.
max_resends (int, optional): max number of resend. None means no limit. Defaults to None.
submit_result_timeout (float): when submitting task result,
how long to wait for response from the CJ. Defaults to 30.0.
how long to wait for response from the CJ.
metric_pipe (Pipe, optional): pipe for metric communication. Defaults to None.
task_channel_name (str): channel name for task. Defaults to ``task``.
metric_channel_name (str): channel name for metric. Defaults to ``metric``.
Expand Down Expand Up @@ -346,10 +346,10 @@ def __init__(
workspace_dir: str,
read_interval=0.1,
heartbeat_interval=5.0,
heartbeat_timeout=30.0,
heartbeat_timeout=60.0,
resend_interval=2.0,
max_resends=None,
submit_result_timeout=30.0,
submit_result_timeout=60.0,
has_metrics=False,
):
"""Constructor of Flare Agent with Cell Pipe. This is a convenient class.
Expand Down
16 changes: 10 additions & 6 deletions nvflare/fuel/utils/pipe/pipe_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(
pipe: Pipe,
read_interval=0.1,
heartbeat_interval=5.0,
heartbeat_timeout=30.0,
heartbeat_timeout=60.0,
resend_interval=2.0,
max_resends=None,
default_request_timeout=5.0,
Expand All @@ -70,7 +70,8 @@ def __init__(
pipe (Pipe): the pipe to be monitored.
read_interval (float): how often to read from the pipe.
heartbeat_interval (float): how often to send a heartbeat to the peer.
heartbeat_timeout (float): how long to wait for a heartbeat from the peer before treating the peer as gone,
heartbeat_timeout (float): how long to wait for a heartbeat from
the peer before treating the peer as gone,
0 means DO NOT check for heartbeat.
resend_interval (float): how often to resend a message if failing to send. None means no resend.
Note that if the pipe does not support resending, then no resend.
Expand Down Expand Up @@ -208,10 +209,10 @@ def start(self):
"""Starts the PipeHandler.
Note: before calling this method, the pipe managed by this PipeHandler must have been opened.
"""
if not self.reader.is_alive():
if self.reader and not self.reader.is_alive():
self.reader.start()

if not self.heartbeat_sender.is_alive():
if self.heartbeat_sender and not self.heartbeat_sender.is_alive():
self.heartbeat_sender.start()

def stop(self, close_pipe=True):
Expand Down Expand Up @@ -256,7 +257,8 @@ def notify_end(self, data):
p = self.pipe
if p:
try:
p.send(self._make_event_message(Topic.END, data))
# fire and forget
p.send(self._make_event_message(Topic.END, data), 0.1)
except Exception as ex:
self.logger.debug(f"exception notify_end: {secure_format_exception(ex)}")

Expand All @@ -265,7 +267,8 @@ def notify_abort(self, data):
p = self.pipe
if p:
try:
p.send(self._make_event_message(Topic.ABORT, data))
# fire and forget
p.send(self._make_event_message(Topic.ABORT, data), 0.1)
except Exception as ex:
self.logger.debug(f"exception notify_abort: {secure_format_exception(ex)}")

Expand Down Expand Up @@ -339,6 +342,7 @@ def _heartbeat(self):
last_heartbeat_sent_time = now

time.sleep(self._check_interval)
self.heartbeat_sender = None

def get_next(self) -> Optional[Message]:
"""Gets the next message from the message queue.
Expand Down

0 comments on commit e71cf41

Please sign in to comment.