diff --git a/stubs/pyfakefs/fake_filesystem.pyi b/stubs/pyfakefs/fake_filesystem.pyi new file mode 100644 index 00000000..466eb8b1 --- /dev/null +++ b/stubs/pyfakefs/fake_filesystem.pyi @@ -0,0 +1,151 @@ +from _typeshed import Incomplete # pylint: disable=import-error + + +def set_uid(uid) -> None: ... +def set_gid(gid) -> None: ... +def reset_ids() -> None: ... +def is_root(): ... + + +class FakeFilesystem: + path_separator: Incomplete + alternative_path_separator: Incomplete + patcher: Incomplete + is_windows_fs: Incomplete + is_macos: Incomplete + is_case_sensitive: Incomplete + root: Incomplete + cwd: Incomplete + umask: Incomplete + open_files: Incomplete + mount_points: Incomplete + dev_null: Incomplete + def __init__( + self, + path_separator=..., + total_size: Incomplete | None = ..., + patcher: Incomplete | None = ..., + ) -> None: ... + @property + def is_linux(self): ... + def reset(self, total_size: Incomplete | None = ...) -> None: ... + def pause(self) -> None: ... + def resume(self) -> None: ... + def line_separator(self): ... + def raise_os_error( + self, errno, filename: Incomplete | None = ..., winerror: Incomplete | None = ... + ) -> None: ... + def raise_io_error(self, errno, filename: Incomplete | None = ...) -> None: ... + def add_mount_point(self, path, total_size: Incomplete | None = ...): ... + def get_disk_usage(self, path: Incomplete | None = ...): ... + def set_disk_usage(self, total_size, path: Incomplete | None = ...) -> None: ... + def change_disk_usage(self, usage_change, file_path, st_dev) -> None: ... + def stat(self, entry_path, follow_symlinks: bool = ...): ... + def chmod(self, path, mode, follow_symlinks: bool = ...) -> None: ... + def utime( + self, + path, + times: Incomplete | None = ..., + ns: Incomplete | None = ..., + follow_symlinks: bool = ..., + ) -> None: ... + def get_open_file(self, file_des): ... + def has_open_file(self, file_object): ... + def normcase(self, path): ... + def normpath(self, path): ... + def absnormpath(self, path): ... + def splitpath(self, path): ... + def splitdrive(self, path): ... + def joinpaths(self, *paths): ... + def ends_with_path_separator(self, file_path): ... + def is_filepath_ending_with_separator(self, path): ... + def exists(self, file_path, check_link: bool = ...): ... + def resolve_path(self, file_path, allow_fd: bool = ..., raw_io: bool = ...): ... + def get_object_from_normpath(self, file_path, check_read_perm: bool = ...): ... + def get_object(self, file_path, check_read_perm: bool = ...): ... + def resolve( + self, + file_path, + follow_symlinks: bool = ..., + allow_fd: bool = ..., + check_read_perm: bool = ..., + ): ... + def lresolve(self, path): ... + def add_object(self, file_path, file_object, error_fct: Incomplete | None = ...) -> None: ... + def rename(self, old_file_path, new_file_path, force_replace: bool = ...) -> None: ... + def remove_object(self, file_path) -> None: ... + def make_string_path(self, path): ... + def create_dir(self, directory_path, perm_bits=...): ... + def create_file( + self, + file_path, + st_mode=..., + contents: str = ..., + st_size: Incomplete | None = ..., + create_missing_dirs: bool = ..., + apply_umask: bool = ..., + encoding: Incomplete | None = ..., + errors: Incomplete | None = ..., + side_effect: Incomplete | None = ..., + ): ... + def add_real_file( + self, source_path, read_only: bool = ..., target_path: Incomplete | None = ... + ): ... + def add_real_symlink(self, source_path, target_path: Incomplete | None = ...): ... + def add_real_directory( + self, + source_path, + read_only: bool = ..., + lazy_read: bool = ..., + target_path: Incomplete | None = ..., + ): ... + def add_real_paths( + self, path_list, read_only: bool = ..., lazy_dir_read: bool = ... + ) -> None: ... + def create_file_internally( + self, + file_path, + st_mode=..., + contents: str = ..., + st_size: Incomplete | None = ..., + create_missing_dirs: bool = ..., + apply_umask: bool = ..., + encoding: Incomplete | None = ..., + errors: Incomplete | None = ..., + read_from_real_fs: bool = ..., + raw_io: bool = ..., + side_effect: Incomplete | None = ..., + ): ... + def create_symlink(self, file_path, link_target, create_missing_dirs: bool = ...): ... + def link(self, old_path, new_path, follow_symlinks: bool = ...): ... + def readlink(self, path): ... + def makedir(self, dir_name, mode=...) -> None: ... + def makedirs(self, dir_name, mode=..., exist_ok: bool = ...) -> None: ... + def isdir(self, path, follow_symlinks: bool = ...): ... + def isfile(self, path, follow_symlinks: bool = ...): ... + def islink(self, path): ... + def confirmdir(self, target_directory): ... + def remove(self, path) -> None: ... + def rmdir(self, target_directory, allow_symlink: bool = ...) -> None: ... + def listdir(self, target_directory): ... + +class FakeFileOpen: + __name__: str + filesystem: Incomplete + raw_io: Incomplete + def __init__( + self, filesystem, delete_on_close: bool = ..., use_io: bool = ..., raw_io: bool = ... + ) -> None: ... + def __call__(self, *args, **kwargs): ... + def call( + self, + file_, + mode: str = ..., + buffering: int = ..., + encoding: Incomplete | None = ..., + errors: Incomplete | None = ..., + newline: Incomplete | None = ..., + closefd: bool = ..., + opener: Incomplete | None = ..., + open_modes: Incomplete | None = ..., + ): ... diff --git a/tests/test_mountingaccessor.py b/tests/test_mountingaccessor.py new file mode 100644 index 00000000..f19ffac3 --- /dev/null +++ b/tests/test_mountingaccessor.py @@ -0,0 +1,84 @@ +"""pytest tests testing subclasses of xcp.accessor.MountingAccessor using pyfakefs""" +import sys +from io import BytesIO +from typing import cast + +from mock import patch +from pyfakefs.fake_filesystem import FakeFileOpen, FakeFilesystem + +import xcp.accessor + +binary_data = b"\x00\x1b\x5b\x95\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xcc\xdd\xee\xff" + + +def test_device_accessor(fs): + # type: (FakeFilesystem) -> None + accessor = xcp.accessor.createAccessor("dev:///dev/device", False) + check_mounting_accessor(accessor, fs) + + +def test_nfs_accessor(fs): + # type: (FakeFilesystem) -> None + accessor = xcp.accessor.createAccessor("nfs://server/path", False) + check_mounting_accessor(accessor, fs) + + +def check_mounting_accessor(accessor, fs): + # type: (xcp.accessor.MountingAccessor, FakeFilesystem) -> None + """Test subclasses of MountingAccessor (with xcp.cmd.runCmd in xcp.mount mocked)""" + + with patch("xcp.cmd.runCmd") as mount_runcmd: + mount_runcmd.return_value = (0, "", "") + accessor.start() + + assert accessor.location + assert fs.isdir(accessor.location) + + location = accessor.location + + if sys.version_info.major >= 3: + fs.add_mount_point(location) + + assert check_binary_read(accessor, location, fs) + assert check_binary_write(accessor, location, fs) + + if sys.version_info.major >= 3: + fs.mount_points.pop(location) + + with patch("xcp.cmd.runCmd"): + accessor.finish() + + assert not fs.exists(location) + + assert not accessor.location + + +def check_binary_read(accessor, location, fs): + # type: (xcp.accessor.MountingAccessor, str, FakeFilesystem) -> bool + """Test the openAddress() method of subclasses of xcp.accessor.MountingAccessor""" + + name = "binary_file" + path = location + "/" + name + + assert fs.create_file(path, contents=cast(str, binary_data)) + + assert accessor.access(name) + + binary_file = accessor.openAddress(name) + assert not isinstance(binary_file, bool) + + fs.remove(path) + return cast(bytes, binary_file.read()) == binary_data + + +def check_binary_write(accessor, location, fs): + # type: (xcp.accessor.MountingAccessor, str, FakeFilesystem) -> bool + """Test the writeFile() method of subclasses of xcp.accessor.MountingAccessor""" + + name = "binary_file_written_by_accessor" + accessor.writeFile(BytesIO(binary_data), name) + + assert accessor.access(name) + + with FakeFileOpen(fs, delete_on_close=True)(location + "/" + name, "rb") as written: + return cast(bytes, written.read()) == binary_data diff --git a/xcp/accessor.py b/xcp/accessor.py index 7a3f7f08..084f2237 100644 --- a/xcp/accessor.py +++ b/xcp/accessor.py @@ -155,13 +155,15 @@ def finish(self): return self.start_count -= 1 if self.start_count == 0: + assert self.location mount.umount(self.location) os.rmdir(self.location) self.location = None def writeFile(self, in_fh, out_name): + assert self.location logger.info("Copying to %s" % os.path.join(self.location, out_name)) - out_fh = open(os.path.join(self.location, out_name), 'w') + out_fh = open(os.path.join(self.location, out_name), "wb") return self._writeFile(in_fh, out_fh) def __del__(self):