Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up FilePipe remains #2369

Merged
merged 3 commits into from
Feb 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions nvflare/fuel/utils/pipe/file_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ def __init__(self, mode: Mode, root_path: str, file_check_interval=0.1):
"""
super().__init__(mode=mode)
check_positive_number("file_check_interval", file_check_interval)
check_str("root_path", root_path)

self._remove_root = False
if not os.path.exists(root_path):
try:
# create the root path
os.makedirs(root_path)
self._remove_root = True
except Exception:
pass

self.root_path = root_path
self.file_check_interval = file_check_interval
Expand Down Expand Up @@ -78,12 +88,6 @@ def open(self, name: str):
if not self.accessor:
raise RuntimeError("File accessor is not set. Make sure to set a FileAccessor before opening the pipe")

check_str("root_path", self.root_path)

if not os.path.exists(self.root_path):
# create the root path
os.makedirs(self.root_path)

pipe_path = os.path.join(self.root_path, name)

if not os.path.exists(pipe_path):
Expand Down Expand Up @@ -138,8 +142,6 @@ def clear(self):
def _monitor_file(self, file_path: str, timeout=None) -> bool:
"""Monitors the file until it's read-and-removed by peer, or timed out.

If timeout, remove the file.

Args:
file_path: the path to be monitored
timeout: how long to wait for timeout
Expand Down Expand Up @@ -270,10 +272,9 @@ def close(self):
self.pipe_path = None
if self.mode == Mode.PASSIVE:
if pipe_path and os.path.exists(pipe_path):
try:
shutil.rmtree(pipe_path)
except Exception:
pass
shutil.rmtree(pipe_path, ignore_errors=True)
if self._remove_root and os.path.exists(self.root_path):
shutil.rmtree(self.root_path, ignore_errors=True)

def can_resend(self) -> bool:
return False
Expand Down
Loading