Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2.4] task_exchanger starts pipe handler heartbeat send/check after task is pull #2442

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions nvflare/app_common/executors/task_exchanger.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def __init__(
pipe_id: str,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: Optional[float] = 30.0,
heartbeat_timeout: Optional[float] = 60.0,
resend_interval: float = 2.0,
max_resends: Optional[int] = None,
peer_read_timeout: Optional[float] = 5.0,
peer_read_timeout: Optional[float] = 60.0,
task_wait_time: Optional[float] = None,
result_poll_interval: float = 0.5,
pipe_channel_name=PipeChannelName.TASK,
Expand All @@ -48,19 +48,16 @@ def __init__(
Args:
pipe_id (str): component id of pipe.
read_interval (float): how often to read from pipe.
Defaults to 0.5.
heartbeat_interval (float): how often to send heartbeat to peer.
Defaults to 5.0.
heartbeat_timeout (float, optional): how long to wait for a
heartbeat from the peer before treating the peer as dead,
0 means DO NOT check for heartbeat. Defaults to 30.0.
0 means DO NOT check for heartbeat.
resend_interval (float): how often to resend a message if failing to send.
None means no resend. Note that if the pipe does not support resending,
then no resend. Defaults to 2.0.
then no resend.
max_resends (int, optional): max number of resend. None means no limit.
Defaults to None.
peer_read_timeout (float, optional): time to wait for peer to accept sent message.
Defaults to 5.0.
task_wait_time (float, optional): how long to wait for a task to complete.
None means waiting forever. Defaults to None.
result_poll_interval (float): how often to poll task result.
Expand Down Expand Up @@ -114,6 +111,7 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
)
self.pipe_handler.set_status_cb(self._pipe_status_cb)
self.pipe.open(self.pipe_channel_name)
elif event_type == EventType.BEFORE_TASK_EXECUTION:
self.pipe_handler.start()
elif event_type == EventType.ABOUT_TO_END_RUN:
self.log_info(fl_ctx, "Stopping pipe handler")
Expand Down
Loading