Skip to content

Commit

Permalink
Fix pipe handler timeout in task exchanger and launcher executor (#2495)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanTingHsieh authored Apr 12, 2024
1 parent f4df50f commit 47b2feb
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
7 changes: 5 additions & 2 deletions nvflare/app_common/executors/launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __init__(
last_result_transfer_timeout: float = 300.0,
external_pre_init_timeout: float = 60.0,
peer_read_timeout: Optional[float] = 60.0,
monitor_interval: float = 1.0,
monitor_interval: float = 0.1,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: float = 60.0,
Expand Down Expand Up @@ -373,8 +373,8 @@ def _monitor_launcher(self, fl_ctx: FLContext):
if self.launcher is None:
break

# no task is running
if self._current_task is None:
self.pause_pipe_handler()
continue

task_name = self._current_task
Expand All @@ -390,16 +390,19 @@ def _monitor_launcher(self, fl_ctx: FLContext):
continue

elif run_status == LauncherRunStatus.NOT_RUNNING:
# pause pipe handler because external process is not running
self.pause_pipe_handler()
continue

elif run_status == LauncherRunStatus.RUNNING:
# resume pipe handler when external process is running
self.resume_pipe_handler()
continue

elif (
run_status == LauncherRunStatus.COMPLETE_FAILED or run_status == LauncherRunStatus.COMPLETE_SUCCESS
):
# pause pipe handler because external process is completed
self.pause_pipe_handler()
if not self._launcher_finish:
self._launcher_finish_time = time.time()
Expand Down
1 change: 0 additions & 1 deletion nvflare/app_common/executors/task_exchanger.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
)
self.pipe_handler.set_status_cb(self._pipe_status_cb)
self.pipe.open(self.pipe_channel_name)
elif event_type == EventType.BEFORE_TASK_EXECUTION:
self.pipe_handler.start()
elif event_type == EventType.ABOUT_TO_END_RUN:
self.log_info(fl_ctx, "Stopping pipe handler")
Expand Down

0 comments on commit 47b2feb

Please sign in to comment.