From b092551dcfb04c4297b9ae8f879e8ec32e28a8f3 Mon Sep 17 00:00:00 2001 From: YuanTingHsieh Date: Mon, 18 Mar 2024 18:20:28 -0700 Subject: [PATCH 1/2] Fire and forget for pipe handler control messages --- nvflare/fuel/utils/pipe/pipe_handler.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/nvflare/fuel/utils/pipe/pipe_handler.py b/nvflare/fuel/utils/pipe/pipe_handler.py index 944ce6d495..efbbcee134 100644 --- a/nvflare/fuel/utils/pipe/pipe_handler.py +++ b/nvflare/fuel/utils/pipe/pipe_handler.py @@ -210,10 +210,10 @@ def start(self): """Starts the PipeHandler. Note: before calling this method, the pipe managed by this PipeHandler must have been opened. """ - if not self.reader.is_alive(): + if self.reader and not self.reader.is_alive(): self.reader.start() - if not self.heartbeat_sender.is_alive(): + if self.heartbeat_sender and not self.heartbeat_sender.is_alive(): self.heartbeat_sender.start() def stop(self, close_pipe=True): @@ -258,7 +258,8 @@ def notify_end(self, data): p = self.pipe if p: try: - p.send(self._make_event_message(Topic.END, data)) + # fire and forget + p.send(self._make_event_message(Topic.END, data), 0.1) except Exception as ex: self.logger.debug(f"exception notify_end: {secure_format_exception(ex)}") @@ -267,7 +268,8 @@ def notify_abort(self, data): p = self.pipe if p: try: - p.send(self._make_event_message(Topic.ABORT, data)) + # fire and forget + p.send(self._make_event_message(Topic.ABORT, data), 0.1) except Exception as ex: self.logger.debug(f"exception notify_abort: {secure_format_exception(ex)}") @@ -346,6 +348,7 @@ def _heartbeat(self): last_heartbeat_sent_time = now time.sleep(self._check_interval) + self.heartbeat_sender = None def get_next(self) -> Optional[Message]: """Gets the next message from the message queue. From 7e45e3393f0dcdfc13184484ed0c2221be545a5e Mon Sep 17 00:00:00 2001 From: YuanTingHsieh Date: Mon, 1 Apr 2024 17:30:26 -0700 Subject: [PATCH 2/2] Add default timeout value --- nvflare/fuel/utils/pipe/file_pipe.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nvflare/fuel/utils/pipe/file_pipe.py b/nvflare/fuel/utils/pipe/file_pipe.py index f1d4d96269..c96573f428 100644 --- a/nvflare/fuel/utils/pipe/file_pipe.py +++ b/nvflare/fuel/utils/pipe/file_pipe.py @@ -22,7 +22,7 @@ from nvflare.fuel.utils.pipe.file_accessor import FileAccessor from nvflare.fuel.utils.pipe.file_name_utils import file_name_to_message, message_to_file_name from nvflare.fuel.utils.pipe.fobs_file_accessor import FobsFileAccessor -from nvflare.fuel.utils.pipe.pipe import Message, Pipe +from nvflare.fuel.utils.pipe.pipe import Message, Pipe, Topic from nvflare.fuel.utils.validation_utils import check_object_type, check_positive_number, check_str @@ -260,6 +260,10 @@ def send(self, msg: Message, timeout=None) -> bool: """ if not self.pipe_path: raise BrokenPipeError("pipe is not open") + + if not timeout and msg.topic in [Topic.END, Topic.ABORT, Topic.HEARTBEAT]: + timeout = 5.0 + return self.put_f(msg, timeout) def receive(self, timeout=None):