Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2.4] Fix pipe handler timeout in task exchanger and launcher executor #2495

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions nvflare/app_common/executors/launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __init__(
last_result_transfer_timeout: float = 300.0,
external_pre_init_timeout: float = 60.0,
peer_read_timeout: Optional[float] = 60.0,
-monitor_interval: float = 1.0,
+monitor_interval: float = 0.1,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: float = 60.0,
Expand Down Expand Up @@ -373,8 +373,8 @@ def _monitor_launcher(self, fl_ctx: FLContext):
if self.launcher is None:
break

# no task is running
if self._current_task is None:
self.pause_pipe_handler()
continue

task_name = self._current_task
Expand All @@ -390,16 +390,19 @@ def _monitor_launcher(self, fl_ctx: FLContext):
continue

elif run_status == LauncherRunStatus.NOT_RUNNING:
# pause pipe handler because external process is not running
self.pause_pipe_handler()
continue

elif run_status == LauncherRunStatus.RUNNING:
# resume pipe handler when external process is running
self.resume_pipe_handler()
continue

elif (
run_status == LauncherRunStatus.COMPLETE_FAILED or run_status == LauncherRunStatus.COMPLETE_SUCCESS
):
# pause pipe handler because external process is completed
self.pause_pipe_handler()
if not self._launcher_finish:
self._launcher_finish_time = time.time()
Expand Down
1 change: 0 additions & 1 deletion nvflare/app_common/executors/task_exchanger.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
)
self.pipe_handler.set_status_cb(self._pipe_status_cb)
self.pipe.open(self.pipe_channel_name)
elif event_type == EventType.BEFORE_TASK_EXECUTION:
self.pipe_handler.start()
elif event_type == EventType.ABOUT_TO_END_RUN:
self.log_info(fl_ctx, "Stopping pipe handler")
Expand Down
Loading