From 54cb27ca6044f8baef6dbccbb5995632cb98adf8 Mon Sep 17 00:00:00 2001 From: YuanTingHsieh Date: Wed, 7 Feb 2024 15:04:13 -0800 Subject: [PATCH] Clean up FilePipe remains --- .../app_common/executors/launcher_executor.py | 2 ++ nvflare/fuel/utils/pipe/file_pipe.py | 25 ++++++++++--------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/nvflare/app_common/executors/launcher_executor.py b/nvflare/app_common/executors/launcher_executor.py index 90cc6a3d0a..d62f0671fe 100644 --- a/nvflare/app_common/executors/launcher_executor.py +++ b/nvflare/app_common/executors/launcher_executor.py @@ -143,6 +143,8 @@ def handle_event(self, event_type: str, fl_ctx: FLContext) -> None: self.finalize(fl_ctx) self.log_info(fl_ctx, f"{EventType.END_RUN} event received - telling external to stop") super().handle_event(event_type, fl_ctx) + else: + super().handle_event(event_type, fl_ctx) def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal) -> Shareable: self.log_info(fl_ctx, f"execute for task ({task_name})") diff --git a/nvflare/fuel/utils/pipe/file_pipe.py b/nvflare/fuel/utils/pipe/file_pipe.py index d5d7384876..4b5ceb19bd 100644 --- a/nvflare/fuel/utils/pipe/file_pipe.py +++ b/nvflare/fuel/utils/pipe/file_pipe.py @@ -39,6 +39,16 @@ def __init__(self, mode: Mode, root_path: str, file_check_interval=0.1): """ super().__init__(mode=mode) check_positive_number("file_check_interval", file_check_interval) + check_str("root_path", root_path) + + self._remove_root = False + if not os.path.exists(root_path): + try: + # create the root path + os.makedirs(root_path) + self._remove_root = True + except Exception: + pass self.root_path = root_path self.file_check_interval = file_check_interval @@ -78,12 +88,6 @@ def open(self, name: str): if not self.accessor: raise RuntimeError("File accessor is not set. Make sure to set a FileAccessor before opening the pipe") - check_str("root_path", self.root_path) - - if not os.path.exists(self.root_path): - # create the root path - os.makedirs(self.root_path) - pipe_path = os.path.join(self.root_path, name) if not os.path.exists(pipe_path): @@ -138,8 +142,6 @@ def clear(self): def _monitor_file(self, file_path: str, timeout=None) -> bool: """Monitors the file until it's read-and-removed by peer, or timed out. - If timeout, remove the file. - Args: file_path: the path to be monitored timeout: how long to wait for timeout @@ -270,10 +272,9 @@ def close(self): self.pipe_path = None if self.mode == Mode.PASSIVE: if pipe_path and os.path.exists(pipe_path): - try: - shutil.rmtree(pipe_path) - except Exception: - pass + shutil.rmtree(pipe_path, ignore_errors=True) + if self._remove_root and os.path.exists(self.root_path): + shutil.rmtree(self.root_path, ignore_errors=True) def can_resend(self) -> bool: return False