Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2.4] Fire and forget for pipe handler control messages #2413

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion nvflare/fuel/utils/pipe/file_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from nvflare.fuel.utils.pipe.file_accessor import FileAccessor
from nvflare.fuel.utils.pipe.file_name_utils import file_name_to_message, message_to_file_name
from nvflare.fuel.utils.pipe.fobs_file_accessor import FobsFileAccessor
from nvflare.fuel.utils.pipe.pipe import Message, Pipe
from nvflare.fuel.utils.pipe.pipe import Message, Pipe, Topic
from nvflare.fuel.utils.validation_utils import check_object_type, check_positive_number, check_str


Expand Down Expand Up @@ -260,6 +260,10 @@ def send(self, msg: Message, timeout=None) -> bool:
"""
if not self.pipe_path:
raise BrokenPipeError("pipe is not open")

if not timeout and msg.topic in [Topic.END, Topic.ABORT, Topic.HEARTBEAT]:
timeout = 5.0

return self.put_f(msg, timeout)

def receive(self, timeout=None):
Expand Down
11 changes: 7 additions & 4 deletions nvflare/fuel/utils/pipe/pipe_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,10 @@ def start(self):
"""Starts the PipeHandler.
Note: before calling this method, the pipe managed by this PipeHandler must have been opened.
"""
if not self.reader.is_alive():
if self.reader and not self.reader.is_alive():
self.reader.start()

if not self.heartbeat_sender.is_alive():
if self.heartbeat_sender and not self.heartbeat_sender.is_alive():
self.heartbeat_sender.start()

def stop(self, close_pipe=True):
Expand Down Expand Up @@ -258,7 +258,8 @@ def notify_end(self, data):
p = self.pipe
if p:
try:
p.send(self._make_event_message(Topic.END, data))
# fire and forget
p.send(self._make_event_message(Topic.END, data), 0.1)
except Exception as ex:
self.logger.debug(f"exception notify_end: {secure_format_exception(ex)}")

Expand All @@ -267,7 +268,8 @@ def notify_abort(self, data):
p = self.pipe
if p:
try:
p.send(self._make_event_message(Topic.ABORT, data))
# fire and forget
p.send(self._make_event_message(Topic.ABORT, data), 0.1)
except Exception as ex:
self.logger.debug(f"exception notify_abort: {secure_format_exception(ex)}")

Expand Down Expand Up @@ -346,6 +348,7 @@ def _heartbeat(self):
last_heartbeat_sent_time = now

time.sleep(self._check_interval)
self.heartbeat_sender = None

def get_next(self) -> Optional[Message]:
"""Gets the next message from the message queue.
Expand Down
Loading