Skip to content

Commit

Permalink
Clean up FilePipe remains
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanTingHsieh committed Feb 17, 2024
1 parent 5341fc1 commit 54cb27c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
2 changes: 2 additions & 0 deletions nvflare/app_common/executors/launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ def handle_event(self, event_type: str, fl_ctx: FLContext) -> None:
self.finalize(fl_ctx)
self.log_info(fl_ctx, f"{EventType.END_RUN} event received - telling external to stop")
super().handle_event(event_type, fl_ctx)
else:
super().handle_event(event_type, fl_ctx)

def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal) -> Shareable:
self.log_info(fl_ctx, f"execute for task ({task_name})")
Expand Down
25 changes: 13 additions & 12 deletions nvflare/fuel/utils/pipe/file_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ def __init__(self, mode: Mode, root_path: str, file_check_interval=0.1):
"""
super().__init__(mode=mode)
check_positive_number("file_check_interval", file_check_interval)
check_str("root_path", root_path)

self._remove_root = False
if not os.path.exists(root_path):
try:
# create the root path
os.makedirs(root_path)
self._remove_root = True
except Exception:
pass

self.root_path = root_path
self.file_check_interval = file_check_interval
Expand Down Expand Up @@ -78,12 +88,6 @@ def open(self, name: str):
if not self.accessor:
raise RuntimeError("File accessor is not set. Make sure to set a FileAccessor before opening the pipe")

check_str("root_path", self.root_path)

if not os.path.exists(self.root_path):
# create the root path
os.makedirs(self.root_path)

pipe_path = os.path.join(self.root_path, name)

if not os.path.exists(pipe_path):
Expand Down Expand Up @@ -138,8 +142,6 @@ def clear(self):
def _monitor_file(self, file_path: str, timeout=None) -> bool:
"""Monitors the file until it's read-and-removed by peer, or timed out.
If timeout, remove the file.
Args:
file_path: the path to be monitored
timeout: how long to wait for timeout
Expand Down Expand Up @@ -270,10 +272,9 @@ def close(self):
self.pipe_path = None
if self.mode == Mode.PASSIVE:
if pipe_path and os.path.exists(pipe_path):
try:
shutil.rmtree(pipe_path)
except Exception:
pass
shutil.rmtree(pipe_path, ignore_errors=True)
if self._remove_root and os.path.exists(self.root_path):
shutil.rmtree(self.root_path, ignore_errors=True)

def can_resend(self) -> bool:
return False
Expand Down

0 comments on commit 54cb27c

Please sign in to comment.