Skip to content

Commit

Permalink
Starts pipe handler heartbeat send/check after task is pull before ta…
Browse files Browse the repository at this point in the history
…sk execution (#2442)
  • Loading branch information
YuanTingHsieh authored Mar 26, 2024
1 parent 70cfc8f commit 2da1249
Showing 1 changed file with 5 additions and 7 deletions.
12 changes: 5 additions & 7 deletions nvflare/app_common/executors/task_exchanger.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def __init__(
pipe_id: str,
read_interval: float = 0.5,
heartbeat_interval: float = 5.0,
heartbeat_timeout: Optional[float] = 30.0,
heartbeat_timeout: Optional[float] = 60.0,
resend_interval: float = 2.0,
max_resends: Optional[int] = None,
peer_read_timeout: Optional[float] = 5.0,
peer_read_timeout: Optional[float] = 60.0,
task_wait_time: Optional[float] = None,
result_poll_interval: float = 0.5,
pipe_channel_name=PipeChannelName.TASK,
Expand All @@ -48,19 +48,16 @@ def __init__(
Args:
pipe_id (str): component id of pipe.
read_interval (float): how often to read from pipe.
Defaults to 0.5.
heartbeat_interval (float): how often to send heartbeat to peer.
Defaults to 5.0.
heartbeat_timeout (float, optional): how long to wait for a
heartbeat from the peer before treating the peer as dead,
0 means DO NOT check for heartbeat. Defaults to 30.0.
0 means DO NOT check for heartbeat.
resend_interval (float): how often to resend a message if failing to send.
None means no resend. Note that if the pipe does not support resending,
then no resend. Defaults to 2.0.
then no resend.
max_resends (int, optional): max number of resend. None means no limit.
Defaults to None.
peer_read_timeout (float, optional): time to wait for peer to accept sent message.
Defaults to 5.0.
task_wait_time (float, optional): how long to wait for a task to complete.
None means waiting forever. Defaults to None.
result_poll_interval (float): how often to poll task result.
Expand Down Expand Up @@ -114,6 +111,7 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
)
self.pipe_handler.set_status_cb(self._pipe_status_cb)
self.pipe.open(self.pipe_channel_name)
elif event_type == EventType.BEFORE_TASK_EXECUTION:
self.pipe_handler.start()
elif event_type == EventType.ABOUT_TO_END_RUN:
self.log_info(fl_ctx, "Stopping pipe handler")
Expand Down

0 comments on commit 2da1249

Please sign in to comment.